gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r36377 - in gnunet/src: include set


From: gnunet
Subject: [GNUnet-SVN] r36377 - in gnunet/src: include set
Date: Sun, 27 Sep 2015 06:32:53 +0200

Author: dold
Date: 2015-09-27 06:32:52 +0200 (Sun, 27 Sep 2015)
New Revision: 36377

Added:
   gnunet/src/set/test_set_union_result_symmetric.c
Removed:
   gnunet/src/set/test_set_union_result_full.c
Modified:
   gnunet/src/include/gnunet_protocols.h
   gnunet/src/set/Makefile.am
   gnunet/src/set/gnunet-service-set.c
   gnunet/src/set/gnunet-service-set_protocol.h
   gnunet/src/set/gnunet-service-set_union.c
   gnunet/src/set/gnunet-set-profiler.c
   gnunet/src/set/set_api.c
   gnunet/src/set/test_set.conf
Log:
SET service: accurate results for symmetric mode


Modified: gnunet/src/include/gnunet_protocols.h
===================================================================
--- gnunet/src/include/gnunet_protocols.h       2015-09-26 17:10:24 UTC (rev 
36376)
+++ gnunet/src/include/gnunet_protocols.h       2015-09-27 04:32:52 UTC (rev 
36377)
@@ -1801,6 +1801,27 @@
  * SET message types
  
******************************************************************************/
 
+/**
+ * Demand the whole element from the other
+ * peer, given only the hash code.
+ */
+#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND 566
+
+/**
+ * Tell the other peer to send us a list of
+ * hashes that match an IBF key.
+ */
+#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY 567
+
+/**
+ * Tell the other peer which hashes match a
+ * given IBF key.
+ */
+#define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER 568
+
+/**
+ * Reject a set request.
+ */
 #define GNUNET_MESSAGE_TYPE_SET_REJECT 569
 
 /**
@@ -1884,7 +1905,7 @@
 #define GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS 585
 
 /**
- * Union operation is done.
+ * Set operation is done.
  */
 #define GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE 586
 

Modified: gnunet/src/set/Makefile.am
===================================================================
--- gnunet/src/set/Makefile.am  2015-09-26 17:10:24 UTC (rev 36376)
+++ gnunet/src/set/Makefile.am  2015-09-27 04:32:52 UTC (rev 36377)
@@ -66,7 +66,7 @@
 if HAVE_TESTING
 check_PROGRAMS = \
  test_set_api \
- test_set_union_result_full \
+ test_set_union_result_symmetric \
  test_set_intersection_result_full \
  test_set_union_copy
 endif
@@ -83,9 +83,9 @@
   $(top_builddir)/src/testing/libgnunettesting.la \
   libgnunetset.la
 
-test_set_union_result_full_SOURCES = \
- test_set_union_result_full.c
-test_set_union_result_full_LDADD = \
+test_set_union_result_symmetric_SOURCES = \
+ test_set_union_result_symmetric.c
+test_set_union_result_symmetric_LDADD = \
   $(top_builddir)/src/util/libgnunetutil.la \
   $(top_builddir)/src/testing/libgnunettesting.la \
   libgnunetset.la

Modified: gnunet/src/set/gnunet-service-set.c
===================================================================
--- gnunet/src/set/gnunet-service-set.c 2015-09-26 17:10:24 UTC (rev 36376)
+++ gnunet/src/set/gnunet-service-set.c 2015-09-27 04:32:52 UTC (rev 36377)
@@ -560,6 +560,7 @@
                                              NULL);
       GNUNET_CONTAINER_multihashmap_destroy (content->elements);
       content->elements = NULL;
+      GNUNET_free (content);
     }
   }
   GNUNET_free_non_null (set->excluded_generations);
@@ -1951,8 +1952,11 @@
     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 0},
     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF, 0},
     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, 0},
+    { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER, 0},
+    { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY, 0},
+    { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND, 0},
+    { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS, 0},
     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE, 0},
-    { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS, 0},
     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE, 0},
     { &dispatch_p2p_message, 
GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO, 0},
     { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF, 0},

Modified: gnunet/src/set/gnunet-service-set_protocol.h
===================================================================
--- gnunet/src/set/gnunet-service-set_protocol.h        2015-09-26 17:10:24 UTC 
(rev 36376)
+++ gnunet/src/set/gnunet-service-set_protocol.h        2015-09-27 04:32:52 UTC 
(rev 36377)
@@ -58,6 +58,12 @@
 };
 
 
+/**
+ * Message containing buckets of an invertible bloom filter.
+ *
+ * If an IBF has too many buckets for an IBF message,
+ * it is split into multiple messages.
+ */
 struct IBFMessage
 {
   /**
@@ -86,7 +92,7 @@
    */
   uint32_t salt GNUNET_PACKED;
 
-  /* rest: strata */
+  /* rest: buckets */
 };
 
 

Modified: gnunet/src/set/gnunet-service-set_union.c
===================================================================
--- gnunet/src/set/gnunet-service-set_union.c   2015-09-26 17:10:24 UTC (rev 
36376)
+++ gnunet/src/set/gnunet-service-set_union.c   2015-09-27 04:32:52 UTC (rev 
36377)
@@ -1,6 +1,6 @@
 /*
       This file is part of GNUnet
-      Copyright (C) 2013 Christian Grothoff (and other contributing authors)
+      Copyright (C) 2013-2015 Christian Grothoff (and other contributing 
authors)
 
       GNUnet is free software; you can redistribute it and/or modify
       it under the terms of the GNU General Public License as published
@@ -31,6 +31,11 @@
 #include <gcrypt.h>
 
 
+#define LOG(kind,...) GNUNET_log_from (kind, "set-union",__VA_ARGS__)
+
+#define LOG_OP(kind,msg,op,...) GNUNET_log_from (kind, "set-union","[OP %x] " 
msg,((void *)op),__VA_ARGS__)
+
+
 /**
  * Number of IBFs in a strata estimator.
  */
@@ -40,7 +45,7 @@
  */
 #define SE_IBF_SIZE 80
 /**
- * hash num parameter for the difference digests and strata estimators
+ * The hash num parameter for the difference digests and strata estimators.
  */
 #define SE_IBF_HASH_NUM 4
 
@@ -69,7 +74,7 @@
 enum UnionOperationPhase
 {
   /**
-   * We sent the request message, and expect a strata estimator
+   * We sent the request message, and expect a strata estimator.
    */
   PHASE_EXPECT_SE,
 
@@ -77,6 +82,8 @@
    * We sent the strata estimator, and expect an IBF. This phase is entered 
once
    * upon initialization and later via #PHASE_EXPECT_ELEMENTS_AND_REQUESTS.
    *
+   * XXX: could use better wording.
+   *
    * After receiving the complete IBF, we enter #PHASE_EXPECT_ELEMENTS
    */
   PHASE_EXPECT_IBF,
@@ -87,33 +94,33 @@
   PHASE_EXPECT_IBF_CONT,
 
   /**
-   * We are sending request and elements,
-   * and thus only expect elements from the other peer.
-   *
-   * We are currently decoding an IBF until it can no longer be decoded,
-   * we currently send requests and expect elements
-   * The remote peer is in #PHASE_EXPECT_ELEMENTS_AND_REQUESTS
+   * We are decoding an IBF.
    */
-  PHASE_EXPECT_ELEMENTS,
+  PHASE_INVENTORY_ACTIVE,
 
   /**
-   * We are expecting elements and requests, and send
-   * requested elements back to the other peer.
-   *
-   * We are in this phase if we have SENT an IBF for the remote peer to decode.
-   * We expect requests, send elements or could receive an new IBF, which takes
-   * us via #PHASE_EXPECT_IBF to phase #PHASE_EXPECT_ELEMENTS
-   *
-   * The remote peer is thus in:
-   * #PHASE_EXPECT_ELEMENTS
+   * The other peer is decoding the IBF we just sent.
    */
-  PHASE_EXPECT_ELEMENTS_AND_REQUESTS,
+  PHASE_INVENTORY_PASSIVE,
 
   /**
-   * The protocol is over.
-   * Results may still have to be sent to the client.
+   * The protocol is almost finished, but we still have to flush our message
+   * queue and/or expect some elements.
    */
-  PHASE_FINISHED
+  PHASE_FINISH_CLOSING,
+
+  /**
+   * In the penultimate phase,
+   * we wait until all our demands
+   * are satisfied.  Then we send a done
+   * message, and wait for another done message.*/
+  PHASE_FINISH_WAITING,
+
+  /**
+   * In the ultimate phase, we wait until
+   * our demands are satisfied and then
+   * quit (sending another DONE message). */
+  PHASE_DONE,
 };
 
 
@@ -122,20 +129,19 @@
  */
 struct OperationState
 {
-
   /**
    * Copy of the set's strata estimator at the time of
-   * creation of this operation
+   * creation of this operation.
    */
   struct StrataEstimator *se;
 
   /**
-   * The ibf we currently receive
+   * The IBF we currently receive.
    */
   struct InvertibleBloomFilter *remote_ibf;
 
   /**
-   * IBF of the set's element.
+   * The IBF with the local set's element.
    */
   struct InvertibleBloomFilter *local_ibf;
 
@@ -147,11 +153,6 @@
   struct GNUNET_CONTAINER_MultiHashMap32 *key_to_element;
 
   /**
-   * Iterator for sending elements on the key to element mapping to the client.
-   */
-  struct GNUNET_CONTAINER_MultiHashMap32Iterator *full_result_iter;
-
-  /**
    * Current state of the operation.
    */
   enum UnionOperationPhase phase;
@@ -162,10 +163,14 @@
   int client_done_sent;
 
   /**
-   * Number of ibf buckets received
+   * Number of ibf buckets already received into the @a remote_ibf.
    */
   unsigned int ibf_buckets_received;
 
+  /**
+   * Hashes for elements that we have demanded from the other peer.
+   */
+  struct GNUNET_CONTAINER_MultiHashMap *demanded_hashes;
 };
 
 
@@ -181,14 +186,11 @@
 
   /**
    * The actual element associated with the key.
+   *
+   * Only owned by the union operation if element->operation
+   * is #GNUNET_YES.
    */
   struct ElementEntry *element;
-
-  /**
-   * Element that collides with this element
-   * on the ibf key. All colliding entries must have the same ibf key.
-   */
-  struct KeyEntry *next_colliding;
 };
 
 
@@ -215,7 +217,7 @@
 /**
  * Extra state required for efficient set union.
  */
-struct SetState
+  struct SetState
 {
   /**
    * The strata estimator is only generated once for
@@ -244,18 +246,13 @@
 {
   struct KeyEntry *k = value;
 
-  while (NULL != k)
+  GNUNET_assert (NULL != k);
+  if (GNUNET_YES == k->element->remote)
   {
-    struct KeyEntry *k_tmp = k;
-
-    k = k->next_colliding;
-    if (GNUNET_YES == k_tmp->element->remote)
-    {
-      GNUNET_free (k_tmp->element);
-      k_tmp->element = NULL;
-    }
-    GNUNET_free (k_tmp);
+    GNUNET_free (k->element);
+    k->element = NULL;
   }
+  GNUNET_free (k);
   return GNUNET_YES;
 }
 
@@ -269,8 +266,8 @@
 static void
 union_op_cancel (struct Operation *op)
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "destroying union op\n");
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "destroying union op\n");
   /* check if the op was canceled twice */
   GNUNET_assert (NULL != op->state);
   if (NULL != op->state->remote_ibf)
@@ -278,6 +275,11 @@
     ibf_destroy (op->state->remote_ibf);
     op->state->remote_ibf = NULL;
   }
+  if (NULL != op->state->demanded_hashes)
+  {
+    GNUNET_CONTAINER_multihashmap_destroy (op->state->demanded_hashes);
+    op->state->demanded_hashes = NULL;
+  }
   if (NULL != op->state->local_ibf)
   {
     ibf_destroy (op->state->local_ibf);
@@ -298,8 +300,8 @@
   }
   GNUNET_free (op->state);
   op->state = NULL;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "destroying union op done\n");
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "destroying union op done\n");
 }
 
 
@@ -315,8 +317,8 @@
   struct GNUNET_MQ_Envelope *ev;
   struct GNUNET_SET_ResultMessage *msg;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-              "union operation failed\n");
+  LOG (GNUNET_ERROR_TYPE_ERROR,
+       "union operation failed\n");
   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
   msg->request_id = htonl (op->spec->client_request_id);
@@ -340,57 +342,25 @@
 {
   struct IBF_Key key;
 
-  GNUNET_CRYPTO_hkdf (&key, sizeof (key),
-                     GCRY_MD_SHA512, GCRY_MD_SHA256,
-                      src, sizeof *src,
-                     &salt, sizeof (salt),
-                     NULL, 0);
+  GNUNET_CRYPTO_kdf (&key, sizeof (key),
+                     src, sizeof *src,
+                     &salt, sizeof (salt),
+                     NULL, 0);
   return key;
 }
 
 
 /**
- * Iterator to create the mapping between ibf keys
- * and element entries.
+ * Iterator over the mapping from IBF keys to element entries.  Checks if we
+ * have an element with a given GNUNET_HashCode.
  *
  * @param cls closure
  * @param key current key code
  * @param value value in the hash map
- * @return #GNUNET_YES if we should continue to iterate,
- *         #GNUNET_NO if not.
+ * @return #GNUNET_YES if we should search further,
+ *         #GNUNET_NO if we've found the element.
  */
 static int
-op_register_element_iterator (void *cls,
-                              uint32_t key,
-                              void *value)
-{
-  struct KeyEntry *const new_k = cls;
-  struct KeyEntry *old_k = value;
-
-  GNUNET_assert (NULL != old_k);
-  /* check if our ibf key collides with the ibf key in the existing entry */
-  if (old_k->ibf_key.key_val == new_k->ibf_key.key_val)
-  {
-    /* insert the the new key in the collision chain */
-    new_k->next_colliding = old_k->next_colliding;
-    old_k->next_colliding = new_k;
-    /* signal to the caller that we were able to insert into a colliding 
bucket */
-    return GNUNET_NO;
-  }
-  return GNUNET_YES;
-}
-
-
-/**
- * Iterator to create the mapping between ibf keys
- * and element entries.
- *
- * @param cls closure
- * @param key current key code
- * @param value value in the hash map
- * @return #GNUNET_YES (we should continue to iterate)
- */
-static int
 op_has_element_iterator (void *cls,
                          uint32_t key,
                          void *value)
@@ -399,13 +369,9 @@
   struct KeyEntry *k = value;
 
   GNUNET_assert (NULL != k);
-  while (NULL != k)
-  {
-    if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
-                                     element_hash))
-      return GNUNET_NO;
-    k = k->next_colliding;
-  }
+  if (0 == GNUNET_CRYPTO_hash_cmp (&k->element->element_hash,
+                                   element_hash))
+    return GNUNET_NO;
   return GNUNET_YES;
 }
 
@@ -446,6 +412,8 @@
  * This is done to speed up re-tried operations, if some elements
  * were transmitted, and then the IBF fails to decode.
  *
+ * XXX: clarify ownership, doesn't sound right.
+ *
  * @param op the union operation
  * @param ee the element entry
  */
@@ -453,7 +421,6 @@
 op_register_element (struct Operation *op,
                      struct ElementEntry *ee)
 {
-  int ret;
   struct IBF_Key ibf_key;
   struct KeyEntry *k;
 
@@ -461,18 +428,11 @@
   k = GNUNET_new (struct KeyEntry);
   k->element = ee;
   k->ibf_key = ibf_key;
-  ret = GNUNET_CONTAINER_multihashmap32_get_multiple 
(op->state->key_to_element,
+  GNUNET_assert (GNUNET_OK ==
+                 GNUNET_CONTAINER_multihashmap32_put 
(op->state->key_to_element,
                                                       (uint32_t) 
ibf_key.key_val,
-                                                      
op_register_element_iterator,
-                                                      k);
-
-  /* was the element inserted into a colliding bucket? */
-  if (GNUNET_SYSERR == ret)
-    return;
-  GNUNET_CONTAINER_multihashmap32_put (op->state->key_to_element,
-                                       (uint32_t) ibf_key.key_val,
-                                       k,
-                                       
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+                                                      k,
+                                                      
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
 }
 
 
@@ -488,13 +448,15 @@
                       uint32_t key,
                       void *value)
 {
-  struct InvertibleBloomFilter *ibf = cls;
+  struct Operation *op = cls;
   struct KeyEntry *ke = value;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "inserting %x into ibf\n",
-              ke->ibf_key.key_val);
-  ibf_insert (ibf, ke->ibf_key);
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "[OP %x] inserting %lx (hash %s) into ibf\n",
+       (void *) op,
+       (unsigned long) ke->ibf_key.key_val,
+       GNUNET_h2s (&ke->element->element_hash));
+  ibf_insert (op->state->local_ibf, ke->ibf_key);
   return GNUNET_YES;
 }
 
@@ -554,7 +516,7 @@
   op->state->local_ibf = ibf_create (size, SE_IBF_HASH_NUM);
   GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
                                            &prepare_ibf_iterator,
-                                           op->state->local_ibf);
+                                           op);
 }
 
 
@@ -561,6 +523,8 @@
 /**
  * Send an ibf of appropriate size.
  *
+ * Fragments the IBF into multiple messages if necessary.
+ *
  * @param op the union operation
  * @param ibf_order order of the ibf to send, size=2^order
  */
@@ -573,9 +537,9 @@
 
   prepare_ibf (op, 1<<ibf_order);
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "sending ibf of size %u\n",
-              1<<ibf_order);
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "sending ibf of size %u\n",
+       1<<ibf_order);
 
   ibf = op->state->local_ibf;
 
@@ -599,7 +563,7 @@
     ibf_write_slice (ibf, buckets_sent,
                      buckets_in_message, &msg[1]);
     buckets_sent += buckets_in_message;
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
                 "ibf chunk size %u, %u/%u sent\n",
                 buckets_in_message,
                 buckets_sent,
@@ -607,7 +571,9 @@
     GNUNET_MQ_send (op->mq, ev);
   }
 
-  op->state->phase = PHASE_EXPECT_ELEMENTS_AND_REQUESTS;
+  /* The other peer must decode the IBF, so
+   * we're passive. */
+  op->state->phase = PHASE_INVENTORY_PASSIVE;
 }
 
 
@@ -629,7 +595,7 @@
   GNUNET_MQ_send (op->mq,
                   ev);
   op->state->phase = PHASE_EXPECT_IBF;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
               "sent SE, expecting IBF\n");
 }
 
@@ -696,7 +662,7 @@
   strata_estimator_destroy (remote_se);
   strata_estimator_destroy (op->state->se);
   op->state->se = NULL;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
               "got se diff=%d, using ibf size %d\n",
               diff,
               1<<get_order_from_difference (diff));
@@ -714,56 +680,44 @@
  * @param value the key entry
  */
 static int
-send_element_iterator (void *cls,
-                       uint32_t key,
-                       void *value)
+send_offers_iterator (void *cls,
+                      uint32_t key,
+                      void *value)
 {
   struct SendElementClosure *sec = cls;
-  struct IBF_Key ibf_key = sec->ibf_key;
   struct Operation *op = sec->op;
   struct KeyEntry *ke = value;
+  struct GNUNET_MQ_Envelope *ev;
+  struct GNUNET_MessageHeader *mh;
 
-  if (ke->ibf_key.key_val != ibf_key.key_val)
+  /* Detect 32-bit key collision for the 64-bit IBF keys. */
+  if (ke->ibf_key.key_val != sec->ibf_key.key_val)
     return GNUNET_YES;
-  while (NULL != ke)
-  {
-    const struct GNUNET_SET_Element *const element = &ke->element->element;
-    struct GNUNET_MQ_Envelope *ev;
-    struct GNUNET_MessageHeader *mh;
 
-    GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val);
-    ev = GNUNET_MQ_msg_header_extra (mh,
-                                     element->size,
-                                     GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
-    if (NULL == ev)
-    {
-      /* element too large */
-      GNUNET_break (0);
-      continue;
-    }
-    memcpy (&mh[1],
-            element->data,
-            element->size);
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "sending element (%s) to peer\n",
-                GNUNET_h2s (&ke->element->element_hash));
-    GNUNET_MQ_send (op->mq, ev);
-    ke = ke->next_colliding;
-  }
-  return GNUNET_NO;
+  ev = GNUNET_MQ_msg_header_extra (mh,
+                                   sizeof (struct GNUNET_HashCode),
+                                   GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER);
+
+  GNUNET_assert (NULL != ev);
+  *(struct GNUNET_HashCode *) &mh[1] = ke->element->element_hash;
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+              "[OP %x] sending element offer (%s) to peer\n",
+              (void *) op,
+              GNUNET_h2s (&ke->element->element_hash));
+  GNUNET_MQ_send (op->mq, ev);
+  return GNUNET_YES;
 }
 
 
 /**
- * Send all elements that have the specified IBF key
- * to the remote peer of the union operation
+ * Send offers (in the form of GNUNET_Hash-es) to the remote peer for the 
given IBF key.
  *
  * @param op union operation
  * @param ibf_key IBF key of interest
  */
 static void
-send_elements_for_key (struct Operation *op,
-                       struct IBF_Key ibf_key)
+send_offers_for_key (struct Operation *op,
+                     struct IBF_Key ibf_key)
 {
   struct SendElementClosure send_cls;
 
@@ -771,7 +725,7 @@
   send_cls.op = op;
   (void) GNUNET_CONTAINER_multihashmap32_get_multiple 
(op->state->key_to_element,
                                                        (uint32_t) 
ibf_key.key_val,
-                                                       &send_element_iterator,
+                                                       &send_offers_iterator,
                                                        &send_cls);
 }
 
@@ -778,7 +732,7 @@
 
 /**
  * Decode which elements are missing on each side, and
- * send the appropriate elemens and requests
+ * send the appropriate offers and inquiries.
  *
  * @param op union operation
  */
@@ -791,7 +745,7 @@
   unsigned int num_decoded;
   struct InvertibleBloomFilter *diff_ibf;
 
-  GNUNET_assert (PHASE_EXPECT_ELEMENTS == op->state->phase);
+  GNUNET_assert (PHASE_INVENTORY_ACTIVE == op->state->phase);
 
   prepare_ibf (op, op->state->remote_ibf->size);
   diff_ibf = ibf_dup (op->state->local_ibf);
@@ -800,7 +754,7 @@
   ibf_destroy (op->state->remote_ibf);
   op->state->remote_ibf = NULL;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
               "decoding IBF (size=%u)\n",
               diff_ibf->size);
 
@@ -817,14 +771,14 @@
     res = ibf_decode (diff_ibf, &side, &key);
     if (res == GNUNET_OK)
     {
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
                   "decoded ibf key %lx\n",
-                  key.key_val);
+                  (unsigned long) key.key_val);
       num_decoded += 1;
       if ( (num_decoded > diff_ibf->size) ||
            (num_decoded > 1 && last_key.key_val == key.key_val) )
       {
-        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+        LOG (GNUNET_ERROR_TYPE_DEBUG,
                     "detected cyclic ibf (decoded %u/%u)\n",
                     num_decoded,
                     diff_ibf->size);
@@ -841,15 +795,17 @@
       next_order++;
       if (next_order <= MAX_IBF_ORDER)
       {
-        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                   "decoding failed, sending larger ibf (size %u)\n",
-                    1<<next_order);
+        LOG_OP (GNUNET_ERROR_TYPE_DEBUG,
+                "decoding failed, sending larger ibf (size %u)\n",
+                op,
+                1<<next_order);
         send_ibf (op, next_order);
       }
       else
       {
-        GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                   "set union failed: reached ibf limit\n");
+        // XXX: Send the whole set, element-by-element
+        LOG (GNUNET_ERROR_TYPE_ERROR,
+                    "set union failed: reached ibf limit\n");
       }
       break;
     }
@@ -857,15 +813,18 @@
     {
       struct GNUNET_MQ_Envelope *ev;
 
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
                   "transmitted all values, sending DONE\n");
       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
       GNUNET_MQ_send (op->mq, ev);
+      /* We now wait until we get a DONE message back
+       * and then wait for our MQ to be flushed and all our
+       * demands be delivered. */
       break;
     }
     if (1 == side)
     {
-      send_elements_for_key (op, key);
+      send_offers_for_key (op, key);
     }
     else if (-1 == side)
     {
@@ -872,17 +831,18 @@
       struct GNUNET_MQ_Envelope *ev;
       struct GNUNET_MessageHeader *msg;
 
-      /* It may be nice to merge multiple requests, but with cadet's corking 
it is not worth
+      /* It may be nice to merge multiple requests, but with CADET's corking 
it is not worth
        * the effort additional complexity. */
       ev = GNUNET_MQ_msg_header_extra (msg,
                                        sizeof (struct IBF_Key),
-                                       
GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS);
+                                       
GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY);
 
       memcpy (&msg[1],
               &key,
               sizeof (struct IBF_Key));
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "sending element request\n");
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+                  "sending element inquiry for IBF key %lx\n",
+                  (unsigned long) key.key_val);
       GNUNET_MQ_send (op->mq, ev);
     }
     else
@@ -897,6 +857,9 @@
 /**
  * Handle an IBF message from a remote peer.
  *
+ * Reassemble the IBF from multiple pieces, and
+ * process the whole IBF once possible.
+ *
  * @param cls the union operation
  * @param mh the header of the message
  * @return #GNUNET_SYSERR if the tunnel should be disconnected,
@@ -917,12 +880,12 @@
     return GNUNET_SYSERR;
   }
   msg = (const struct IBFMessage *) mh;
-  if ( (op->state->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS) ||
+  if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) ||
        (op->state->phase == PHASE_EXPECT_IBF) )
   {
     op->state->phase = PHASE_EXPECT_IBF_CONT;
     GNUNET_assert (NULL == op->state->remote_ibf);
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
                 "Creating new ibf of size %u\n",
                 1 << msg->order);
     op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
@@ -944,6 +907,13 @@
       return GNUNET_SYSERR;
     }
   }
+  else
+  {
+    LOG_OP (GNUNET_ERROR_TYPE_DEBUG,
+            "wrong phase\n",
+            op, NULL);
+    GNUNET_assert (0);
+  }
 
   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / 
IBF_BUCKET_SIZE;
 
@@ -961,6 +931,8 @@
     return GNUNET_SYSERR;
   }
 
+  GNUNET_assert (NULL != op->state->remote_ibf);
+
   ibf_read_slice (&msg[1],
                   op->state->ibf_buckets_received,
                   buckets_in_message,
@@ -969,9 +941,9 @@
 
   if (op->state->ibf_buckets_received == op->state->remote_ibf->size)
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
                 "received full ibf\n");
-    op->state->phase = PHASE_EXPECT_ELEMENTS;
+    op->state->phase = PHASE_INVENTORY_ACTIVE;
     decode_and_send (op);
   }
   return GNUNET_OK;
@@ -984,15 +956,17 @@
  *
  * @param op union operation
  * @param element element to send
+ * @param status status to send with the new element
  */
 static void
 send_client_element (struct Operation *op,
-                     struct GNUNET_SET_Element *element)
+                     struct GNUNET_SET_Element *element,
+                     int status)
 {
   struct GNUNET_MQ_Envelope *ev;
   struct GNUNET_SET_ResultMessage *rm;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
               "sending element (size %u) to client\n",
               element->size);
   GNUNET_assert (0 != op->spec->client_request_id);
@@ -1003,12 +977,7 @@
     GNUNET_break (0);
     return;
   }
-
-  if (GNUNET_SET_RESULT_ADDED == op->spec->result_mode)
-    rm->result_status = htons (GNUNET_SET_STATUS_OK);
-  else if (GNUNET_SET_RESULT_SYMMETRIC == op->spec->result_mode)
-    rm->result_status = htons (GNUNET_SET_STATUS_ADD_LOCAL);
-
+  rm->result_status = htons (status);
   rm->request_id = htonl (op->spec->client_request_id);
   rm->element_type = element->element_type;
   memcpy (&rm[1], element->data, element->size);
@@ -1034,102 +1003,50 @@
   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
   rm->element_type = htons (0);
   GNUNET_MQ_send (op->spec->set->client_mq, ev);
+  /* Will also call the union-specific cancel function. */
   _GSS_operation_destroy (op, GNUNET_YES);
-  op->keep--;
-  if (0 == op->keep)
-    GNUNET_free (op);
 }
 
 
-/**
- * Send all remaining elements in the full result iterator.
- *
- * @param cls operation
- */
 static void
-send_remaining_elements (void *cls)
+maybe_finish (struct Operation *op)
 {
-  struct Operation *op = cls;
-  struct KeyEntry *ke;
-  int res;
+  unsigned int num_demanded;
 
-  res = GNUNET_CONTAINER_multihashmap32_iterator_next 
(op->state->full_result_iter,
-                                                       NULL,
-                                                       (const void **) &ke);
-  if (GNUNET_NO == res)
+  num_demanded = GNUNET_CONTAINER_multihashmap_size 
(op->state->demanded_hashes);
+
+  if (PHASE_FINISH_WAITING == op->state->phase)
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "sending done and destroy because iterator ran out\n");
-    send_done_and_destroy (op);
-    return;
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "In PHASE_FINISH_WAITING, pending %u demands\n",
+         num_demanded);
+    if (0 == num_demanded)
+    {
+      struct GNUNET_MQ_Envelope *ev;
+
+      op->state->phase = PHASE_DONE;
+      ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
+      GNUNET_MQ_send (op->mq, ev);
+
+      /* We now wait until the other peer closes the channel
+       * after it got all elements from us. */
+    }
   }
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "sending elements from key entry\n");
-  while (1)
+  if (PHASE_FINISH_CLOSING == op->state->phase)
   {
-    struct GNUNET_MQ_Envelope *ev;
-    struct GNUNET_SET_ResultMessage *rm;
-    struct GNUNET_SET_Element *element;
-
-    element = &ke->element->element;
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "sending element (size %u) to client (full set)\n",
-                element->size);
-    GNUNET_assert (0 != op->spec->client_request_id);
-    ev = GNUNET_MQ_msg_extra (rm,
-                              element->size,
-                              GNUNET_MESSAGE_TYPE_SET_RESULT);
-    if (NULL == ev)
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "In PHASE_FINISH_CLOSING, pending %u demands\n",
+         num_demanded);
+    if (0 == num_demanded)
     {
-      GNUNET_MQ_discard (ev);
-      GNUNET_break (0);
-      continue;
+      op->state->phase = PHASE_DONE;
+      send_done_and_destroy (op);
     }
-    rm->result_status = htons (GNUNET_SET_STATUS_OK);
-    rm->request_id = htonl (op->spec->client_request_id);
-    rm->element_type = element->element_type;
-    memcpy (&rm[1], element->data, element->size);
-    if (NULL == ke->next_colliding)
-    {
-      GNUNET_MQ_notify_sent (ev, send_remaining_elements, op);
-      GNUNET_MQ_send (op->spec->set->client_mq, ev);
-      break;
-    }
-    GNUNET_MQ_send (op->spec->set->client_mq, ev);
-    ke = ke->next_colliding;
   }
 }
 
 
 /**
- * Send a result message to the client indicating
- * that the operation is over.
- * After the result done message has been sent to the client,
- * destroy the evaluate operation.
- *
- * @param op union operation
- */
-static void
-finish_and_destroy (struct Operation *op)
-{
-  GNUNET_assert (GNUNET_NO == op->state->client_done_sent);
-  op->keep++;
-  if (GNUNET_SET_RESULT_FULL == op->spec->result_mode)
-  {
-    /* prevent that the op is free'd by the tunnel end handler */
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "sending full result set\n");
-    GNUNET_assert (NULL == op->state->full_result_iter);
-    op->state->full_result_iter =
-        GNUNET_CONTAINER_multihashmap32_iterator_create 
(op->state->key_to_element);
-    send_remaining_elements (op);
-    return;
-  }
-  send_done_and_destroy (op);
-}
-
-
-/**
  * Handle an element message from a remote peer.
  *
  * @param cls the union operation
@@ -1141,51 +1058,92 @@
 {
   struct Operation *op = cls;
   struct ElementEntry *ee;
+  const struct GNUNET_SET_ElementMessage *emsg;
   uint16_t element_size;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Got element from peer\n");
-  if ( (op->state->phase != PHASE_EXPECT_ELEMENTS) &&
-       (op->state->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
+  if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
   {
+    GNUNET_break_op (0);
     fail_union_operation (op);
+    return;
+  }
+
+  if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
+  {
     GNUNET_break_op (0);
+    fail_union_operation (op);
     return;
   }
-  element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
+
+  emsg = (struct GNUNET_SET_ElementMessage *) mh;
+
+  element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
   ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
-  memcpy (&ee[1], &mh[1], element_size);
+  memcpy (&ee[1], &emsg[1], element_size);
   ee->element.size = element_size;
   ee->element.data = &ee[1];
+  ee->element.element_type = ntohs (emsg->element_type);
   ee->remote = GNUNET_YES;
   GNUNET_CRYPTO_hash (ee->element.data,
                       ee->element.size,
                       &ee->element_hash);
 
-  if (GNUNET_YES == op_has_element (op, &ee->element_hash))
+  if (GNUNET_NO == GNUNET_CONTAINER_multihashmap_remove 
(op->state->demanded_hashes, &ee->element_hash, NULL))
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "got existing element from peer\n");
+    /* We got something we didn't demand, since it's not in our map. */
+    GNUNET_break_op (0);
     GNUNET_free (ee);
+    fail_union_operation (op);
     return;
   }
 
-  op_register_element (op, ee);
-  /* only send results immediately if the client wants it */
-  if (GNUNET_SET_RESULT_FULL != op->spec->result_mode)
-    send_client_element (op, &ee->element);
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+              "Got element (size %u, hash %s) from peer\n",
+              (unsigned int) element_size,
+              GNUNET_h2s (&ee->element_hash));
+
+  if (GNUNET_YES == op_has_element (op, &ee->element_hash))
+  {
+    /* Got repeated element.  Should not happen since
+     * we track demands. */
+    GNUNET_break (0);
+    GNUNET_free (ee);
+  }
+  else
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Registering new element from remote peer\n");
+    op_register_element (op, ee);
+    /* only send results immediately if the client wants it */
+    switch (op->spec->result_mode)
+    {
+      case GNUNET_SET_RESULT_ADDED:
+        send_client_element (op, &ee->element, GNUNET_SET_STATUS_OK);
+        break;
+      case GNUNET_SET_RESULT_SYMMETRIC:
+        send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_LOCAL);
+        break;
+      default:
+        /* Result mode not supported, should have been caught earlier. */
+        GNUNET_break (0);
+        break;
+    }
+  }
+
+  maybe_finish (op);
 }
 
 
 /**
- * Handle an element request from a remote peer.
+ * Send offers (for GNUNET_Hash-es) in response
+ * to inquiries (for IBF_Key-s).
  *
  * @param cls the union operation
  * @param mh the message
  */
 static void
-handle_p2p_element_requests (void *cls,
-                             const struct GNUNET_MessageHeader *mh)
+handle_p2p_inquiry (void *cls,
+                    const struct GNUNET_MessageHeader *mh)
 {
   struct Operation *op = cls;
   const struct IBF_Key *ibf_key;
@@ -1192,7 +1150,7 @@
   unsigned int num_keys;
 
   /* look up elements and send them */
-  if (op->state->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
+  if (op->state->phase != PHASE_INVENTORY_PASSIVE)
   {
     GNUNET_break_op (0);
     fail_union_operation (op);
@@ -1199,7 +1157,7 @@
     return;
   }
   num_keys = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
-    / sizeof (struct IBF_Key);
+      / sizeof (struct IBF_Key);
   if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
       != num_keys * sizeof (struct IBF_Key))
   {
@@ -1211,13 +1169,150 @@
   ibf_key = (const struct IBF_Key *) &mh[1];
   while (0 != num_keys--)
   {
-    send_elements_for_key (op, *ibf_key);
+    send_offers_for_key (op, *ibf_key);
     ibf_key++;
   }
 }
 
 
+
+static void
+handle_p2p_demand (void *cls,
+                    const struct GNUNET_MessageHeader *mh)
+{
+  struct Operation *op = cls;
+  struct ElementEntry *ee;
+  struct GNUNET_SET_ElementMessage *emsg;
+  const struct GNUNET_HashCode *hash;
+  unsigned int num_hashes;
+  struct GNUNET_MQ_Envelope *ev;
+
+  num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
+    / sizeof (struct GNUNET_HashCode);
+  if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
+      != num_hashes * sizeof (struct GNUNET_HashCode))
+  {
+    GNUNET_break_op (0);
+    fail_union_operation (op);
+    return;
+  }
+
+  for (hash = (const struct GNUNET_HashCode *) &mh[1];
+       num_hashes > 0;
+       hash++, num_hashes--)
+  {
+    ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, 
hash);
+    if (NULL == ee)
+    {
+      /* Demand for non-existing element. */
+      GNUNET_break_op (0);
+      fail_union_operation (op);
+      return;
+    }
+    if (GNUNET_NO == _GSS_is_element_of_operation (ee, op))
+    {
+      /* Probably confused lazily copied sets. */
+      GNUNET_break_op (0);
+      fail_union_operation (op);
+      return;
+    }
+    ev = GNUNET_MQ_msg_extra (emsg, ee->element.size, 
GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
+    memcpy (&emsg[1], ee->element.data, ee->element.size);
+    emsg->reserved = htons (0);
+    emsg->element_type = htons (ee->element.element_type);
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "[OP %x] Sending demanded element (size %u, hash %s) to peer\n",
+         (void *) op,
+         (unsigned int) ee->element.size,
+         GNUNET_h2s (&ee->element_hash));
+    GNUNET_MQ_send (op->mq, ev);
+
+    switch (op->spec->result_mode)
+    {
+      case GNUNET_SET_RESULT_ADDED:
+        /* Nothing to do. */
+        break;
+      case GNUNET_SET_RESULT_SYMMETRIC:
+        send_client_element (op, &ee->element, GNUNET_SET_STATUS_ADD_REMOTE);
+        break;
+      default:
+        /* Result mode not supported, should have been caught earlier. */
+        GNUNET_break (0);
+        break;
+    }
+  }
+}
+
+
 /**
+ * Handle offers (of GNUNET_HashCode-s) and
+ * respond with demands (of GNUNET_HashCode-s).
+ *
+ * @param cls the union operation
+ * @param mh the message
+ */
+static void
+handle_p2p_offer (void *cls,
+                    const struct GNUNET_MessageHeader *mh)
+{
+  struct Operation *op = cls;
+  const struct GNUNET_HashCode *hash;
+  unsigned int num_hashes;
+
+  /* look up elements and send them */
+  if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
+       (op->state->phase != PHASE_INVENTORY_ACTIVE))
+  {
+    GNUNET_break_op (0);
+    fail_union_operation (op);
+    return;
+  }
+  num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
+    / sizeof (struct GNUNET_HashCode);
+  if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
+      != num_hashes * sizeof (struct GNUNET_HashCode))
+  {
+    GNUNET_break_op (0);
+    fail_union_operation (op);
+    return;
+  }
+
+  for (hash = (const struct GNUNET_HashCode *) &mh[1];
+       num_hashes > 0;
+       hash++, num_hashes--)
+  {
+    struct ElementEntry *ee;
+    struct GNUNET_MessageHeader *demands;
+    struct GNUNET_MQ_Envelope *ev;
+    ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, 
hash);
+    if (NULL != ee)
+      if (GNUNET_YES == _GSS_is_element_of_operation (ee, op))
+        continue;
+
+    if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains 
(op->state->demanded_hashes, hash))
+    {
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "Skipped sending duplicate demand\n");
+      continue;
+    }
+
+    GNUNET_assert (GNUNET_OK ==
+                   GNUNET_CONTAINER_multihashmap_put 
(op->state->demanded_hashes,
+                                                      hash,
+                                                      NULL,
+                                                      
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
+
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "[OP %x] Requesting element (hash %s)\n",
+         (void *) op, GNUNET_h2s (hash));
+    ev = GNUNET_MQ_msg_header_extra (demands, sizeof (struct GNUNET_HashCode), 
GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND);
+    *(struct GNUNET_HashCode *) &demands[1] = *hash;
+    GNUNET_MQ_send (op->mq, ev);
+  }
+}
+
+
+/**
  * Handle a done message from a remote peer
  *
  * @param cls the union operation
@@ -1228,25 +1323,40 @@
                  const struct GNUNET_MessageHeader *mh)
 {
   struct Operation *op = cls;
-  struct GNUNET_MQ_Envelope *ev;
 
-  if (op->state->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
+  if (op->state->phase == PHASE_INVENTORY_PASSIVE)
   {
-    /* we got all requests, but still have to send our elements as response */
+    /* We got all requests, but still have to send our elements in response. */
 
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "got DONE, sending final DONE after elements\n");
-    op->state->phase = PHASE_FINISHED;
-    ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE);
-    GNUNET_MQ_send (op->mq, ev);
+    op->state->phase = PHASE_FINISH_WAITING;
+
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+                "got DONE (as passive partner), waiting for our demands to be 
satisfied\n");
+    /* The active peer is done sending offers
+     * and inquiries.  This means that all
+     * our responses to that (demands and offers)
+     * must be in flight (queued or in mesh).
+     *
+     * We should notify the active peer once
+     * all our demands are satisfied, so that the active
+     * peer can quit if we gave him everything.
+     */
+    maybe_finish (op);
     return;
   }
-  if (op->state->phase == PHASE_EXPECT_ELEMENTS)
+  if (op->state->phase == PHASE_INVENTORY_ACTIVE)
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "got final DONE\n");
-    op->state->phase = PHASE_FINISHED;
-    finish_and_destroy (op);
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+                "got DONE (as active partner), waiting to finish\n");
+    /* All demands of the other peer are satisfied,
+     * and we processed all offers, thus we know
+     * exactly what our demands must be.
+     *
+     * We'll close the channel
+     * to the other peer once our demands are met.
+     */
+    op->state->phase = PHASE_FINISH_CLOSING;
+    maybe_finish (op);
     return;
   }
   GNUNET_break_op (0);
@@ -1268,12 +1378,14 @@
   struct GNUNET_MQ_Envelope *ev;
   struct OperationRequestMessage *msg;
 
+  GNUNET_assert (NULL == op->state);
   op->state = GNUNET_new (struct OperationState);
+  op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, 
GNUNET_NO);
   /* copy the current generation's strata estimator for this operation */
   op->state->se = strata_estimator_dup (op->spec->set->state->se);
   /* we started the operation, thus we have to send the operation request */
   op->state->phase = PHASE_EXPECT_SE;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
               "Initiating union operation evaluation\n");
   ev = GNUNET_MQ_msg_nested_mh (msg,
                                 GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
@@ -1291,10 +1403,10 @@
                   ev);
 
   if (NULL != opaque_context)
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
                 "sent op request with context message\n");
   else
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
                 "sent op request without context message\n");
 }
 
@@ -1308,10 +1420,12 @@
 static void
 union_accept (struct Operation *op)
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
               "accepting set union operation\n");
+  GNUNET_assert (NULL == op->state);
   op->state = GNUNET_new (struct OperationState);
   op->state->se = strata_estimator_dup (op->spec->set->state->se);
+  op->state->demanded_hashes = GNUNET_CONTAINER_multihashmap_create (32, 
GNUNET_NO);
   /* kick off the operation */
   send_strata_estimator (op);
 }
@@ -1330,7 +1444,7 @@
 {
   struct SetState *set_state;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
               "union set created\n");
   set_state = GNUNET_new (struct SetState);
   set_state->se = strata_estimator_create (SE_STRATA_COUNT,
@@ -1397,10 +1511,10 @@
 union_handle_p2p_message (struct Operation *op,
                           const struct GNUNET_MessageHeader *mh)
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "received p2p message (t: %u, s: %u)\n",
-              ntohs (mh->type),
-              ntohs (mh->size));
+  //LOG (GNUNET_ERROR_TYPE_DEBUG,
+  //            "received p2p message (t: %u, s: %u)\n",
+  //            ntohs (mh->type),
+  //            ntohs (mh->size));
   switch (ntohs (mh->type))
   {
     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF:
@@ -1410,22 +1524,29 @@
     case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
       handle_p2p_elements (op, mh);
       break;
-    case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS:
-      handle_p2p_element_requests (op, mh);
+    case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY:
+      handle_p2p_inquiry (op, mh);
       break;
     case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE:
       handle_p2p_done (op, mh);
       break;
+    case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER:
+      handle_p2p_offer (op, mh);
+      break;
+    case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND:
+      handle_p2p_demand (op, mh);
+      break;
     default:
-      /* something wrong with cadet's message handlers? */
+      /* Something wrong with cadet's message handlers? */
       GNUNET_assert (0);
   }
   return GNUNET_OK;
 }
 
+
 /**
- * handler for peer-disconnects, notifies the client
- * about the aborted operation in case the op was not concluded
+ * Handler for peer-disconnects, notifies the client
+ * about the aborted operation in case the op was not concluded.
  *
  * @param op the destroyed operation
  */
@@ -1432,7 +1553,7 @@
 static void
 union_peer_disconnect (struct Operation *op)
 {
-  if (PHASE_FINISHED != op->state->phase)
+  if (PHASE_DONE != op->state->phase)
   {
     struct GNUNET_MQ_Envelope *ev;
     struct GNUNET_SET_ResultMessage *msg;
@@ -1444,19 +1565,27 @@
     msg->element_type = htons (0);
     GNUNET_MQ_send (op->spec->set->client_mq,
                     ev);
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                "other peer disconnected prematurely\n");
+    LOG (GNUNET_ERROR_TYPE_WARNING,
+                "other peer disconnected prematurely, phase %u\n",
+                op->state->phase);
     _GSS_operation_destroy (op,
                             GNUNET_YES);
     return;
   }
   // else: the session has already been concluded
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
               "other peer disconnected (finished)\n");
   if (GNUNET_NO == op->state->client_done_sent)
-    finish_and_destroy (op);
+    send_done_and_destroy (op);
 }
 
+
+/**
+ * Copy union-specific set state.
+ *
+ * @param set source set for copying the union state
+ * @return a copy of the union-specific set state
+ */
 static struct SetState *
 union_copy_state (struct Set *set)
 {
@@ -1469,6 +1598,7 @@
   return new_state;
 }
 
+
 /**
  * Get the table with implementing functions for
  * set union.

Modified: gnunet/src/set/gnunet-set-profiler.c
===================================================================
--- gnunet/src/set/gnunet-set-profiler.c        2015-09-26 17:10:24 UTC (rev 
36376)
+++ gnunet/src/set/gnunet-set-profiler.c        2015-09-27 04:32:52 UTC (rev 
36377)
@@ -98,6 +98,7 @@
                  enum GNUNET_SET_Status status)
 {
   struct SetInfo *info = cls;
+  struct GNUNET_HashCode hash;
 
   GNUNET_assert (GNUNET_NO == info->done);
   switch (status)
@@ -114,8 +115,15 @@
       GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "failure\n");
       GNUNET_SCHEDULER_shutdown ();
       return;
-    case GNUNET_SET_STATUS_OK:
+    case GNUNET_SET_STATUS_ADD_LOCAL:
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set %s: local element\n", info->id);
       break;
+    case GNUNET_SET_STATUS_ADD_REMOTE:
+      GNUNET_CRYPTO_hash (element->data, element->size, &hash);
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set %s: remote element %s\n", 
info->id,
+                  GNUNET_h2s (&hash));
+      // XXX: record and check
+      return;
     default:
       GNUNET_assert (0);
   }
@@ -122,7 +130,7 @@
 
   if (element->size != sizeof (struct GNUNET_HashCode))
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "wrong element size: %u\n", 
element->size);
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "wrong element size: %u, expected 
%u\n", element->size, sizeof (struct GNUNET_HashCode));
     GNUNET_assert (0);
   }
 
@@ -180,6 +188,8 @@
 handle_shutdown (void *cls,
                  const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+              "Shutting down set profiler\n");
   if (NULL != set_listener)
   {
     GNUNET_SET_listen_cancel (set_listener);
@@ -209,11 +219,13 @@
 
 
 static void
-run (void *cls, char *const *args, const char *cfgfile,
-     const struct GNUNET_CONFIGURATION_Handle *cfg)
+run (void *cls,
+     const struct GNUNET_CONFIGURATION_Handle *cfg,
+     struct GNUNET_TESTING_Peer *peer)
 {
   unsigned int i;
   struct GNUNET_HashCode hash;
+  struct GNUNET_HashCode hashhash;
 
   config = cfg;
 
@@ -239,6 +251,9 @@
   for (i = 0; i < num_a; i++)
   {
     GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash);
+    GNUNET_CRYPTO_hash (&hash, sizeof (struct GNUNET_HashCode), &hashhash);
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Set a: Created element %s\n",
+                GNUNET_h2s (&hashhash));
     GNUNET_CONTAINER_multihashmap_put (info1.sent, &hash, NULL,
                                        
GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
   }
@@ -246,6 +261,9 @@
   for (i = 0; i < num_b; i++)
   {
     GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash);
+    GNUNET_CRYPTO_hash (&hash, sizeof (struct GNUNET_HashCode), &hashhash);
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Set b: Created element %s\n",
+                GNUNET_h2s (&hashhash));
     GNUNET_CONTAINER_multihashmap_put (info2.sent, &hash, NULL,
                                        
GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
   }
@@ -253,12 +271,14 @@
   for (i = 0; i < num_c; i++)
   {
     GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash);
+    GNUNET_CRYPTO_hash (&hash, sizeof (struct GNUNET_HashCode), &hashhash);
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Set c: Created element %s\n",
+                GNUNET_h2s (&hashhash));
     GNUNET_CONTAINER_multihashmap_put (common_sent, &hash, NULL,
                                        
GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
   }
 
-  /* use last hash for app id */
-  app_id = hash;
+  GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &app_id);
 
   /* FIXME: also implement intersection etc. */
   info1.set = GNUNET_SET_create (config, GNUNET_SET_OPERATION_UNION);
@@ -281,6 +301,17 @@
 }
 
 
+static void
+pre_run (void *cls, char *const *args, const char *cfgfile,
+         const struct GNUNET_CONFIGURATION_Handle *cfg)
+{
+  if (0 != GNUNET_TESTING_peer_run ("set-profiler",
+                                    cfgfile,
+                                    &run, NULL))
+    ret = 2;
+}
+
+
 int
 main (int argc, char **argv)
 {
@@ -295,13 +326,13 @@
         gettext_noop ("number of values"),
         GNUNET_YES, &GNUNET_GETOPT_set_uint, &num_c },
       { 'x', "operation", NULL,
-        gettext_noop ("oeration to execute"),
+        gettext_noop ("operation to execute"),
         GNUNET_YES, &GNUNET_GETOPT_set_string, &op_str },
       GNUNET_GETOPT_OPTION_END
   };
-  GNUNET_PROGRAM_run (argc, argv, "gnunet-set-profiler",
-                      "help",
-                      options, &run, NULL);
+  GNUNET_PROGRAM_run2 (argc, argv, "gnunet-set-profiler",
+                     "help",
+                     options, &pre_run, NULL, GNUNET_YES);
   return ret;
 }
 

Modified: gnunet/src/set/set_api.c
===================================================================
--- gnunet/src/set/set_api.c    2015-09-26 17:10:24 UTC (rev 36376)
+++ gnunet/src/set/set_api.c    2015-09-27 04:32:52 UTC (rev 36377)
@@ -398,40 +398,49 @@
                 "Ignoring result from canceled operation\n");
     return;
   }
-  if (GNUNET_SET_STATUS_OK != result_status)
+
+  switch (result_status)
   {
-    /* status is not #GNUNET_SET_STATUS_OK => there's no attached element,
-     * and this is the last result message we get */
-    GNUNET_MQ_assoc_remove (set->mq,
-                            ntohl (msg->request_id));
-    GNUNET_CONTAINER_DLL_remove (set->ops_head,
-                                 set->ops_tail,
-                                 oh);
-    if ( (GNUNET_YES == set->destroy_requested) &&
-         (NULL == set->ops_head) )
-      GNUNET_SET_destroy (set);
-    if (NULL != oh->result_cb)
-      oh->result_cb (oh->result_cls,
-                     NULL,
-                     result_status);
-    switch (result_status)
-    {
     case GNUNET_SET_STATUS_OK:
     case GNUNET_SET_STATUS_ADD_LOCAL:
     case GNUNET_SET_STATUS_ADD_REMOTE:
-      break;
+      goto do_element;
     case GNUNET_SET_STATUS_FAILURE:
-      oh->result_cb = NULL;
-      break;
+    case GNUNET_SET_STATUS_DONE:
+      goto do_final;
     case GNUNET_SET_STATUS_HALF_DONE:
-      break;
-    case GNUNET_SET_STATUS_DONE:
-      oh->result_cb = NULL;
-      break;
-    }
-    GNUNET_free (oh);
-    return;
+      /* not used anymore */
+      GNUNET_assert (0);
   }
+
+do_final:
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Treating result as final status\n");
+  GNUNET_MQ_assoc_remove (set->mq,
+                          ntohl (msg->request_id));
+  GNUNET_CONTAINER_DLL_remove (set->ops_head,
+                               set->ops_tail,
+                               oh);
+  if (NULL != oh->result_cb)
+  {
+    oh->result_cb (oh->result_cls,
+                   NULL,
+                   result_status);
+  }
+  else
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "No callback for final status\n");
+  }
+  if ( (GNUNET_YES == set->destroy_requested) &&
+       (NULL == set->ops_head) )
+    GNUNET_SET_destroy (set);
+  GNUNET_free (oh);
+  return;
+
+do_element:
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Treating result as element\n");
   e.data = &msg[1];
   e.size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ResultMessage);
   e.element_type = msg->element_type;

Modified: gnunet/src/set/test_set.conf
===================================================================
--- gnunet/src/set/test_set.conf        2015-09-26 17:10:24 UTC (rev 36376)
+++ gnunet/src/set/test_set.conf        2015-09-27 04:32:52 UTC (rev 36377)
@@ -1,11 +1,12 @@
address@hidden@ ../../contrib/no_forcestart.conf
+
 [PATHS]
 GNUNET_TEST_HOME = /tmp/test-gnunet-set/
 
 [set]
 AUTOSTART = YES
address@hidden@ PORT = 2106
 #PREFIX = valgrind
-#PREFIX = valgrind -v --leak-check=full
+PREFIX = valgrind --leak-check=full
 #PREFIX = gdbserver :1234
 OPTIONS = -L INFO
 
@@ -21,38 +22,13 @@
 [peerinfo]
 NO_IO = YES
 
-[nse]
-WORKBITS = 0
+[nat]
+# Use addresses from the local network interfaces (inluding loopback, but also 
others)
+USE_LOCALADDR = YES
 
-[hostlist]
-FORCESTART = NO
-AUTOSTART = NO
+# Disable IPv6 support
+DISABLEV6 = NO
 
-[fs]
-FORCESTART = NO
-AUTOSTART = NO
+# Do we use addresses from localhost address ranges? (::1, 127.0.0.0/8)
+RETURN_LOCAL_ADDRESSES = YES
 
-[vpn]
-FORCESTART = NO
-AUTOSTART = NO
-
-[revocation]
-FORCESTART = NO
-AUTOSTART = NO
-
-[gns]
-FORCESTART = NO
-AUTOSTART = NO
-
-[namestore]
-FORCESTART = NO
-AUTOSTART = NO
-
-[namecache]
-FORCESTART = NO
-AUTOSTART = NO
-
-[topology]
-FORCESTART = NO
-AUTOSTART = NO
-

Deleted: gnunet/src/set/test_set_union_result_full.c
===================================================================
--- gnunet/src/set/test_set_union_result_full.c 2015-09-26 17:10:24 UTC (rev 
36376)
+++ gnunet/src/set/test_set_union_result_full.c 2015-09-27 04:32:52 UTC (rev 
36377)
@@ -1,359 +0,0 @@
-/*
-     This file is part of GNUnet.
-     Copyright (C) 2012 Christian Grothoff (and other contributing authors)
-
-     GNUnet is free software; you can redistribute it and/or modify
-     it under the terms of the GNU General Public License as published
-     by the Free Software Foundation; either version 3, or (at your
-     option) any later version.
-
-     GNUnet is distributed in the hope that it will be useful, but
-     WITHOUT ANY WARRANTY; without even the implied warranty of
-     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-     General Public License for more details.
-
-     You should have received a copy of the GNU General Public License
-     along with GNUnet; see the file COPYING.  If not, write to the
-     Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
-     Boston, MA 02110-1301, USA.
-*/
-
-/**
- * @file set/test_set_union_result_full.c
- * @brief testcase for full result mode of the union set operation
- */
-#include "platform.h"
-#include "gnunet_util_lib.h"
-#include "gnunet_testing_lib.h"
-#include "gnunet_set_service.h"
-
-
-/**
- * Value to return from #main().
- */
-static int ret;
-
-static struct GNUNET_PeerIdentity local_id;
-
-static struct GNUNET_HashCode app_id;
-static struct GNUNET_SET_Handle *set1;
-
-static struct GNUNET_SET_Handle *set2;
-
-static struct GNUNET_SET_ListenHandle *listen_handle;
-
-static const struct GNUNET_CONFIGURATION_Handle *config;
-
-static int iter_count;
-
-/**
- * Are we testing correctness for the empty set union?
- */
-static int empty;
-
-/**
- * Number of elements found in set 1
- */
-static unsigned int count_set1;
-
-/**
- * Number of elements found in set 2
- */
-static unsigned int count_set2;
-
-
-static void
-result_cb_set1 (void *cls,
-                const struct GNUNET_SET_Element *element,
-                enum GNUNET_SET_Status status)
-{
-  switch (status)
-  {
-    case GNUNET_SET_STATUS_OK:
-      count_set1++;
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "set 1: got element\n");
-      break;
-    case GNUNET_SET_STATUS_FAILURE:
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "set 1: failure\n");
-      ret = 1;
-      GNUNET_SCHEDULER_shutdown ();
-      break;
-    case GNUNET_SET_STATUS_DONE:
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "set 1: done\n");
-      GNUNET_SET_destroy (set1);
-      set1 = NULL;
-      if (NULL == set2)
-        GNUNET_SCHEDULER_shutdown ();
-      break;
-    default:
-      GNUNET_assert (0);
-  }
-}
-
-
-static void
-result_cb_set2 (void *cls,
-                const struct GNUNET_SET_Element *element,
-                enum GNUNET_SET_Status status)
-{
-  switch (status)
-  {
-    case GNUNET_SET_STATUS_OK:
-      count_set2++;
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "set 2: got element\n");
-      break;
-    case GNUNET_SET_STATUS_FAILURE:
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "set 2: failure\n");
-      ret = 1;
-      GNUNET_SCHEDULER_shutdown ();
-      break;
-    case GNUNET_SET_STATUS_DONE:
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "set 2: done\n");
-      GNUNET_SET_destroy (set2);
-      set2 = NULL;
-      if (NULL == set1)
-        GNUNET_SCHEDULER_shutdown ();
-      break;
-    default:
-      GNUNET_assert (0);
-  }
-}
-
-
-static void
-listen_cb (void *cls,
-           const struct GNUNET_PeerIdentity *other_peer,
-           const struct GNUNET_MessageHeader *context_msg,
-           struct GNUNET_SET_Request *request)
-{
-  struct GNUNET_SET_OperationHandle *oh;
-
-  GNUNET_assert (NULL != context_msg);
-  GNUNET_assert (ntohs (context_msg->type) == GNUNET_MESSAGE_TYPE_TEST);
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "listen cb called\n");
-  GNUNET_SET_listen_cancel (listen_handle);
-  oh = GNUNET_SET_accept (request,
-                          GNUNET_SET_RESULT_FULL,
-                          &result_cb_set2,
-                          NULL);
-  GNUNET_SET_commit (oh, set2);
-}
-
-
-/**
- * Start the set operation.
- *
- * @param cls closure, unused
- */
-static void
-start (void *cls)
-{
-  struct GNUNET_SET_OperationHandle *oh;
-  struct GNUNET_MessageHeader context_msg;
-
-  context_msg.size = htons (sizeof context_msg);
-  context_msg.type = htons (GNUNET_MESSAGE_TYPE_TEST);
-
-  listen_handle = GNUNET_SET_listen (config,
-                                     GNUNET_SET_OPERATION_UNION,
-                                     &app_id,
-                                     &listen_cb, NULL);
-  oh = GNUNET_SET_prepare (&local_id,
-                           &app_id,
-                           &context_msg,
-                           GNUNET_SET_RESULT_FULL,
-                           &result_cb_set1, NULL);
-  GNUNET_SET_commit (oh, set1);
-}
-
-
-/**
- * Initialize the second set, continue
- *
- * @param cls closure, unused
- */
-static void
-init_set2 (void *cls)
-{
-  struct GNUNET_SET_Element element;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "initializing set 2\n");
-  if (empty)
-  {
-    start (NULL);
-    return;
-  }
-  element.element_type = 0;
-  element.data = "hello";
-  element.size = strlen(element.data);
-  GNUNET_SET_add_element (set2,
-                          &element,
-                          NULL,
-                          NULL);
-  element.data = "quux";
-  element.size = strlen(element.data);
-  GNUNET_SET_add_element (set2,
-                          &element,
-                          NULL,
-                          NULL);
-  element.data = "baz";
-  element.size = strlen(element.data);
-  GNUNET_SET_add_element (set2,
-                          &element,
-                          &start, NULL);
-}
-
-
-/**
- * Initialize the first set, continue.
- */
-static void
-init_set1 (void)
-{
-  struct GNUNET_SET_Element element;
-
-  if (empty)
-  {
-    init_set2 (NULL);
-    return;
-  }
-  element.element_type = 0;
-  element.data = "hello";
-  element.size = strlen(element.data);
-  GNUNET_SET_add_element (set1,
-                          &element,
-                          NULL,
-                          NULL);
-  element.data = "bar";
-  element.size = strlen(element.data);
-  GNUNET_SET_add_element (set1,
-                          &element,
-                          &init_set2,
-                          NULL);
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "initialized set 1\n");
-}
-
-
-static int
-iter_cb (void *cls,
-         const struct GNUNET_SET_Element *element)
-{
-  if (NULL == element)
-  {
-    GNUNET_assert (iter_count == 3);
-    GNUNET_SET_destroy (cls);
-    return GNUNET_YES;
-  }
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "iter: got element\n");
-  iter_count++;
-  return GNUNET_YES;
-}
-
-
-static void
-test_iter ()
-{
-  struct GNUNET_SET_Element element;
-  struct GNUNET_SET_Handle *iter_set;
-
-  iter_count = 0;
-  iter_set = GNUNET_SET_create (config, GNUNET_SET_OPERATION_UNION);
-  element.element_type = 0;
-  element.data = "hello";
-  element.size = strlen(element.data);
-  GNUNET_SET_add_element (iter_set, &element, NULL, NULL);
-  element.data = "bar";
-  element.size = strlen(element.data);
-  GNUNET_SET_add_element (iter_set, &element, NULL, NULL);
-  element.data = "quux";
-  element.size = strlen(element.data);
-  GNUNET_SET_add_element (iter_set, &element, NULL, NULL);
-
-  GNUNET_SET_iterate (iter_set,
-                      &iter_cb,
-                      iter_set);
-}
-
-
-/**
- * Signature of the main function of a task.
- *
- * @param cls closure
- * @param tc context information (why was this task triggered now)
- */
-static void
-timeout_fail (void *cls,
-              const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
-  if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
-    return;
-  GNUNET_SCHEDULER_shutdown ();
-  ret = 1;
-}
-
-
-/**
- * Signature of the 'main' function for a (single-peer) testcase that
- * is run using 'GNUNET_TESTING_peer_run'.
- *
- * @param cls closure
- * @param cfg configuration of the peer that was started
- * @param peer identity of the peer that was created
- */
-static void
-run (void *cls,
-     const struct GNUNET_CONFIGURATION_Handle *cfg,
-     struct GNUNET_TESTING_Peer *peer)
-{
-  GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
(GNUNET_TIME_UNIT_SECONDS, 5),
-                                &timeout_fail,
-                                NULL);
-
-  config = cfg;
-  GNUNET_TESTING_peer_get_identity (peer,
-                                    &local_id);
-
-  test_iter ();
-
-  set1 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
-  set2 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
-  GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &app_id);
-
-  /* test the real set reconciliation */
-  init_set1 ();
-}
-
-
-int
-main (int argc, char **argv)
-{
-  empty = 1;
-  if (0 != GNUNET_TESTING_peer_run ("test_set_api",
-                                    "test_set.conf",
-                                    &run, NULL))
-  {
-    return 1;
-  }
-  GNUNET_assert (0 == count_set1);
-  GNUNET_assert (0 == count_set2);
-  empty = 0;
-  if (0 != GNUNET_TESTING_peer_run ("test_set_api",
-                                    "test_set.conf",
-                                    &run, NULL))
-  {
-    return 1;
-  }
-  GNUNET_assert (4 == count_set1);
-  GNUNET_assert (4 == count_set2);
-  return ret;
-}

Copied: gnunet/src/set/test_set_union_result_symmetric.c (from rev 36376, 
gnunet/src/set/test_set_union_result_full.c)
===================================================================
--- gnunet/src/set/test_set_union_result_symmetric.c                            
(rev 0)
+++ gnunet/src/set/test_set_union_result_symmetric.c    2015-09-27 04:32:52 UTC 
(rev 36377)
@@ -0,0 +1,363 @@
+/*
+     This file is part of GNUnet.
+     Copyright (C) 2012 Christian Grothoff (and other contributing authors)
+
+     GNUnet is free software; you can redistribute it and/or modify
+     it under the terms of the GNU General Public License as published
+     by the Free Software Foundation; either version 3, or (at your
+     option) any later version.
+
+     GNUnet is distributed in the hope that it will be useful, but
+     WITHOUT ANY WARRANTY; without even the implied warranty of
+     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+     General Public License for more details.
+
+     You should have received a copy of the GNU General Public License
+     along with GNUnet; see the file COPYING.  If not, write to the
+     Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+     Boston, MA 02110-1301, USA.
+*/
+
+/**
+ * @file set/test_set_union_result_smmetric
+ * @brief testcase for symmetric result mode of the union set operation
+ */
+#include "platform.h"
+#include "gnunet_util_lib.h"
+#include "gnunet_testing_lib.h"
+#include "gnunet_set_service.h"
+
+
+/**
+ * Value to return from #main().
+ */
+static int ret;
+
+static struct GNUNET_PeerIdentity local_id;
+
+static struct GNUNET_HashCode app_id;
+static struct GNUNET_SET_Handle *set1;
+
+static struct GNUNET_SET_Handle *set2;
+
+static struct GNUNET_SET_ListenHandle *listen_handle;
+
+static const struct GNUNET_CONFIGURATION_Handle *config;
+
+static int iter_count;
+
+/**
+ * Are we testing correctness for the empty set union?
+ */
+static int empty;
+
+/**
+ * Number of elements found in set 1
+ */
+static unsigned int count_set1;
+
+/**
+ * Number of elements found in set 2
+ */
+static unsigned int count_set2;
+
+
+static void
+result_cb_set1 (void *cls,
+                const struct GNUNET_SET_Element *element,
+                enum GNUNET_SET_Status status)
+{
+  switch (status)
+  {
+    case GNUNET_SET_STATUS_ADD_LOCAL:
+      count_set1++;
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "set 1: got element\n");
+      break;
+    case GNUNET_SET_STATUS_FAILURE:
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "set 1: failure\n");
+      ret = 1;
+      GNUNET_SCHEDULER_shutdown ();
+      break;
+    case GNUNET_SET_STATUS_DONE:
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "set 1: done\n");
+      GNUNET_SET_destroy (set1);
+      set1 = NULL;
+      if (NULL == set2)
+        GNUNET_SCHEDULER_shutdown ();
+      break;
+    case GNUNET_SET_STATUS_ADD_REMOTE:
+      break;
+    default:
+      GNUNET_assert (0);
+  }
+}
+
+
+static void
+result_cb_set2 (void *cls,
+                const struct GNUNET_SET_Element *element,
+                enum GNUNET_SET_Status status)
+{
+  switch (status)
+  {
+    case GNUNET_SET_STATUS_ADD_LOCAL:
+      count_set2++;
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "set 2: got element\n");
+      break;
+    case GNUNET_SET_STATUS_FAILURE:
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "set 2: failure\n");
+      ret = 1;
+      GNUNET_SCHEDULER_shutdown ();
+      break;
+    case GNUNET_SET_STATUS_DONE:
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "set 2: done\n");
+      GNUNET_SET_destroy (set2);
+      set2 = NULL;
+      if (NULL == set1)
+        GNUNET_SCHEDULER_shutdown ();
+      break;
+    case GNUNET_SET_STATUS_ADD_REMOTE:
+      break;
+    default:
+      GNUNET_assert (0);
+  }
+}
+
+
+static void
+listen_cb (void *cls,
+           const struct GNUNET_PeerIdentity *other_peer,
+           const struct GNUNET_MessageHeader *context_msg,
+           struct GNUNET_SET_Request *request)
+{
+  struct GNUNET_SET_OperationHandle *oh;
+
+  GNUNET_assert (NULL != context_msg);
+  GNUNET_assert (ntohs (context_msg->type) == GNUNET_MESSAGE_TYPE_TEST);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "listen cb called\n");
+  GNUNET_SET_listen_cancel (listen_handle);
+  oh = GNUNET_SET_accept (request,
+                          GNUNET_SET_RESULT_SYMMETRIC,
+                          &result_cb_set2,
+                          NULL);
+  GNUNET_SET_commit (oh, set2);
+}
+
+
+/**
+ * Start the set operation.
+ *
+ * @param cls closure, unused
+ */
+static void
+start (void *cls)
+{
+  struct GNUNET_SET_OperationHandle *oh;
+  struct GNUNET_MessageHeader context_msg;
+
+  context_msg.size = htons (sizeof context_msg);
+  context_msg.type = htons (GNUNET_MESSAGE_TYPE_TEST);
+
+  listen_handle = GNUNET_SET_listen (config,
+                                     GNUNET_SET_OPERATION_UNION,
+                                     &app_id,
+                                     &listen_cb, NULL);
+  oh = GNUNET_SET_prepare (&local_id,
+                           &app_id,
+                           &context_msg,
+                           GNUNET_SET_RESULT_SYMMETRIC,
+                           &result_cb_set1, NULL);
+  GNUNET_SET_commit (oh, set1);
+}
+
+
+/**
+ * Initialize the second set, continue
+ *
+ * @param cls closure, unused
+ */
+static void
+init_set2 (void *cls)
+{
+  struct GNUNET_SET_Element element;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "initializing set 2\n");
+  if (empty)
+  {
+    start (NULL);
+    return;
+  }
+  element.element_type = 0;
+  element.data = "hello";
+  element.size = strlen(element.data);
+  GNUNET_SET_add_element (set2,
+                          &element,
+                          NULL,
+                          NULL);
+  element.data = "quux";
+  element.size = strlen(element.data);
+  GNUNET_SET_add_element (set2,
+                          &element,
+                          NULL,
+                          NULL);
+  element.data = "baz";
+  element.size = strlen(element.data);
+  GNUNET_SET_add_element (set2,
+                          &element,
+                          &start, NULL);
+}
+
+
+/**
+ * Initialize the first set, continue.
+ */
+static void
+init_set1 (void)
+{
+  struct GNUNET_SET_Element element;
+
+  if (empty)
+  {
+    init_set2 (NULL);
+    return;
+  }
+  element.element_type = 0;
+  element.data = "hello";
+  element.size = strlen(element.data);
+  GNUNET_SET_add_element (set1,
+                          &element,
+                          NULL,
+                          NULL);
+  element.data = "bar";
+  element.size = strlen(element.data);
+  GNUNET_SET_add_element (set1,
+                          &element,
+                          &init_set2,
+                          NULL);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "initialized set 1\n");
+}
+
+
+static int
+iter_cb (void *cls,
+         const struct GNUNET_SET_Element *element)
+{
+  if (NULL == element)
+  {
+    GNUNET_assert (iter_count == 3);
+    GNUNET_SET_destroy (cls);
+    return GNUNET_YES;
+  }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "iter: got element\n");
+  iter_count++;
+  return GNUNET_YES;
+}
+
+
+static void
+test_iter ()
+{
+  struct GNUNET_SET_Element element;
+  struct GNUNET_SET_Handle *iter_set;
+
+  iter_count = 0;
+  iter_set = GNUNET_SET_create (config, GNUNET_SET_OPERATION_UNION);
+  element.element_type = 0;
+  element.data = "hello";
+  element.size = strlen(element.data);
+  GNUNET_SET_add_element (iter_set, &element, NULL, NULL);
+  element.data = "bar";
+  element.size = strlen(element.data);
+  GNUNET_SET_add_element (iter_set, &element, NULL, NULL);
+  element.data = "quux";
+  element.size = strlen(element.data);
+  GNUNET_SET_add_element (iter_set, &element, NULL, NULL);
+
+  GNUNET_SET_iterate (iter_set,
+                      &iter_cb,
+                      iter_set);
+}
+
+
+/**
+ * Signature of the main function of a task.
+ *
+ * @param cls closure
+ * @param tc context information (why was this task triggered now)
+ */
+static void
+timeout_fail (void *cls,
+              const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
+    return;
+  GNUNET_SCHEDULER_shutdown ();
+  ret = 1;
+}
+
+
+/**
+ * Signature of the 'main' function for a (single-peer) testcase that
+ * is run using 'GNUNET_TESTING_peer_run'.
+ *
+ * @param cls closure
+ * @param cfg configuration of the peer that was started
+ * @param peer identity of the peer that was created
+ */
+static void
+run (void *cls,
+     const struct GNUNET_CONFIGURATION_Handle *cfg,
+     struct GNUNET_TESTING_Peer *peer)
+{
+  GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply 
(GNUNET_TIME_UNIT_SECONDS, 5),
+                                &timeout_fail,
+                                NULL);
+
+  config = cfg;
+  GNUNET_TESTING_peer_get_identity (peer,
+                                    &local_id);
+
+  test_iter ();
+
+  set1 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
+  set2 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
+  GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &app_id);
+
+  /* test the real set reconciliation */
+  init_set1 ();
+}
+
+
+int
+main (int argc, char **argv)
+{
+  empty = 1;
+  if (0 != GNUNET_TESTING_peer_run ("test_set_api",
+                                    "test_set.conf",
+                                    &run, NULL))
+  {
+    return 1;
+  }
+  GNUNET_assert (0 == count_set1);
+  GNUNET_assert (0 == count_set2);
+  empty = 0;
+  if (0 != GNUNET_TESTING_peer_run ("test_set_api",
+                                    "test_set.conf",
+                                    &run, NULL))
+  {
+    return 1;
+  }
+  GNUNET_assert (2 == count_set1);
+  GNUNET_assert (1 == count_set2);
+  return ret;
+}




reply via email to

[Prev in Thread] Current Thread [Next in Thread]