gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] [gnunet] branch master updated (5c9c418d2 -> caf375948)


From: gnunet
Subject: [GNUnet-SVN] [gnunet] branch master updated (5c9c418d2 -> caf375948)
Date: Thu, 23 Feb 2017 17:47:44 +0100

This is an automated email from the git hooks/post-receive script.

dold pushed a change to branch master
in repository gnunet.

    from 5c9c418d2 limit queue size CORE maintains for any client, warn if we 
exceed it
     new 5a2d590a6 abort union if we receive too little fresh elements
     new 0581ec4f8 comments
     new d5af1252b add set operation options
     new caf375948 implement union via sending whole set

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/consensus/gnunet-service-consensus.c           |   2 +
 src/dv/gnunet-service-dv.c                         |   2 +
 src/include/gnunet_protocols.h                     |  13 +
 src/include/gnunet_set_service.h                   |  51 +++
 src/revocation/gnunet-service-revocation.c         |   2 +
 .../gnunet-service-scalarproduct-ecc_alice.c       |   1 +
 .../gnunet-service-scalarproduct-ecc_bob.c         |   1 +
 .../gnunet-service-scalarproduct_alice.c           |   1 +
 .../gnunet-service-scalarproduct_bob.c             |   1 +
 src/set/gnunet-service-set.c                       |  16 +
 src/set/gnunet-service-set_union.c                 | 383 +++++++++++++++++++--
 src/set/gnunet-set-profiler.c                      |   4 +-
 src/set/set_api.c                                  |   2 +
 src/set/test_set.conf                              |   2 +-
 src/set/test_set_api.c                             |   3 +
 src/set/test_set_intersection_result_full.c        |   2 +
 src/set/test_set_union_result_symmetric.c          |   2 +
 17 files changed, 452 insertions(+), 36 deletions(-)

diff --git a/src/consensus/gnunet-service-consensus.c 
b/src/consensus/gnunet-service-consensus.c
index 16ca6a57f..4036d2b11 100644
--- a/src/consensus/gnunet-service-consensus.c
+++ b/src/consensus/gnunet-service-consensus.c
@@ -2013,6 +2013,7 @@ task_start_reconcile (struct TaskEntry *task)
                                     &session->global_id,
                                     &rcm.header,
                                     GNUNET_SET_RESULT_SYMMETRIC,
+                                    (struct GNUNET_SET_Option[]) { 0 },
                                     set_result_cb,
                                     task);
 
@@ -2439,6 +2440,7 @@ set_listen_cb (void *cls,
 
   task->cls.setop.op = GNUNET_SET_accept (request,
                                           GNUNET_SET_RESULT_SYMMETRIC,
+                                          (struct GNUNET_SET_Option[]) { 0 },
                                           set_result_cb,
                                           task);
 
diff --git a/src/dv/gnunet-service-dv.c b/src/dv/gnunet-service-dv.c
index 6adaa04d9..e4b664f4b 100644
--- a/src/dv/gnunet-service-dv.c
+++ b/src/dv/gnunet-service-dv.c
@@ -1528,6 +1528,7 @@ listen_set_union (void *cls,
                                        GNUNET_SET_OPERATION_UNION);
   neighbor->set_op = GNUNET_SET_accept (request,
                                        GNUNET_SET_RESULT_ADDED,
+                                        (struct GNUNET_SET_Option[]) { 0 },
                                        &handle_set_union_result,
                                        neighbor);
   neighbor->consensus_insertion_offset = 0;
@@ -1558,6 +1559,7 @@ initiate_set_union (void *cls)
                                          &neighbor->real_session_id,
                                          NULL,
                                          GNUNET_SET_RESULT_ADDED,
+                                         (struct GNUNET_SET_Option[]) { 0 },
                                          &handle_set_union_result,
                                          neighbor);
   neighbor->consensus_insertion_offset = 0;
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h
index eeb9a8a92..f478edd27 100644
--- a/src/include/gnunet_protocols.h
+++ b/src/include/gnunet_protocols.h
@@ -1795,6 +1795,19 @@ extern "C"
  */
 #define GNUNET_MESSAGE_TYPE_SET_COPY_LAZY_CONNECT 596
 
+/**
+ * Request all missing elements from the other peer,
+ * based on their sets and the elements we previously sent
+ * with #GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS.
+ */
+#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE 597
+
+/**
+ * Send a set element, not as response to a demand but because
+ * we're sending the full set.
+ */
+#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT 598
+
 
 
/*******************************************************************************
  * TESTBED LOGGER message types
diff --git a/src/include/gnunet_set_service.h b/src/include/gnunet_set_service.h
index 52613ad59..f9b477f47 100644
--- a/src/include/gnunet_set_service.h
+++ b/src/include/gnunet_set_service.h
@@ -153,6 +153,7 @@ enum GNUNET_SET_Status
 };
 
 
+
 /**
  * The way results are given to the client.
  */
@@ -212,6 +213,54 @@ struct GNUNET_SET_Element
 
 
 /**
+ * Possible options to pass to a set operation.
+ *
+ * Used as tag for struct #GNUNET_SET_Option.
+ */
+enum GNUNET_SET_OptionType
+{
+  /**
+   * Fail set operations when the other peer shows weird behavior
+   * that might by a Byzantine fault.
+   *
+   * For set union, 'v.num' is a lower bound on elements
+   * that the other peer must have in common with us.
+   */
+  GNUNET_SET_OPTION_BYZANTINE=1,
+  /**
+   * Do not use the optimized set operation, but send full sets.
+   * Might trigger Byzantine fault detection.
+   */
+  GNUNET_SET_OPTION_FORCE_FULL=2,
+  /**
+   * Only use optimized set operations, even though for this
+   * particular set operation they might be much slower.
+   * Might trigger Byzantine fault detection.
+   */
+  GNUNET_SET_OPTION_FORCE_DELTA=4,
+};
+
+
+/**
+ * Option for set operations.
+ */
+struct GNUNET_SET_Option
+{
+  /**
+   * Type of the option.
+   */
+  enum GNUNET_SET_OptionType type;
+
+  /**
+   * Value for the option, only used with some options.
+   */
+  union {
+    uint64_t num;
+  } v;
+};
+
+
+/**
  * Continuation used for some of the set operations
  *
  * @param cls closure
@@ -367,6 +416,7 @@ GNUNET_SET_prepare (const struct GNUNET_PeerIdentity 
*other_peer,
                     const struct GNUNET_HashCode *app_id,
                     const struct GNUNET_MessageHeader *context_msg,
                     enum GNUNET_SET_ResultMode result_mode,
+                    struct GNUNET_SET_Option options[],
                     GNUNET_SET_ResultIterator result_cb,
                     void *result_cls);
 
@@ -420,6 +470,7 @@ GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle 
*lh);
 struct GNUNET_SET_OperationHandle *
 GNUNET_SET_accept (struct GNUNET_SET_Request *request,
                    enum GNUNET_SET_ResultMode result_mode,
+                   struct GNUNET_SET_Option options[],
                    GNUNET_SET_ResultIterator result_cb,
                    void *result_cls);
 
diff --git a/src/revocation/gnunet-service-revocation.c 
b/src/revocation/gnunet-service-revocation.c
index 2965808fa..b5669d9ea 100644
--- a/src/revocation/gnunet-service-revocation.c
+++ b/src/revocation/gnunet-service-revocation.c
@@ -509,6 +509,7 @@ transmit_task_cb (void *cls)
                                        &revocation_set_union_app_id,
                                        NULL,
                                        GNUNET_SET_RESULT_ADDED,
+                                       (struct GNUNET_SET_Option[]) { 0 },
                                        &add_revocation,
                                        peer_entry);
   if (GNUNET_OK !=
@@ -755,6 +756,7 @@ handle_revocation_union_request (void *cls,
   }
   peer_entry->so = GNUNET_SET_accept (request,
                                       GNUNET_SET_RESULT_ADDED,
+                                      (struct GNUNET_SET_Option[]) { 0 },
                                       &add_revocation,
                                       peer_entry);
   if (GNUNET_OK !=
diff --git a/src/scalarproduct/gnunet-service-scalarproduct-ecc_alice.c 
b/src/scalarproduct/gnunet-service-scalarproduct-ecc_alice.c
index 9f8d98657..34149435c 100644
--- a/src/scalarproduct/gnunet-service-scalarproduct-ecc_alice.c
+++ b/src/scalarproduct/gnunet-service-scalarproduct-ecc_alice.c
@@ -788,6 +788,7 @@ cb_intersection_request_alice (void *cls,
   s->intersection_op
     = GNUNET_SET_accept (request,
                          GNUNET_SET_RESULT_REMOVED,
+                         (struct GNUNET_SET_Option[]) { 0 },
                          &cb_intersection_element_removed,
                          s);
   if (NULL == s->intersection_op)
diff --git a/src/scalarproduct/gnunet-service-scalarproduct-ecc_bob.c 
b/src/scalarproduct/gnunet-service-scalarproduct-ecc_bob.c
index 7fd69a4ea..db8241bb7 100644
--- a/src/scalarproduct/gnunet-service-scalarproduct-ecc_bob.c
+++ b/src/scalarproduct/gnunet-service-scalarproduct-ecc_bob.c
@@ -670,6 +670,7 @@ start_intersection (struct BobServiceSession *s)
                           &set_sid,
                           NULL,
                           GNUNET_SET_RESULT_REMOVED,
+                          (struct GNUNET_SET_Option[]) { 0 },
                           &cb_intersection_element_removed,
                           s);
   if (GNUNET_OK !=
diff --git a/src/scalarproduct/gnunet-service-scalarproduct_alice.c 
b/src/scalarproduct/gnunet-service-scalarproduct_alice.c
index 779d84b60..f99ff6168 100644
--- a/src/scalarproduct/gnunet-service-scalarproduct_alice.c
+++ b/src/scalarproduct/gnunet-service-scalarproduct_alice.c
@@ -1022,6 +1022,7 @@ cb_intersection_request_alice (void *cls,
   s->intersection_op
     = GNUNET_SET_accept (request,
                          GNUNET_SET_RESULT_REMOVED,
+                         (struct GNUNET_SET_Option[]) { 0 },
                          &cb_intersection_element_removed,
                          s);
   if (NULL == s->intersection_op)
diff --git a/src/scalarproduct/gnunet-service-scalarproduct_bob.c 
b/src/scalarproduct/gnunet-service-scalarproduct_bob.c
index a2bceba43..4da2ba50e 100644
--- a/src/scalarproduct/gnunet-service-scalarproduct_bob.c
+++ b/src/scalarproduct/gnunet-service-scalarproduct_bob.c
@@ -964,6 +964,7 @@ start_intersection (struct BobServiceSession *s)
                           &s->session_id,
                           NULL,
                           GNUNET_SET_RESULT_REMOVED,
+                          (struct GNUNET_SET_Option[]) { 0 },
                           &cb_intersection_element_removed,
                           s);
   if (GNUNET_OK !=
diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c
index a545e8a06..1072407f1 100644
--- a/src/set/gnunet-service-set.c
+++ b/src/set/gnunet-service-set.c
@@ -1371,6 +1371,10 @@ handle_client_listen (void *cls,
                            struct GNUNET_MessageHeader,
                            NULL),
     GNUNET_MQ_hd_var_size (p2p_message,
+                           GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
+                           struct GNUNET_MessageHeader,
+                           NULL),
+    GNUNET_MQ_hd_var_size (p2p_message,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
                            struct GNUNET_MessageHeader,
                            NULL),
@@ -1379,6 +1383,10 @@ handle_client_listen (void *cls,
                            struct GNUNET_MessageHeader,
                            NULL),
     GNUNET_MQ_hd_var_size (p2p_message,
+                           GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
+                           struct GNUNET_MessageHeader,
+                           NULL),
+    GNUNET_MQ_hd_var_size (p2p_message,
                            
GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
                            struct GNUNET_MessageHeader,
                            NULL),
@@ -1634,6 +1642,14 @@ handle_client_evaluate (void *cls,
                            struct GNUNET_MessageHeader,
                            op),
     GNUNET_MQ_hd_var_size (p2p_message,
+                           GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
+                           struct GNUNET_MessageHeader,
+                           op),
+    GNUNET_MQ_hd_var_size (p2p_message,
+                           GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
+                           struct GNUNET_MessageHeader,
+                           op),
+    GNUNET_MQ_hd_var_size (p2p_message,
                            
GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
                            struct GNUNET_MessageHeader,
                            op),
diff --git a/src/set/gnunet-service-set_union.c 
b/src/set/gnunet-service-set_union.c
index acaabd94a..d2dfe049b 100644
--- a/src/set/gnunet-service-set_union.c
+++ b/src/set/gnunet-service-set_union.c
@@ -115,14 +115,22 @@ enum UnionOperationPhase
    * In the penultimate phase,
    * we wait until all our demands
    * are satisfied.  Then we send a done
-   * message, and wait for another done message.*/
+   * message, and wait for another done message.
+   */
   PHASE_FINISH_WAITING,
 
   /**
    * In the ultimate phase, we wait until
    * our demands are satisfied and then
-   * quit (sending another DONE message). */
-  PHASE_DONE
+   * quit (sending another DONE message).
+   */
+  PHASE_DONE,
+
+  /**
+   * After sending the full set, wait for responses with the elements
+   * that the local peer is missing.
+   */
+  PHASE_FULL_SENDING,
 };
 
 
@@ -148,7 +156,7 @@ struct OperationState
   struct InvertibleBloomFilter *local_ibf;
 
   /**
-   * Maps IBF-Keys (specific to the current salt) to elements.
+   * Maps unsalted IBF-Keys to elements.
    * Used as a multihashmap, the keys being the lower 32bit of the IBF-Key.
    * Colliding IBF-Keys are linked.
    */
@@ -183,6 +191,17 @@ struct OperationState
    * Salt for the IBF we've received and that we're currently decoding.
    */
   uint32_t salt_receive;
+
+  /**
+   * Number of elements we received from the other peer
+   * that were not in the local set yet.
+   */
+  uint32_t received_fresh;
+
+  /**
+   * Total number of elements received from the other peer.
+   */
+  uint32_t received_total;
 };
 
 
@@ -203,6 +222,14 @@ struct KeyEntry
    * is #GNUNET_YES.
    */
   struct ElementEntry *element;
+
+  /**
+   * Did we receive this element?
+   * Even if element->is_foreign is false, we might
+   * have received the element, so this indicates that
+   * the other peer has it.
+   */
+  int received;
 };
 
 
@@ -362,6 +389,16 @@ get_ibf_key (const struct GNUNET_HashCode *src)
 
 
 /**
+ * Context for #op_get_element_iterator
+ */
+struct GetElementContext
+{
+  struct GNUNET_HashCode hash;
+  struct KeyEntry *k;
+};
+
+
+/**
  * Iterator over the mapping from IBF keys to element entries.  Checks if we
  * have an element with a given GNUNET_HashCode.
  *
@@ -372,17 +409,20 @@ get_ibf_key (const struct GNUNET_HashCode *src)
  *         #GNUNET_NO if we've found the element.
  */
 static int
-op_has_element_iterator (void *cls,
+op_get_element_iterator (void *cls,
                          uint32_t key,
                          void *value)
 {
-  struct GNUNET_HashCode *element_hash = cls;
+  struct GetElementContext *ctx = cls;
   struct KeyEntry *k = value;
 
   GNUNET_assert (NULL != k);
   if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
-                                   element_hash))
+                                   &ctx->hash))
+  {
+    ctx->k = k;
     return GNUNET_NO;
+  }
   return GNUNET_YES;
 }
 
@@ -395,23 +435,29 @@ op_has_element_iterator (void *cls,
  * @param element_hash hash of the element to look for
  * @return #GNUNET_YES if the element has been found, #GNUNET_NO otherwise
  */
-static int
-op_has_element (struct Operation *op,
+static struct KeyEntry *
+op_get_element (struct Operation *op,
                 const struct GNUNET_HashCode *element_hash)
 {
   int ret;
   struct IBF_Key ibf_key;
+  struct GetElementContext ctx = { 0 };
+
+  ctx.hash = *element_hash;
 
   ibf_key = get_ibf_key (element_hash);
   ret = GNUNET_CONTAINER_multihashmap32_get_multiple 
(op->state->key_to_element,
                                                       (uint32_t) 
ibf_key.key_val,
-                                                      op_has_element_iterator,
-                                                      (void *) element_hash);
+                                                      op_get_element_iterator,
+                                                      &ctx);
 
   /* was the iteration aborted because we found the element? */
   if (GNUNET_SYSERR == ret)
-    return GNUNET_YES;
-  return GNUNET_NO;
+  {
+    GNUNET_assert (NULL != ctx.k);
+    return ctx.k;
+  }
+  return NULL;
 }
 
 
@@ -427,10 +473,12 @@ op_has_element (struct Operation *op,
  *
  * @param op the union operation
  * @param ee the element entry
+ * @parem received was this element received from the remote peer?
  */
 static void
 op_register_element (struct Operation *op,
-                     struct ElementEntry *ee)
+                     struct ElementEntry *ee,
+                     int received)
 {
   struct IBF_Key ibf_key;
   struct KeyEntry *k;
@@ -439,6 +487,7 @@ op_register_element (struct Operation *op,
   k = GNUNET_new (struct KeyEntry);
   k->element = ee;
   k->ibf_key = ibf_key;
+  k->received = received;
   GNUNET_assert (GNUNET_OK ==
                  GNUNET_CONTAINER_multihashmap32_put 
(op->state->key_to_element,
                                                       (uint32_t) 
ibf_key.key_val,
@@ -524,12 +573,30 @@ init_key_to_element_iterator (void *cls,
 
   GNUNET_assert (GNUNET_NO == ee->remote);
 
-  op_register_element (op, ee);
+  op_register_element (op, ee, GNUNET_NO);
   return GNUNET_YES;
 }
 
 
 /**
+ * Initialize the IBF key to element mapping local to this set
+ * operation.
+ *
+ * @param op the set union operation
+ */
+static void
+initialize_key_to_element (struct Operation *op)
+{
+  unsigned int len;
+
+  GNUNET_assert (NULL == op->state->key_to_element);
+  len = GNUNET_CONTAINER_multihashmap_size (op->spec->set->content->elements);
+  op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
+  GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements, 
init_key_to_element_iterator, op);
+}
+
+
+/**
  * Create an ibf with the operation's elements
  * of the specified size
  *
@@ -541,15 +608,8 @@ static int
 prepare_ibf (struct Operation *op,
              uint32_t size)
 {
-  if (NULL == op->state->key_to_element)
-  {
-    unsigned int len;
+  GNUNET_assert (NULL != op->state->key_to_element);
 
-    len = GNUNET_CONTAINER_multihashmap_size 
(op->spec->set->content->elements);
-    op->state->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 
1);
-    GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
-                                           init_key_to_element_iterator, op);
-  }
   if (NULL != op->state->local_ibf)
     ibf_destroy (op->state->local_ibf);
   op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
@@ -698,6 +758,47 @@ get_order_from_difference (unsigned int diff)
 
 
 /**
+ * Send a set element.
+ *
+ * @param cls the union operation `struct Operation *`
+ * @param key unused
+ * @param value the `struct ElementEntry *` to insert
+ *        into the key-to-element mapping
+ * @return #GNUNET_YES (to continue iterating)
+ */
+static int
+send_element_iterator (void *cls,
+                       const struct GNUNET_HashCode *key,
+                       void *value)
+{
+  struct Operation *op = cls;
+  struct GNUNET_SET_ElementMessage *emsg;
+  struct GNUNET_SET_Element *el = value;
+  struct GNUNET_MQ_Envelope *ev;
+
+  ev = GNUNET_MQ_msg_extra (emsg, el->size, 
GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
+  emsg->element_type = htonl (el->element_type);
+  GNUNET_memcpy (&emsg[1], el->data, el->size);
+  GNUNET_MQ_send (op->mq, ev);
+  return GNUNET_YES;
+}
+
+
+static void
+send_full_set (struct Operation *op)
+{
+  struct GNUNET_MQ_Envelope *ev;
+
+  op->state->phase = PHASE_FULL_SENDING;
+
+  (void) GNUNET_CONTAINER_multihashmap_iterate 
(op->spec->set->content->elements,
+                                                &send_element_iterator, op);
+  ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
+  GNUNET_MQ_send (op->mq, ev);
+}
+
+
+/**
  * Handle a strata estimator from a remote peer
  *
  * @param cls the union operation
@@ -765,16 +866,29 @@ handle_p2p_strata_estimator (void *cls,
        "got se diff=%d, using ibf size %d\n",
        diff,
        1<<get_order_from_difference (diff));
-  if (GNUNET_OK !=
-      send_ibf (op,
-                get_order_from_difference (diff)))
+
+  if (diff > GNUNET_CONTAINER_multihashmap_size 
(op->spec->set->content->elements) / 2)
   {
-    /* Internal error, best we can do is shut the connection */
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                "Failed to send IBF, closing connection\n");
-    fail_union_operation (op);
-    return GNUNET_SYSERR;
+    LOG (GNUNET_ERROR_TYPE_INFO,
+         "Sending full set (diff=%d, own set=%u)\n",
+         diff,
+         GNUNET_CONTAINER_multihashmap_size 
(op->spec->set->content->elements));
+    send_full_set (op);
   }
+  else
+  {
+    if (GNUNET_OK !=
+        send_ibf (op,
+                  get_order_from_difference (diff)))
+    {
+      /* Internal error, best we can do is shut the connection */
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                  "Failed to send IBF, closing connection\n");
+      fail_union_operation (op);
+      return GNUNET_SYSERR;
+    }
+  }
+
   return GNUNET_OK;
 }
 
@@ -1210,6 +1324,8 @@ maybe_finish (struct Operation *op)
 
 /**
  * Handle an element message from a remote peer.
+ * Sent by the other peer either because we decoded an IBF and placed a demand,
+ * or because the other peer switched to full set transmission.
  *
  * @param cls the union operation
  * @param mh the message
@@ -1273,7 +1389,11 @@ handle_p2p_elements (void *cls,
                             1,
                             GNUNET_NO);
 
-  if (GNUNET_YES == op_has_element (op, &ee->element_hash))
+  op->state->received_total += 1;
+
+  struct KeyEntry *ke = op_get_element (op, &ee->element_hash);
+
+  if (NULL != ke)
   {
     /* Got repeated element.  Should not happen since
      * we track demands. */
@@ -1281,13 +1401,15 @@ handle_p2p_elements (void *cls,
                               "# repeated elements",
                               1,
                               GNUNET_NO);
+    ke->received = GNUNET_YES;
     GNUNET_free (ee);
   }
   else
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "Registering new element from remote peer\n");
-    op_register_element (op, ee);
+    op->state->received_fresh += 1;
+    op_register_element (op, ee, GNUNET_YES);
     /* only send results immediately if the client wants it */
     switch (op->spec->result_mode)
     {
@@ -1304,11 +1426,112 @@ handle_p2p_elements (void *cls,
     }
   }
 
+  if (op->state->received_total > 8 && op->state->received_fresh < 
op->state->received_total / 3)
+  {
+    /* The other peer gave us lots of old elements, there's something wrong. */
+    GNUNET_break_op (0);
+    fail_union_operation (op);
+    return;
+  }
+
   maybe_finish (op);
 }
 
 
 /**
+ * Handle an element message from a remote peer.
+ *
+ * @param cls the union operation
+ * @param mh the message
+ */
+static void
+handle_p2p_full_element (void *cls,
+                         const struct GNUNET_MessageHeader *mh)
+{
+  struct Operation *op = cls;
+  struct ElementEntry *ee;
+  const struct GNUNET_SET_ElementMessage *emsg;
+  uint16_t element_size;
+
+  if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
+  {
+    GNUNET_break_op (0);
+    fail_union_operation (op);
+    return;
+  }
+
+  emsg = (const struct GNUNET_SET_ElementMessage *) mh;
+
+  element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
+  ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
+  GNUNET_memcpy (&ee[1], &emsg[1], element_size);
+  ee->element.size = element_size;
+  ee->element.data = &ee[1];
+  ee->element.element_type = ntohs (emsg->element_type);
+  ee->remote = GNUNET_YES;
+  GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Got element (full diff, size %u, hash %s) from peer\n",
+       (unsigned int) element_size,
+       GNUNET_h2s (&ee->element_hash));
+
+  GNUNET_STATISTICS_update (_GSS_statistics,
+                            "# received elements",
+                            1,
+                            GNUNET_NO);
+  GNUNET_STATISTICS_update (_GSS_statistics,
+                            "# exchanged elements",
+                            1,
+                            GNUNET_NO);
+
+  op->state->received_total += 1;
+
+  struct KeyEntry *ke = op_get_element (op, &ee->element_hash);
+
+  if (NULL != ke)
+  {
+    /* Got repeated element.  Should not happen since
+     * we track demands. */
+    GNUNET_STATISTICS_update (_GSS_statistics,
+                              "# repeated elements",
+                              1,
+                              GNUNET_NO);
+    ke->received = GNUNET_YES;
+    GNUNET_free (ee);
+  }
+  else
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Registering new element from remote peer\n");
+    op->state->received_fresh += 1;
+    op_register_element (op, ee, GNUNET_YES);
+    /* only send results immediately if the client wants it */
+    switch (op->spec->result_mode)
+    {
+      case GNUNET_SET_RESULT_ADDED:
+        send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
+        break;
+      case GNUNET_SET_RESULT_SYMMETRIC:
+        send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
+        break;
+      default:
+        /* Result mode not supported, should have been caught earlier. */
+        GNUNET_break (0);
+        break;
+    }
+  }
+
+  if (op->state->received_total > 8 && op->state->received_fresh < 
op->state->received_total / 3)
+  {
+    /* The other peer gave us lots of old elements, there's something wrong. */
+    GNUNET_break_op (0);
+    fail_union_operation (op);
+    return;
+  }
+}
+
+/**
  * Send offers (for GNUNET_Hash-es) in response
  * to inquiries (for IBF_Key-s).
  *
@@ -1355,7 +1578,90 @@ handle_p2p_inquiry (void *cls,
 
 
 /**
- * FIXME
+ * Iterator over hash map entries, called to
+ * destroy the linked list of colliding ibf key entries.
+ *
+ * @param cls closure
+ * @param key current key code
+ * @param value value in the hash map
+ * @return #GNUNET_YES if we should continue to iterate,
+ *         #GNUNET_NO if not.
+ */
+static int
+send_missing_elements_iter (void *cls,
+                            uint32_t key,
+                            void *value)
+{
+  struct Operation *op = cls;
+  struct KeyEntry *ke = value;
+  struct GNUNET_MQ_Envelope *ev;
+  struct GNUNET_SET_ElementMessage *emsg;
+  struct ElementEntry *ee = ke->element;
+
+  if (GNUNET_YES == ke->received)
+    return GNUNET_YES;
+
+  ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, 
GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT);
+  GNUNET_memcpy (&emsg[1], ee->element.data, ee->element.size);
+  emsg->reserved = htons (0);
+  emsg->element_type = htons (ee->element.element_type);
+  GNUNET_MQ_send (op->mq, ev);
+
+  return GNUNET_YES;
+}
+
+/**
+ * Handle a "full done" message.
+ *
+ * @parem cls closure, a set union operation
+ * @param mh the demand message
+ */
+static void
+handle_p2p_full_done (void *cls,
+                      const struct GNUNET_MessageHeader *mh)
+{
+  struct Operation *op = cls;
+
+  if (PHASE_EXPECT_IBF == op->state->phase)
+  {
+    struct GNUNET_MQ_Envelope *ev;
+
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, sending elements that other 
peer is missing\n");
+
+    /* send all the elements that did not come from the remote peer */
+    GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
+                                             &send_missing_elements_iter,
+                                             op);
+
+    ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
+    GNUNET_MQ_send (op->mq, ev);
+    op->state->phase = PHASE_DONE;
+
+    /* we now wait until the other peer shuts the tunnel down*/
+  }
+  else if (PHASE_FULL_SENDING == op->state->phase)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, finishing\n");
+    /* We sent the full set, and got the response for that.  We're done. */
+    op->state->phase = PHASE_DONE;
+    send_done_and_destroy (op);
+  }
+  else
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handle full done phase is %u\n", 
(unsigned) op->state->phase);
+    GNUNET_break_op (0);
+    fail_union_operation (op);
+    return;
+  }
+}
+
+
+/**
+ * Handle a demand by the other peer for elements based on a list
+ * of GNUNET_HashCode-s.
+ *
+ * @parem cls closure, a set union operation
+ * @param mh the demand message
  */
 static void
 handle_p2p_demand (void *cls,
@@ -1607,6 +1913,8 @@ union_evaluate (struct Operation *op,
   else
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "sent op request without context message\n");
+
+  initialize_key_to_element (op);
 }
 
 
@@ -1636,6 +1944,7 @@ union_accept (struct Operation *op)
   op->state->se = strata_estimator_dup (op->spec->set->state->se);
   op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, 
GNUNET_NO);
   op->state->salt_receive = op->state->salt_send = 42;
+  initialize_key_to_element (op);
   /* kick off the operation */
   send_strata_estimator (op);
 }
@@ -1743,6 +2052,9 @@ union_handle_p2p_message (struct Operation *op,
     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
       handle_p2p_elements (op, mh);
       break;
+    case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT:
+      handle_p2p_full_element (op, mh);
+      break;
     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY:
       handle_p2p_inquiry (op, mh);
       break;
@@ -1755,6 +2067,9 @@ union_handle_p2p_message (struct Operation *op,
     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND:
       handle_p2p_demand (op, mh);
       break;
+    case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE:
+      handle_p2p_full_done (op, mh);
+      break;
     default:
       /* Something wrong with cadet's message handlers? */
       GNUNET_assert (0);
diff --git a/src/set/gnunet-set-profiler.c b/src/set/gnunet-set-profiler.c
index f89817ff5..22a9d85cf 100644
--- a/src/set/gnunet-set-profiler.c
+++ b/src/set/gnunet-set-profiler.c
@@ -225,7 +225,8 @@ set_listen_cb (void *cls,
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "set listen cb called\n");
   info2.oh = GNUNET_SET_accept (request, GNUNET_SET_RESULT_SYMMETRIC,
-                               set_result_cb, &info2);
+                                (struct GNUNET_SET_Option[]) { 0 },
+                                set_result_cb, &info2);
   GNUNET_SET_commit (info2.oh, info2.set);
 }
 
@@ -352,6 +353,7 @@ run (void *cls,
 
   info1.oh = GNUNET_SET_prepare (&local_peer, &app_id, NULL,
                                  GNUNET_SET_RESULT_SYMMETRIC,
+                                 (struct GNUNET_SET_Option[]) { 0 },
                                  set_result_cb, &info1);
   GNUNET_SET_commit (info1.oh, info1.set);
   GNUNET_SET_destroy (info1.set);
diff --git a/src/set/set_api.c b/src/set/set_api.c
index baeee6da0..c2e2cd1e9 100644
--- a/src/set/set_api.c
+++ b/src/set/set_api.c
@@ -766,6 +766,7 @@ GNUNET_SET_prepare (const struct GNUNET_PeerIdentity 
*other_peer,
                     const struct GNUNET_HashCode *app_id,
                     const struct GNUNET_MessageHeader *context_msg,
                     enum GNUNET_SET_ResultMode result_mode,
+                    struct GNUNET_SET_Option options[],
                     GNUNET_SET_ResultIterator result_cb,
                     void *result_cls)
 {
@@ -1006,6 +1007,7 @@ GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle 
*lh)
 struct GNUNET_SET_OperationHandle *
 GNUNET_SET_accept (struct GNUNET_SET_Request *request,
                    enum GNUNET_SET_ResultMode result_mode,
+                   struct GNUNET_SET_Option options[],
                    GNUNET_SET_ResultIterator result_cb,
                    void *result_cls)
 {
diff --git a/src/set/test_set.conf b/src/set/test_set.conf
index 69e7f5c52..30ccbde55 100644
--- a/src/set/test_set.conf
+++ b/src/set/test_set.conf
@@ -5,7 +5,7 @@ GNUNET_TEST_HOME = /tmp/test-gnunet-set/
 
 [set]
 AUTOSTART = YES
-# PREFIX = valgrind
+PREFIX = valgrind
 #PREFIX = valgrind --leak-check=full
 #PREFIX = gdbserver :1234
 OPTIONS = -L INFO
diff --git a/src/set/test_set_api.c b/src/set/test_set_api.c
index 21af45f8a..4bc6bd1c3 100644
--- a/src/set/test_set_api.c
+++ b/src/set/test_set_api.c
@@ -149,6 +149,7 @@ listen_cb (void *cls,
   listen_handle = NULL;
   oh2 = GNUNET_SET_accept (request,
                            GNUNET_SET_RESULT_ADDED,
+                           (struct GNUNET_SET_Option[]) { 0 },
                            &result_cb_set2,
                            NULL);
   GNUNET_SET_commit (oh2,
@@ -179,6 +180,7 @@ start (void *cls)
                             &app_id,
                             &context_msg,
                             GNUNET_SET_RESULT_ADDED,
+                            (struct GNUNET_SET_Option[]) { 0 },
                             &result_cb_set1,
                             NULL);
   GNUNET_SET_commit (oh1,
@@ -378,6 +380,7 @@ run (void *cls,
                               &app_id,
                               NULL,
                               GNUNET_SET_RESULT_ADDED,
+                              (struct GNUNET_SET_Option[]) { 0 },
                               NULL,
                               NULL);
 
diff --git a/src/set/test_set_intersection_result_full.c 
b/src/set/test_set_intersection_result_full.c
index b2d6ce8a9..cbe1ce149 100644
--- a/src/set/test_set_intersection_result_full.c
+++ b/src/set/test_set_intersection_result_full.c
@@ -133,6 +133,7 @@ listen_cb (void *cls,
   listen_handle = NULL;
   oh2 = GNUNET_SET_accept (request,
                            GNUNET_SET_RESULT_FULL,
+                           (struct GNUNET_SET_Option[]) { 0 },
                            &result_cb_set2,
                            NULL);
   GNUNET_SET_commit (oh2,
@@ -163,6 +164,7 @@ start (void *cls)
                             &app_id,
                             &context_msg,
                             GNUNET_SET_RESULT_FULL,
+                            (struct GNUNET_SET_Option[]) { 0 },
                             &result_cb_set1,
                             NULL);
   GNUNET_SET_commit (oh1,
diff --git a/src/set/test_set_union_result_symmetric.c 
b/src/set/test_set_union_result_symmetric.c
index ab191a34a..8dff40ec0 100644
--- a/src/set/test_set_union_result_symmetric.c
+++ b/src/set/test_set_union_result_symmetric.c
@@ -184,6 +184,7 @@ listen_cb (void *cls,
   listen_handle = NULL;
   oh2 = GNUNET_SET_accept (request,
                            GNUNET_SET_RESULT_SYMMETRIC,
+                           (struct GNUNET_SET_Option[]) { 0 },
                            &result_cb_set2,
                            NULL);
   GNUNET_SET_commit (oh2,
@@ -212,6 +213,7 @@ start (void *cls)
                             &app_id,
                             &context_msg,
                             GNUNET_SET_RESULT_SYMMETRIC,
+                            (struct GNUNET_SET_Option[]) { 0 },
                             &result_cb_set1, NULL);
   GNUNET_SET_commit (oh1, set1);
 }

-- 
To stop receiving notification emails like this one, please contact
address@hidden



reply via email to

[Prev in Thread] Current Thread [Next in Thread]