gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] [gnunet] branch master updated (3b76938ba -> abdec5e11)


From: gnunet
Subject: [GNUnet-SVN] [gnunet] branch master updated (3b76938ba -> abdec5e11)
Date: Sat, 11 Mar 2017 18:15:40 +0100

This is an automated email from the git hooks/post-receive script.

grothoff pushed a change to branch master
in repository gnunet.

    from 3b76938ba rename cadet*new to just cadet, except for libgnunetcadetnew-logic (where the 'old' one is not yet entirely dead)
     new 4e981fb2b fix memory leak
     new abdec5e11 cleaning up set handlers, eliminating 2nd level demultiplexing and improving use of types

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/arm/gnunet-service-arm.c              |   1 +
 src/set/Makefile.am                       |   2 +-
 src/set/gnunet-service-set.c              | 284 ++++++-------
 src/set/gnunet-service-set.h              |  50 ++-
 src/set/gnunet-service-set_intersection.c | 139 +++----
 src/set/gnunet-service-set_union.c        | 645 +++++++++++++++++-------------
 src/set/set_api.c                         |  12 +-
 src/set/test_set_union_copy.c             |   1 +
 8 files changed, 599 insertions(+), 535 deletions(-)

diff --git a/src/arm/gnunet-service-arm.c b/src/arm/gnunet-service-arm.c
index 4f3e964e3..cc23ef1f6 100644
--- a/src/arm/gnunet-service-arm.c
+++ b/src/arm/gnunet-service-arm.c
@@ -812,6 +812,7 @@ start_process (struct ServiceList *sl,
                        "%s %s",
                        fin_options,
                        optpos);
+      GNUNET_free (fin_options);
       GNUNET_free (optpos);
     }
     else
diff --git a/src/set/Makefile.am b/src/set/Makefile.am
index cfe95bc1a..03c258352 100644
--- a/src/set/Makefile.am
+++ b/src/set/Makefile.am
@@ -51,7 +51,7 @@ gnunet_set_ibf_profiler_LDADD = \
 
 gnunet_service_set_SOURCES = \
  gnunet-service-set.c gnunet-service-set.h \
- gnunet-service-set_union.c \
+ gnunet-service-set_union.c gnunet-service-set_union.h \
  gnunet-service-set_intersection.c \
  ibf.c ibf.h \
  gnunet-service-set_union_strata_estimator.c gnunet-service-set_union_strata_estimator.h \
diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c
index 454ad9784..8f1506c6a 100644
--- a/src/set/gnunet-service-set.c
+++ b/src/set/gnunet-service-set.c
@@ -24,6 +24,8 @@
  * @author Christian Grothoff
  */
 #include "gnunet-service-set.h"
+#include "gnunet-service-set_union.h"
+#include "gnunet-service-set_intersection.h"
 #include "gnunet-service-set_protocol.h"
 #include "gnunet_statistics_service.h"
 
@@ -476,6 +478,7 @@ _GSS_operation_destroy (struct Operation *op,
     op->channel = NULL;
     GNUNET_CADET_channel_destroy (channel);
   }
+
   if (GNUNET_YES == gc)
     collect_generation_garbage (set);
   /* We rely on the channel end handler to free 'op'. When 'op->channel' was NULL,
@@ -682,7 +685,7 @@ client_disconnect_cb (void *cls,
     {
       struct Operation *curr = op;
       op = op->next;
-      if ( (GNUNET_YES == curr->is_incoming) && 
+      if ( (GNUNET_YES == curr->is_incoming) &&
            (curr->listener == listener) )
         incoming_destroy (curr);
     }
@@ -733,6 +736,38 @@ incoming_suggest (struct Operation *incoming,
 
 
 /**
+ * Check a request for a set operation from another peer.
+ *
+ * @param cls the operation state
+ * @param msg the received message
+ * @return #GNUNET_OK if the channel should be kept alive,
+ *         #GNUNET_SYSERR to destroy the channel
+ */
+static int
+check_incoming_msg (void *cls,
+                    const struct OperationRequestMessage *msg)
+{
+  struct Operation *op = cls;
+  const struct GNUNET_MessageHeader *nested_context;
+
+  /* double operation request */
+  if (NULL != op->spec)
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  nested_context = GNUNET_MQ_extract_nested_mh (msg);
+  if ( (NULL != nested_context) &&
+       (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) )
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  return GNUNET_OK;
+}
+
+
+/**
  * Handle a request for a set operation from another peer.  Checks if we
  * have a listener waiting for such a request (and in that case initiates
  * asking the listener about accepting the connection). If no listener
@@ -744,42 +779,23 @@ incoming_suggest (struct Operation *incoming,
  * our virtual table and subsequent msgs would be routed differently (as
  * we then know what type of operation this is).
  *
- * @param op the operation state
- * @param mh the received message
+ * @param cls the operation state
+ * @param msg the received message
  * @return #GNUNET_OK if the channel should be kept alive,
  *         #GNUNET_SYSERR to destroy the channel
  */
-static int
-handle_incoming_msg (struct Operation *op,
-                     const struct GNUNET_MessageHeader *mh)
+static void
+handle_incoming_msg (void *cls,
+                     const struct OperationRequestMessage *msg)
 {
-  const struct OperationRequestMessage *msg;
+  struct Operation *op = cls;
   struct Listener *listener = op->listener;
   struct OperationSpecification *spec;
   const struct GNUNET_MessageHeader *nested_context;
 
-  msg = (const struct OperationRequestMessage *) mh;
   GNUNET_assert (GNUNET_YES == op->is_incoming);
-  if (GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST != ntohs (mh->type))
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
-  /* double operation request */
-  if (NULL != op->spec)
-  {
-    GNUNET_break_op (0);
-    return GNUNET_SYSERR;
-  }
   spec = GNUNET_new (struct OperationSpecification);
   nested_context = GNUNET_MQ_extract_nested_mh (msg);
-  if ( (NULL != nested_context) &&
-       (ntohs (nested_context->size) > GNUNET_SET_CONTEXT_MESSAGE_MAX_SIZE) )
-  {
-    GNUNET_break_op (0);
-    GNUNET_free (spec);
-    return GNUNET_SYSERR;
-  }
   /* Make a copy of the nested_context (application-specific context
      information that is opaque to set) so we can pass it to the
      listener later on */
@@ -792,7 +808,6 @@ handle_incoming_msg (struct Operation *op,
   spec->peer = op->peer;
   spec->remote_element_count = ntohl (msg->element_count);
   op->spec = spec;
-
   listener = op->listener;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Received P2P operation request (op %u, port %s) for active listener\n",
@@ -800,7 +815,6 @@ handle_incoming_msg (struct Operation *op,
               GNUNET_h2s (&listener->app_id));
   incoming_suggest (op,
                     listener);
-  return GNUNET_OK;
 }
 
 
@@ -1103,9 +1117,11 @@ handle_client_create_set (void *cls,
   {
   case GNUNET_SET_OPERATION_INTERSECTION:
     set->vt = _GSS_intersection_vt ();
+    set->type = OT_INTERSECTION;
     break;
   case GNUNET_SET_OPERATION_UNION:
     set->vt = _GSS_union_vt ();
+    set->type = OT_UNION;
     break;
   default:
     GNUNET_free (set);
@@ -1196,7 +1212,6 @@ channel_new_cb (void *cls,
                 const struct GNUNET_PeerIdentity *source)
 {
   static const struct SetVT incoming_vt = {
-    .msg_handler = &handle_incoming_msg,
     .peer_disconnect = &handle_incoming_disconnect
   };
   struct Listener *listener = cls;
@@ -1290,60 +1305,6 @@ channel_window_cb (void *cls,
   /* FIXME: not implemented, we could do flow control here... */
 }
 
-/**
- * FIXME: hack-job. Migrate to proper handler array use!
- *
- * @param cls local state associated with the channel.
- * @param message The actual message.
- */
-static int
-check_p2p_message (void *cls,
-                   const struct GNUNET_MessageHeader *message)
-{
-  return GNUNET_OK;
-}
-
-
-/**
- * FIXME: hack-job. Migrate to proper handler array use!
- *
- * Functions with this signature are called whenever a message is
- * received via a cadet channel.
- *
- * The msg_handler is a virtual table set in initially either when a peer
- * creates a new channel with us, or once we create a new channel
- * ourselves (evaluate).
- *
- * Once we know the exact type of operation (union/intersection), the vt is
- * replaced with an operation specific instance (_GSS_[op]_vt).
- *
- * @param cls local state associated with the channel.
- * @param message The actual message.
- */
-static void
-handle_p2p_message (void *cls,
-                    const struct GNUNET_MessageHeader *message)
-{
-  struct Operation *op = cls;
-  int ret;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Dispatching cadet message (type: %u)\n",
-              ntohs (message->type));
-  /* do this before the handler, as the handler might kill the channel */
-  GNUNET_CADET_receive_done (op->channel);
-  if (NULL != op->vt)
-    ret = op->vt->msg_handler (op,
-                               message);
-  else
-    ret = GNUNET_SYSERR;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Handled cadet message (type: %u)\n",
-              ntohs (message->type));
-  if (GNUNET_OK != ret)
-    GNUNET_CADET_channel_destroy (op->channel);
-}
-
 
 /**
  * Called when a client wants to create a new listener.
@@ -1357,66 +1318,66 @@ handle_client_listen (void *cls,
 {
   struct GNUNET_SERVICE_Client *client = cls;
   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
-    GNUNET_MQ_hd_var_size (p2p_message,
+    GNUNET_MQ_hd_var_size (incoming_msg,
                            GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
-                           struct GNUNET_MessageHeader,
+                           struct OperationRequestMessage,
                            NULL),
-    GNUNET_MQ_hd_var_size (p2p_message,
+    GNUNET_MQ_hd_var_size (union_p2p_ibf,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
-                           struct GNUNET_MessageHeader,
+                           struct IBFMessage,
                            NULL),
-    GNUNET_MQ_hd_var_size (p2p_message,
+    GNUNET_MQ_hd_var_size (union_p2p_elements,
                            GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
-                           struct GNUNET_MessageHeader,
+                           struct GNUNET_SET_ElementMessage,
                            NULL),
-    GNUNET_MQ_hd_var_size (p2p_message,
+    GNUNET_MQ_hd_var_size (union_p2p_offer,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
                            struct GNUNET_MessageHeader,
                            NULL),
-    GNUNET_MQ_hd_var_size (p2p_message,
+    GNUNET_MQ_hd_var_size (union_p2p_inquiry,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
-                           struct GNUNET_MessageHeader,
+                           struct InquiryMessage,
                            NULL),
-    GNUNET_MQ_hd_var_size (p2p_message,
+    GNUNET_MQ_hd_var_size (union_p2p_demand,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
                            struct GNUNET_MessageHeader,
                            NULL),
-    GNUNET_MQ_hd_var_size (p2p_message,
-                           GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
-                           struct GNUNET_MessageHeader,
-                           NULL),
-    GNUNET_MQ_hd_var_size (p2p_message,
-                           GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
-                           struct GNUNET_MessageHeader,
-                           NULL),
-    GNUNET_MQ_hd_var_size (p2p_message,
-                           GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
-                           struct GNUNET_MessageHeader,
-                           NULL),
-    GNUNET_MQ_hd_var_size (p2p_message,
+    GNUNET_MQ_hd_fixed_size (union_p2p_done,
+                             GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
+                             struct GNUNET_MessageHeader,
+                             NULL),
+    GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
+                             GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
+                             struct GNUNET_MessageHeader,
+                             NULL),
+    GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
+                             GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
+                             struct GNUNET_MessageHeader,
+                             NULL),
+    GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
-                           struct GNUNET_MessageHeader,
+                           struct StrataEstimatorMessage,
                            NULL),
-    GNUNET_MQ_hd_var_size (p2p_message,
+    GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
-                           struct GNUNET_MessageHeader,
+                           struct StrataEstimatorMessage,
                            NULL),
-    GNUNET_MQ_hd_var_size (p2p_message,
+    GNUNET_MQ_hd_var_size (union_p2p_full_element,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
-                           struct GNUNET_MessageHeader,
+                           struct GNUNET_SET_ElementMessage,
                            NULL),
-    GNUNET_MQ_hd_var_size (p2p_message,
-                           GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
-                           struct GNUNET_MessageHeader,
-                           NULL),
-    GNUNET_MQ_hd_var_size (p2p_message,
+    GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
+                             GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
+                             struct IntersectionElementInfoMessage,
+                             NULL),
+    GNUNET_MQ_hd_var_size (intersection_p2p_bf,
                            GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
-                           struct GNUNET_MessageHeader,
-                           NULL),
-    GNUNET_MQ_hd_var_size (p2p_message,
-                           GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
-                           struct GNUNET_MessageHeader,
+                           struct BFMessage,
                            NULL),
+    GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
+                             GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
+                             struct IntersectionDoneMessage,
+                             NULL),
     GNUNET_MQ_handler_end ()
   };
   struct Listener *listener;
@@ -1623,66 +1584,66 @@ handle_client_evaluate (void *cls,
   struct GNUNET_SERVICE_Client *client = cls;
   struct Operation *op = GNUNET_new (struct Operation);
   const struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
-    GNUNET_MQ_hd_var_size (p2p_message,
+    GNUNET_MQ_hd_var_size (incoming_msg,
                            GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
-                           struct GNUNET_MessageHeader,
+                           struct OperationRequestMessage,
                            op),
-    GNUNET_MQ_hd_var_size (p2p_message,
+    GNUNET_MQ_hd_var_size (union_p2p_ibf,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF,
-                           struct GNUNET_MessageHeader,
+                           struct IBFMessage,
                            op),
-    GNUNET_MQ_hd_var_size (p2p_message,
+    GNUNET_MQ_hd_var_size (union_p2p_elements,
                            GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS,
-                           struct GNUNET_MessageHeader,
+                           struct GNUNET_SET_ElementMessage,
                            op),
-    GNUNET_MQ_hd_var_size (p2p_message,
+    GNUNET_MQ_hd_var_size (union_p2p_offer,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER,
                            struct GNUNET_MessageHeader,
                            op),
-    GNUNET_MQ_hd_var_size (p2p_message,
+    GNUNET_MQ_hd_var_size (union_p2p_inquiry,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY,
-                           struct GNUNET_MessageHeader,
+                           struct InquiryMessage,
                            op),
-    GNUNET_MQ_hd_var_size (p2p_message,
+    GNUNET_MQ_hd_var_size (union_p2p_demand,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND,
                            struct GNUNET_MessageHeader,
                            op),
-    GNUNET_MQ_hd_var_size (p2p_message,
-                           GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
-                           struct GNUNET_MessageHeader,
-                           op),
-    GNUNET_MQ_hd_var_size (p2p_message,
+    GNUNET_MQ_hd_fixed_size (union_p2p_done,
+                             GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE,
+                             struct GNUNET_MessageHeader,
+                             op),
+    GNUNET_MQ_hd_fixed_size (union_p2p_full_done,
+                             GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
+                             struct GNUNET_MessageHeader,
+                             op),
+    GNUNET_MQ_hd_fixed_size (union_p2p_request_full,
+                             GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
+                             struct GNUNET_MessageHeader,
+                             op),
+    GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE,
-                           struct GNUNET_MessageHeader,
+                           struct StrataEstimatorMessage,
                            op),
-    GNUNET_MQ_hd_var_size (p2p_message,
+    GNUNET_MQ_hd_var_size (union_p2p_strata_estimator,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC,
-                           struct GNUNET_MessageHeader,
-                           op),
-    GNUNET_MQ_hd_var_size (p2p_message,
-                           GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE,
-                           struct GNUNET_MessageHeader,
-                           op),
-    GNUNET_MQ_hd_var_size (p2p_message,
-                           GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL,
-                           struct GNUNET_MessageHeader,
+                           struct StrataEstimatorMessage,
                            op),
-    GNUNET_MQ_hd_var_size (p2p_message,
+    GNUNET_MQ_hd_var_size (union_p2p_full_element,
                            GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT,
-                           struct GNUNET_MessageHeader,
-                           op),
-    GNUNET_MQ_hd_var_size (p2p_message,
-                           GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
-                           struct GNUNET_MessageHeader,
+                           struct GNUNET_SET_ElementMessage,
                            op),
-    GNUNET_MQ_hd_var_size (p2p_message,
+    GNUNET_MQ_hd_fixed_size (intersection_p2p_element_info,
+                             GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO,
+                             struct IntersectionElementInfoMessage,
+                             op),
+    GNUNET_MQ_hd_var_size (intersection_p2p_bf,
                            GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF,
-                           struct GNUNET_MessageHeader,
-                           op),
-    GNUNET_MQ_hd_var_size (p2p_message,
-                           GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
-                           struct GNUNET_MessageHeader,
+                           struct BFMessage,
                            op),
+    GNUNET_MQ_hd_fixed_size (intersection_p2p_done,
+                             GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE,
+                             struct IntersectionDoneMessage,
+                             op),
     GNUNET_MQ_handler_end ()
   };
   struct Set *set;
@@ -1717,7 +1678,7 @@ handle_client_evaluate (void *cls,
   // mutations won't interfer with the running operation.
   op->generation_created = set->current_generation;
   advance_generation (set);
-
+  op->type = set->type;
   op->vt = set->vt;
   GNUNET_CONTAINER_DLL_insert (set->ops_head,
                                set->ops_tail,
@@ -1886,9 +1847,11 @@ handle_client_copy_lazy_connect (void *cls,
   {
   case GNUNET_SET_OPERATION_INTERSECTION:
     set->vt = _GSS_intersection_vt ();
+    set->type = OT_INTERSECTION;
     break;
   case GNUNET_SET_OPERATION_UNION:
     set->vt = _GSS_union_vt ();
+    set->type = OT_UNION;
     break;
   default:
     GNUNET_assert (0);
@@ -2057,6 +2020,7 @@ handle_client_accept (void *cls,
   advance_generation (set);
 
   op->vt = set->vt;
+  op->type = set->type;
   op->vt->accept (op);
   GNUNET_SERVICE_client_continue (client);
 }
diff --git a/src/set/gnunet-service-set.h b/src/set/gnunet-service-set.h
index 68d8fe81f..c981430ef 100644
--- a/src/set/gnunet-service-set.h
+++ b/src/set/gnunet-service-set.h
@@ -213,20 +213,6 @@ typedef void
 
 
 /**
- * Signature of functions that implement the message handling for
- * the different set operations.
- *
- * @param op operation state
- * @param msg received message
- * @return #GNUNET_OK on success, #GNUNET_SYSERR to
- *         destroy the operation and the tunnel
- */
-typedef int
-(*MsgHandlerImpl) (struct Operation *op,
-                   const struct GNUNET_MessageHeader *msg);
-
-
-/**
  * Signature of functions that implement operation cancellation
  *
  * @param op operation state
@@ -276,11 +262,6 @@ struct SetVT
   DestroySetImpl destroy_set;
 
   /**
-   * Callback for handling operation-specific messages.
-   */
-  MsgHandlerImpl msg_handler;
-
-  /**
    * Callback for handling the remote peer's disconnect.
    */
   PeerDisconnectImpl peer_disconnect;
@@ -364,6 +345,27 @@ struct Listener;
 
 
 /**
+ * Possible set operations.
+ */
+enum OperationType {
+  /**
+   * Operation type unknown.
+   */
+  OT_UNKNOWN = 0,
+
+  /**
+   * We are performing a union.
+   */
+  OT_UNION,
+
+  /**
+   * We are performing an intersection.
+   */
+  OT_INTERSECTION
+};
+
+
+/**
  * Operation context used to execute a set operation.
  */
 struct Operation
@@ -427,6 +429,11 @@ struct Operation
   struct GNUNET_SCHEDULER_Task *timeout_task;
 
   /**
+   * What type of operation is this?
+   */
+  enum OperationType type;
+
+  /**
    * Unique request id for the request from a remote peer, sent to the
    * client, which will accept or reject the request.  Set to '0' iff
    * the request has not been suggested yet.
@@ -582,6 +589,11 @@ struct Set
   struct Operation *ops_tail;
 
   /**
+   * What type of operation is this set for?
+   */
+  enum OperationType type;
+
+  /**
    * Current generation, that is, number of previously executed
    * operations and lazy copies on the underlying set content.
    */
diff --git a/src/set/gnunet-service-set_intersection.c b/src/set/gnunet-service-set_intersection.c
index 9fe1eabe6..b298f7b41 100644
--- a/src/set/gnunet-service-set_intersection.c
+++ b/src/set/gnunet-service-set_intersection.c
@@ -1,6 +1,6 @@
 /*
       This file is part of GNUnet
-      Copyright (C) 2013, 2014 GNUnet e.V.
+      Copyright (C) 2013-2017 GNUnet e.V.
 
       GNUnet is free software; you can redistribute it and/or modify
       it under the terms of the GNU General Public License as published
@@ -28,6 +28,7 @@
 #include "gnunet-service-set.h"
 #include "gnunet_block_lib.h"
 #include "gnunet-service-set_protocol.h"
+#include "gnunet-service-set_intersection.h"
 #include <gcrypt.h>
 
 
@@ -550,6 +551,8 @@ send_remaining_elements (void *cls)
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "Sending done and destroy because iterator ran out\n");
     op->keep--;
+    GNUNET_CONTAINER_multihashmap_iterator_destroy (op->state->full_result_iter);
+    op->state->full_result_iter = NULL;
     send_client_done_and_destroy (op);
     return;
   }
@@ -627,9 +630,6 @@ process_bf (struct Operation *op)
   case PHASE_COUNT_SENT:
     /* This is the first BF being sent, build our initial map with
        filtering in place */
-    op->state->my_elements
-      = GNUNET_CONTAINER_multihashmap_create (op->spec->remote_element_count,
-                                              GNUNET_YES);
     op->state->my_element_count = 0;
     GNUNET_CONTAINER_multihashmap_iterate (op->spec->set->content->elements,
                                            &filtered_map_initialization,
@@ -665,41 +665,53 @@ process_bf (struct Operation *op)
 
 
 /**
+ * Check an BF message from a remote peer.
+ *
+ * @param cls the intersection operation
+ * @param msg the header of the message
+ * @return #GNUNET_OK if @a msg is well-formed
+ */
+int
+check_intersection_p2p_bf (void *cls,
+                           const struct BFMessage *msg)
+{
+  struct Operation *op = cls;
+
+  if (OT_INTERSECTION != op->type)
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  return GNUNET_OK;
+}
+
+
+/**
  * Handle an BF message from a remote peer.
  *
  * @param cls the intersection operation
- * @param mh the header of the message
+ * @param msg the header of the message
  */
-static void
-handle_p2p_bf (void *cls,
-               const struct GNUNET_MessageHeader *mh)
+void
+handle_intersection_p2p_bf (void *cls,
+                            const struct BFMessage *msg)
 {
   struct Operation *op = cls;
-  const struct BFMessage *msg;
   uint32_t bf_size;
   uint32_t chunk_size;
   uint32_t bf_bits_per_element;
-  uint16_t msize;
 
-  msize = htons (mh->size);
-  if (msize < sizeof (struct BFMessage))
-  {
-    GNUNET_break_op (0);
-    fail_intersection_operation (op);
-    return;
-  }
-  msg = (const struct BFMessage *) mh;
   switch (op->state->phase)
   {
   case PHASE_INITIAL:
     GNUNET_break_op (0);
     fail_intersection_operation (op);
-    break;
+    return;
   case PHASE_COUNT_SENT:
   case PHASE_BF_EXCHANGE:
     bf_size = ntohl (msg->bloomfilter_total_length);
     bf_bits_per_element = ntohl (msg->bits_per_element);
-    chunk_size = msize - sizeof (struct BFMessage);
+    chunk_size = htons (msg->header.size) - sizeof (struct BFMessage);
     op->state->other_xor = msg->element_xor_hash;
     if (bf_size == chunk_size)
     {
@@ -717,7 +729,7 @@ handle_p2p_bf (void *cls,
       op->state->salt = ntohl (msg->sender_mutator);
       op->spec->remote_element_count = ntohl (msg->sender_element_count);
       process_bf (op);
-      return;
+      break;
     }
     /* multipart chunk */
     if (NULL == op->state->bf_data)
@@ -764,8 +776,9 @@ handle_p2p_bf (void *cls,
   default:
     GNUNET_break_op (0);
     fail_intersection_operation (op);
-    break;
+    return;
   }
+  GNUNET_CADET_receive_done (op->channel);
 }
 
 
@@ -836,6 +849,7 @@ static void
 begin_bf_exchange (struct Operation *op)
 {
   op->state->phase = PHASE_BF_EXCHANGE;
+  GNUNET_assert (NULL == op->state->my_elements);
   op->state->my_elements
     = GNUNET_CONTAINER_multihashmap_create (op->state->my_element_count,
                                             GNUNET_YES);
@@ -853,20 +867,18 @@ begin_bf_exchange (struct Operation *op)
  * @param cls the intersection operation
  * @param mh the header of the message
  */
-static void
-handle_p2p_element_info (void *cls,
-                         const struct GNUNET_MessageHeader *mh)
+void
+handle_intersection_p2p_element_info (void *cls,
+                                      const struct IntersectionElementInfoMessage *msg)
 {
   struct Operation *op = cls;
-  const struct IntersectionElementInfoMessage *msg;
 
-  if (ntohs (mh->size) != sizeof (struct IntersectionElementInfoMessage))
+  if (OT_INTERSECTION != op->type)
   {
     GNUNET_break_op (0);
     fail_intersection_operation(op);
     return;
   }
-  msg = (const struct IntersectionElementInfoMessage *) mh;
   op->spec->remote_element_count = ntohl (msg->sender_element_count);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Received remote element count (%u), I have %u\n",
@@ -884,6 +896,7 @@ handle_p2p_element_info (void *cls,
   }
   GNUNET_break (NULL == op->state->remote_bf);
   begin_bf_exchange (op);
+  GNUNET_CADET_receive_done (op->channel);
 }
 
 
@@ -955,28 +968,26 @@ filter_all (void *cls,
  * @param cls the intersection operation
  * @param mh the message
  */
-static void
-handle_p2p_done (void *cls,
-                 const struct GNUNET_MessageHeader *mh)
+void
+handle_intersection_p2p_done (void *cls,
+                              const struct IntersectionDoneMessage *idm)
 {
   struct Operation *op = cls;
-  const struct IntersectionDoneMessage *idm;
 
-  if (PHASE_BF_EXCHANGE != op->state->phase)
+  if (OT_INTERSECTION != op->type)
   {
-    /* wrong phase to conclude? FIXME: Or should we allow this
-       if the other peer has _initially_ already an empty set? */
     GNUNET_break_op (0);
-    fail_intersection_operation (op);
+    fail_intersection_operation(op);
     return;
   }
-  if (ntohs (mh->size) != sizeof (struct IntersectionDoneMessage))
+  if (PHASE_BF_EXCHANGE != op->state->phase)
   {
+    /* wrong phase to conclude? FIXME: Or should we allow this
+       if the other peer has _initially_ already an empty set? */
     GNUNET_break_op (0);
     fail_intersection_operation (op);
     return;
   }
-  idm = (const struct IntersectionDoneMessage *) mh;
   if (0 == ntohl (idm->final_element_count))
   {
     /* other peer determined empty set is the intersection,
@@ -1000,6 +1011,7 @@ handle_p2p_done (void *cls,
               op->state->my_element_count);
   op->state->phase = PHASE_FINISHED;
   finish_and_destroy (op);
+  GNUNET_CADET_receive_done (op->channel);
 }
 
 
@@ -1064,11 +1076,11 @@ intersection_accept (struct Operation *op)
   op->state->phase = PHASE_INITIAL;
   op->state->my_element_count
     = op->spec->set->state->current_set_element_count;
+  GNUNET_assert (NULL == op->state->my_elements);
   op->state->my_elements
-    = GNUNET_CONTAINER_multihashmap_create
-    (GNUNET_MIN (op->state->my_element_count,
-                 op->spec->remote_element_count),
-     GNUNET_YES);
+    = GNUNET_CONTAINER_multihashmap_create (GNUNET_MIN (op->state->my_element_count,
+                                                        op->spec->remote_element_count),
+                                            GNUNET_YES);
   if (op->spec->remote_element_count < op->state->my_element_count)
   {
     /* If the other peer (Alice) has fewer elements than us (Bob),
@@ -1083,43 +1095,6 @@ intersection_accept (struct Operation *op)
 
 
 /**
- * Dispatch messages for a intersection operation.
- *
- * @param op the state of the intersection evaluate operation
- * @param mh the received message
- * @return #GNUNET_SYSERR if the tunnel should be disconnected,
- *         #GNUNET_OK otherwise
- */
-static int
-intersection_handle_p2p_message (struct Operation *op,
-                                 const struct GNUNET_MessageHeader *mh)
-{
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Received p2p message (t: %u, s: %u)\n",
-              ntohs (mh->type), ntohs (mh->size));
-  switch (ntohs (mh->type))
-  {
-    /* this message handler is not active until after we received an
-     * operation request message, thus the ops request is not handled here
-     */
-  case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO:
-    handle_p2p_element_info (op, mh);
-    break;
-  case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF:
-    handle_p2p_bf (op, mh);
-    break;
-  case GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_DONE:
-    handle_p2p_done (op, mh);
-    break;
-  default:
-    /* something wrong with cadet's message handlers? */
-    GNUNET_assert (0);
-  }
-  return GNUNET_OK;
-}
-
-
-/**
  * Handler for peer-disconnects, notifies the client about the aborted
  * operation.  If we did not expect anything from the other peer, we
  * gracefully terminate the operation.
@@ -1168,6 +1143,11 @@ intersection_op_cancel (struct Operation *op)
     GNUNET_CONTAINER_multihashmap_destroy (op->state->my_elements);
     op->state->my_elements = NULL;
   }
+  if (NULL != op->state->full_result_iter)
+  {
+    GNUNET_CONTAINER_multihashmap_iterator_destroy (op->state->full_result_iter);
+    op->state->full_result_iter = NULL;
+  }
   GNUNET_free (op->state);
   op->state = NULL;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1245,7 +1225,6 @@ _GSS_intersection_vt ()
 {
   static const struct SetVT intersection_vt = {
     .create = &intersection_set_create,
-    .msg_handler = &intersection_handle_p2p_message,
     .add = &intersection_add,
     .remove = &intersection_remove,
     .destroy_set = &intersection_set_destroy,
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c
index b5b602074..200bd4b8e 100644
--- a/src/set/gnunet-service-set_union.c
+++ b/src/set/gnunet-service-set_union.c
@@ -1,6 +1,6 @@
 /*
       This file is part of GNUnet
-      Copyright (C) 2013-2016 GNUnet e.V.
+      Copyright (C) 2013-2017 GNUnet e.V.
 
       GNUnet is free software; you can redistribute it and/or modify
       it under the terms of the GNU General Public License as published
@@ -19,15 +19,16 @@
 */
 /**
  * @file set/gnunet-service-set_union.c
-
  * @brief two-peer set operations
  * @author Florian Dold
+ * @author Christian Grothoff
  */
 #include "platform.h"
 #include "gnunet_util_lib.h"
 #include "gnunet_statistics_service.h"
 #include "gnunet-service-set.h"
 #include "ibf.h"
+#include "gnunet-service-set_union.h"
 #include "gnunet-service-set_union_strata_estimator.h"
 #include "gnunet-service-set_protocol.h"
 #include <gcrypt.h>
@@ -813,42 +814,56 @@ send_full_set (struct Operation *op)
  * Handle a strata estimator from a remote peer
  *
  * @param cls the union operation
- * @param mh the message
- * @param is_compressed #GNUNET_YES if the estimator is compressed
- * @return #GNUNET_SYSERR if the tunnel should be disconnected,
- *         #GNUNET_OK otherwise
+ * @param msg the message
  */
-static int
-handle_p2p_strata_estimator (void *cls,
-                             const struct GNUNET_MessageHeader *mh,
-                             int is_compressed)
+int
+check_union_p2p_strata_estimator (void *cls,
+                                  const struct StrataEstimatorMessage *msg)
 {
   struct Operation *op = cls;
-  struct StrataEstimator *remote_se;
-  struct StrataEstimatorMessage *msg = (void *) mh;
-  unsigned int diff;
-  uint64_t other_size;
+  int is_compressed;
   size_t len;
 
-  GNUNET_STATISTICS_update (_GSS_statistics,
-                            "# bytes of SE received",
-                            ntohs (mh->size),
-                            GNUNET_NO);
-
   if (op->state->phase != PHASE_EXPECT_SE)
   {
     GNUNET_break (0);
-    fail_union_operation (op);
     return GNUNET_SYSERR;
   }
-  len = ntohs (mh->size) - sizeof (struct StrataEstimatorMessage);
+  is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (msg->header.type));
+  len = ntohs (msg->header.size) - sizeof (struct StrataEstimatorMessage);
   if ( (GNUNET_NO == is_compressed) &&
        (len != SE_STRATA_COUNT * SE_IBF_SIZE * IBF_BUCKET_SIZE) )
   {
-    fail_union_operation (op);
     GNUNET_break (0);
     return GNUNET_SYSERR;
   }
+  return GNUNET_OK;
+}
+
+
+/**
+ * Handle a strata estimator from a remote peer
+ *
+ * @param cls the union operation
+ * @param msg the message
+ */
+void
+handle_union_p2p_strata_estimator (void *cls,
+                                   const struct StrataEstimatorMessage *msg)
+{
+  struct Operation *op = cls;
+  struct StrataEstimator *remote_se;
+  unsigned int diff;
+  uint64_t other_size;
+  size_t len;
+  int is_compressed;
+
+  is_compressed = (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC == htons (msg->header.type));
+  GNUNET_STATISTICS_update (_GSS_statistics,
+                            "# bytes of SE received",
+                            ntohs (msg->header.size),
+                            GNUNET_NO);
+  len = ntohs (msg->header.size) - sizeof (struct StrataEstimatorMessage);
   other_size = GNUNET_ntohll (msg->set_size);
   remote_se = strata_estimator_create (SE_STRATA_COUNT,
                                        SE_IBF_SIZE,
@@ -857,7 +872,7 @@ handle_p2p_strata_estimator (void *cls,
   {
     /* insufficient resources, fail */
     fail_union_operation (op);
-    return GNUNET_SYSERR;
+    return;
   }
   if (GNUNET_OK !=
       strata_estimator_read (&msg[1],
@@ -866,18 +881,16 @@ handle_p2p_strata_estimator (void *cls,
                              remote_se))
   {
     /* decompression failed */
-    fail_union_operation (op);
     strata_estimator_destroy (remote_se);
-    return GNUNET_SYSERR;
+    fail_union_operation (op);
+    return;
   }
   GNUNET_assert (NULL != op->state->se);
   diff = strata_estimator_difference (remote_se,
                                       op->state->se);
 
   if (diff > 200)
-    diff = diff * 3 / 2; 
-
-
+    diff = diff * 3 / 2;
 
   strata_estimator_destroy (remote_se);
   strata_estimator_destroy (op->state->se);
@@ -885,12 +898,14 @@ handle_p2p_strata_estimator (void *cls,
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "got se diff=%d, using ibf size %d\n",
        diff,
-       1<<get_order_from_difference (diff));
+       1U << get_order_from_difference (diff));
 
   {
     char *set_debug;
+
     set_debug = getenv ("GNUNET_SET_BENCHMARK");
-    if ( (NULL != set_debug) && (0 == strcmp (set_debug, "1")) )
+    if ( (NULL != set_debug) &&
+         (0 == strcmp (set_debug, "1")) )
     {
       FILE *f = fopen ("set.log", "a");
       fprintf (f, "%llu\n", (unsigned long long) diff);
@@ -898,15 +913,16 @@ handle_p2p_strata_estimator (void *cls,
     }
   }
 
-  if ((GNUNET_YES == op->spec->byzantine) && (other_size < op->spec->byzantine_lower_bound))
+  if ( (GNUNET_YES == op->spec->byzantine) &&
+       (other_size < op->spec->byzantine_lower_bound) )
   {
     GNUNET_break (0);
     fail_union_operation (op);
-    return GNUNET_SYSERR;
+    return;
   }
 
-
-  if ( (GNUNET_YES == op->spec->force_full) || (diff > op->state->initial_size / 4))
+  if ( (GNUNET_YES == op->spec->force_full) ||
+       (diff > op->state->initial_size / 4))
   {
     LOG (GNUNET_ERROR_TYPE_INFO,
          "Sending full set (diff=%d, own set=%u)\n",
@@ -923,6 +939,7 @@ handle_p2p_strata_estimator (void *cls,
     else
     {
       struct GNUNET_MQ_Envelope *ev;
+
       op->state->phase = PHASE_EXPECT_IBF;
       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL);
       GNUNET_MQ_send (op->mq, ev);
@@ -942,11 +959,10 @@ handle_p2p_strata_estimator (void *cls,
       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                   "Failed to send IBF, closing connection\n");
       fail_union_operation (op);
-      return GNUNET_SYSERR;
+      return;
     }
   }
-
-  return GNUNET_OK;
+  GNUNET_CADET_receive_done (op->channel);
 }
 
 
@@ -1164,99 +1180,116 @@ decode_and_send (struct Operation *op)
 
 
 /**
- * Handle an IBF message from a remote peer.
+ * Check an IBF message from a remote peer.
  *
  * Reassemble the IBF from multiple pieces, and
  * process the whole IBF once possible.
  *
  * @param cls the union operation
- * @param mh the header of the message
- * @return #GNUNET_SYSERR if the tunnel should be disconnected,
- *         #GNUNET_OK otherwise
+ * @param msg the header of the message
+ * @return #GNUNET_OK if @a msg is well-formed
  */
-static int
-handle_p2p_ibf (void *cls,
-                const struct GNUNET_MessageHeader *mh)
+int
+check_union_p2p_ibf (void *cls,
+                     const struct IBFMessage *msg)
 {
   struct Operation *op = cls;
-  const struct IBFMessage *msg;
   unsigned int buckets_in_message;
 
-  if (ntohs (mh->size) < sizeof (struct IBFMessage))
+  if (OT_UNION != op->type)
   {
     GNUNET_break_op (0);
-    fail_union_operation (op);
     return GNUNET_SYSERR;
   }
-  msg = (const struct IBFMessage *) mh;
-  if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) ||
-       (op->state->phase == PHASE_EXPECT_IBF) )
+  buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
+  if (0 == buckets_in_message)
   {
-    op->state->phase = PHASE_EXPECT_IBF_CONT;
-    GNUNET_assert (NULL == op->state->remote_ibf);
-    LOG (GNUNET_ERROR_TYPE_DEBUG,
-         "Creating new ibf of size %u\n",
-         1 << msg->order);
-    op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
-    op->state->salt_receive = ntohl (msg->salt);
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "Receiving new IBF with salt %u\n", op->state->salt_receive);
-    if (NULL == op->state->remote_ibf)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                  "Failed to parse remote IBF, closing connection\n");
-      fail_union_operation (op);
-      return GNUNET_SYSERR;
-    }
-    op->state->ibf_buckets_received = 0;
-    if (0 != ntohl (msg->offset))
-    {
-      GNUNET_break_op (0);
-      fail_union_operation (op);
-      return GNUNET_SYSERR;
-    }
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
   }
-  else if (op->state->phase == PHASE_EXPECT_IBF_CONT)
+  if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
+  if (op->state->phase == PHASE_EXPECT_IBF_CONT)
   {
     if (ntohl (msg->offset) != op->state->ibf_buckets_received)
     {
       GNUNET_break_op (0);
-      fail_union_operation (op);
       return GNUNET_SYSERR;
     }
     if (1<<msg->order != op->state->remote_ibf->size)
     {
       GNUNET_break_op (0);
-      fail_union_operation (op);
       return GNUNET_SYSERR;
     }
     if (ntohl (msg->salt) != op->state->salt_receive)
     {
       GNUNET_break_op (0);
-      fail_union_operation (op);
       return GNUNET_SYSERR;
     }
   }
-  else
+  else if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
+            (op->state->phase != PHASE_EXPECT_IBF) )
   {
-    GNUNET_assert (0);
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
   }
 
-  buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
+  return GNUNET_OK;
+}
 
-  if (0 == buckets_in_message)
+
+/**
+ * Handle an IBF message from a remote peer.
+ *
+ * Reassemble the IBF from multiple pieces, and
+ * process the whole IBF once possible.
+ *
+ * @param cls the union operation
+ * @param msg the header of the message
+ */
+void
+handle_union_p2p_ibf (void *cls,
+                      const struct IBFMessage *msg)
+{
+  struct Operation *op = cls;
+  unsigned int buckets_in_message;
+
+  buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / IBF_BUCKET_SIZE;
+  if ( (op->state->phase == PHASE_INVENTORY_PASSIVE) ||
+       (op->state->phase == PHASE_EXPECT_IBF) )
   {
-    GNUNET_break_op (0);
-    fail_union_operation (op);
-    return GNUNET_SYSERR;
+    op->state->phase = PHASE_EXPECT_IBF_CONT;
+    GNUNET_assert (NULL == op->state->remote_ibf);
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Creating new ibf of size %u\n",
+         1 << msg->order);
+    op->state->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
+    op->state->salt_receive = ntohl (msg->salt);
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Receiving new IBF with salt %u\n",
+         op->state->salt_receive);
+    if (NULL == op->state->remote_ibf)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                  "Failed to parse remote IBF, closing connection\n");
+      fail_union_operation (op);
+      return;
+    }
+    op->state->ibf_buckets_received = 0;
+    if (0 != ntohl (msg->offset))
+    {
+      GNUNET_break_op (0);
+      fail_union_operation (op);
+      return;
+    }
   }
-
-  if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * IBF_BUCKET_SIZE)
+  else
   {
-    GNUNET_break_op (0);
-    fail_union_operation (op);
-    return GNUNET_SYSERR;
+    GNUNET_assert (op->state->phase == PHASE_EXPECT_IBF_CONT);
   }
-
   GNUNET_assert (NULL != op->state->remote_ibf);
 
   ibf_read_slice (&msg[1],
@@ -1276,10 +1309,11 @@ handle_p2p_ibf (void *cls,
       /* Internal error, best we can do is shut down */
       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                   "Failed to decode IBF, closing connection\n");
-      return GNUNET_SYSERR;
+      fail_union_operation (op);
+      return;
     }
   }
-  return GNUNET_OK;
+  GNUNET_CADET_receive_done (op->channel);
 }
 
 
@@ -1343,6 +1377,11 @@ send_done_and_destroy (void *cls)
 }
 
 
+/**
+ * Tests if the operation is finished, and if so notify.
+ *
+ * @param op operation to check
+ */
 static void
 maybe_finish (struct Operation *op)
 {
@@ -1382,46 +1421,59 @@ maybe_finish (struct Operation *op)
 
 
 /**
- * Handle an element message from a remote peer.
- * Sent by the other peer either because we decoded an IBF and placed a demand,
- * or because the other peer switched to full set transmission.
+ * Check an element message from a remote peer.
  *
  * @param cls the union operation
- * @param mh the message
+ * @param emsg the message
  */
-static void
-handle_p2p_elements (void *cls,
-                     const struct GNUNET_MessageHeader *mh)
+int
+check_union_p2p_elements (void *cls,
+                          const struct GNUNET_SET_ElementMessage *emsg)
 {
   struct Operation *op = cls;
-  struct ElementEntry *ee;
-  const struct GNUNET_SET_ElementMessage *emsg;
-  uint16_t element_size;
 
-  if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
+  if (OT_UNION != op->type)
   {
     GNUNET_break_op (0);
-    fail_union_operation (op);
-    return;
+    return GNUNET_SYSERR;
   }
-  if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
+  if (0 == GNUNET_CONTAINER_multihashmap_size (op->state->demanded_hashes))
   {
     GNUNET_break_op (0);
-    fail_union_operation (op);
-    return;
+    return GNUNET_SYSERR;
   }
+  return GNUNET_OK;
+}
+
 
-  emsg = (const struct GNUNET_SET_ElementMessage *) mh;
+/**
+ * Handle an element message from a remote peer.
+ * Sent by the other peer either because we decoded an IBF and placed a demand,
+ * or because the other peer switched to full set transmission.
+ *
+ * @param cls the union operation
+ * @param emsg the message
+ */
+void
+handle_union_p2p_elements (void *cls,
+                           const struct GNUNET_SET_ElementMessage *emsg)
+{
+  struct Operation *op = cls;
+  struct ElementEntry *ee;
+  struct KeyEntry *ke;
+  uint16_t element_size;
 
-  element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
+  element_size = ntohs (emsg->header.size) - sizeof (struct GNUNET_SET_ElementMessage);
   ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
-  GNUNET_memcpy (&ee[1], &emsg[1], element_size);
+  GNUNET_memcpy (&ee[1],
+                 &emsg[1],
+                 element_size);
   ee->element.size = element_size;
   ee->element.data = &ee[1];
   ee->element.element_type = ntohs (emsg->element_type);
   ee->remote = GNUNET_YES;
-  GNUNET_SET_element_hash (&ee->element, &ee->element_hash);
-
+  GNUNET_SET_element_hash (&ee->element,
+                           &ee->element_hash);
   if (GNUNET_NO ==
       GNUNET_CONTAINER_multihashmap_remove (op->state->demanded_hashes,
                                             &ee->element_hash,
@@ -1429,7 +1481,6 @@ handle_p2p_elements (void *cls,
   {
     /* We got something we didn't demand, since it's not in our map. */
     GNUNET_break_op (0);
-    GNUNET_free (ee);
     fail_union_operation (op);
     return;
   }
@@ -1448,10 +1499,9 @@ handle_p2p_elements (void *cls,
                             1,
                             GNUNET_NO);
 
-  op->state->received_total += 1;
-
-  struct KeyEntry *ke = op_get_element (op, &ee->element_hash);
+  op->state->received_total++;
 
+  ke = op_get_element (op, &ee->element_hash);
   if (NULL != ke)
   {
     /* Got repeated element.  Should not happen since
@@ -1467,7 +1517,7 @@ handle_p2p_elements (void *cls,
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "Registering new element from remote peer\n");
-    op->state->received_fresh += 1;
+    op->state->received_fresh++;
     op_register_element (op, ee, GNUNET_YES);
     /* only send results immediately if the client wants it */
     switch (op->spec->result_mode)
@@ -1485,43 +1535,57 @@ handle_p2p_elements (void *cls,
     }
   }
 
-  if (op->state->received_total > 8 && op->state->received_fresh < op->state->received_total / 3)
+  if ( (op->state->received_total > 8) &&
+       (op->state->received_fresh < op->state->received_total / 3) )
   {
     /* The other peer gave us lots of old elements, there's something wrong. */
     GNUNET_break_op (0);
     fail_union_operation (op);
     return;
   }
-
+  GNUNET_CADET_receive_done (op->channel);
   maybe_finish (op);
 }
 
 
 /**
- * Handle an element message from a remote peer.
+ * Check a full element message from a remote peer.
  *
  * @param cls the union operation
- * @param mh the message
+ * @param emsg the message
  */
-static void
-handle_p2p_full_element (void *cls,
-                         const struct GNUNET_MessageHeader *mh)
+int
+check_union_p2p_full_element (void *cls,
+                              const struct GNUNET_SET_ElementMessage *emsg)
 {
   struct Operation *op = cls;
-  struct ElementEntry *ee;
-  const struct GNUNET_SET_ElementMessage *emsg;
-  uint16_t element_size;
 
-  if (ntohs (mh->size) < sizeof (struct GNUNET_SET_ElementMessage))
+  if (OT_UNION != op->type)
   {
     GNUNET_break_op (0);
-    fail_union_operation (op);
-    return;
+    return GNUNET_SYSERR;
   }
+  // FIXME: check that we expect full elements here?
+  return GNUNET_OK;
+}
 
-  emsg = (const struct GNUNET_SET_ElementMessage *) mh;
 
-  element_size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ElementMessage);
+/**
+ * Handle an element message from a remote peer.
+ *
+ * @param cls the union operation
+ * @param emsg the message
+ */
+void
+handle_union_p2p_full_element (void *cls,
+                               const struct GNUNET_SET_ElementMessage *emsg)
+{
+  struct Operation *op = cls;
+  struct ElementEntry *ee;
+  struct KeyEntry *ke;
+  uint16_t element_size;
+
+  element_size = ntohs (emsg->header.size) - sizeof (struct GNUNET_SET_ElementMessage);
   ee = GNUNET_malloc (sizeof (struct ElementEntry) + element_size);
   GNUNET_memcpy (&ee[1], &emsg[1], element_size);
   ee->element.size = element_size;
@@ -1544,10 +1608,9 @@ handle_p2p_full_element (void *cls,
                             1,
                             GNUNET_NO);
 
-  op->state->received_total += 1;
-
-  struct KeyEntry *ke = op_get_element (op, &ee->element_hash);
+  op->state->received_total++;
 
+  ke = op_get_element (op, &ee->element_hash);
   if (NULL != ke)
   {
     /* Got repeated element.  Should not happen since
@@ -1563,7 +1626,7 @@ handle_p2p_full_element (void *cls,
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "Registering new element from remote peer\n");
-    op->state->received_fresh += 1;
+    op->state->received_fresh++;
     op_register_element (op, ee, GNUNET_YES);
     /* only send results immediately if the client wants it */
     switch (op->spec->result_mode)
@@ -1581,8 +1644,8 @@ handle_p2p_full_element (void *cls,
     }
   }
 
-  if ( (GNUNET_YES == op->spec->byzantine) && 
-       (op->state->received_total > 384 + op->state->received_fresh * 4) && 
+  if ( (GNUNET_YES == op->spec->byzantine) &&
+       (op->state->received_total > 384 + op->state->received_fresh * 4) &&
        (op->state->received_fresh < op->state->received_total / 6) )
   {
     /* The other peer gave us lots of old elements, there's something wrong. */
@@ -1594,51 +1657,73 @@ handle_p2p_full_element (void *cls,
     fail_union_operation (op);
     return;
   }
+  GNUNET_CADET_receive_done (op->channel);
 }
 
+
 /**
  * Send offers (for GNUNET_Hash-es) in response
  * to inquiries (for IBF_Key-s).
  *
  * @param cls the union operation
- * @param mh the message
+ * @param msg the message
  */
-static void
-handle_p2p_inquiry (void *cls,
-                    const struct GNUNET_MessageHeader *mh)
+int
+check_union_p2p_inquiry (void *cls,
+                         const struct InquiryMessage *msg)
 {
   struct Operation *op = cls;
-  const struct IBF_Key *ibf_key;
   unsigned int num_keys;
-  struct InquiryMessage *msg;
 
-  /* look up elements and send them */
+  if (OT_UNION != op->type)
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
   if (op->state->phase != PHASE_INVENTORY_PASSIVE)
   {
     GNUNET_break_op (0);
-    fail_union_operation (op);
-    return;
+    return GNUNET_SYSERR;
   }
-  num_keys = (ntohs (mh->size) - sizeof (struct InquiryMessage))
-      / sizeof (struct IBF_Key);
-  if ((ntohs (mh->size) - sizeof (struct InquiryMessage))
+  num_keys = (ntohs (msg->header.size) - sizeof (struct InquiryMessage))
+    / sizeof (struct IBF_Key);
+  if ((ntohs (msg->header.size) - sizeof (struct InquiryMessage))
       != num_keys * sizeof (struct IBF_Key))
   {
     GNUNET_break_op (0);
-    fail_union_operation (op);
-    return;
+    return GNUNET_SYSERR;
   }
+  return GNUNET_OK;
+}
 
-  msg = (struct InquiryMessage *) mh;
 
+/**
+ * Send offers (for GNUNET_Hash-es) in response
+ * to inquiries (for IBF_Key-s).
+ *
+ * @param cls the union operation
+ * @param msg the message
+ */
+void
+handle_union_p2p_inquiry (void *cls,
+                          const struct InquiryMessage *msg)
+{
+  struct Operation *op = cls;
+  const struct IBF_Key *ibf_key;
+  unsigned int num_keys;
+
+  num_keys = (ntohs (msg->header.size) - sizeof (struct InquiryMessage))
+    / sizeof (struct IBF_Key);
   ibf_key = (const struct IBF_Key *) &msg[1];
   while (0 != num_keys--)
   {
     struct IBF_Key unsalted_key;
+
     unsalt_key (ibf_key, ntohl (msg->salt), &unsalted_key);
     send_offers_for_key (op, unsalted_key);
     ibf_key++;
   }
+  GNUNET_CADET_receive_done (op->channel);
 }
 
 
@@ -1677,27 +1762,34 @@ send_missing_elements_iter (void *cls,
 
 
 /**
- * Handle a 
+ * Handle a request for full set transmission.
  *
  * @parem cls closure, a set union operation
  * @param mh the demand message
  */
-static void
-handle_p2p_request_full (void *cls,
-                         const struct GNUNET_MessageHeader *mh)
+void
+handle_union_p2p_request_full (void *cls,
+                               const struct GNUNET_MessageHeader *mh)
 {
   struct Operation *op = cls;
 
-  if (PHASE_EXPECT_IBF != op->state->phase)
+  if (OT_UNION != op->type)
   {
+    GNUNET_break_op (0);
     fail_union_operation (op);
+    return;
+  }
+  if (PHASE_EXPECT_IBF != op->state->phase)
+  {
     GNUNET_break_op (0);
+    fail_union_operation (op);
     return;
   }
 
   // FIXME: we need to check that our set is larger than the
   // byzantine_lower_bound by some threshold
   send_full_set (op);
+  GNUNET_CADET_receive_done (op->channel);
 }
 
 
@@ -1707,56 +1799,97 @@ handle_p2p_request_full (void *cls,
  * @parem cls closure, a set union operation
  * @param mh the demand message
  */
-static void
-handle_p2p_full_done (void *cls,
-                      const struct GNUNET_MessageHeader *mh)
+void
+handle_union_p2p_full_done (void *cls,
+                            const struct GNUNET_MessageHeader *mh)
 {
   struct Operation *op = cls;
 
-  if (PHASE_EXPECT_IBF == op->state->phase)
+  switch (op->state->phase)
   {
-    struct GNUNET_MQ_Envelope *ev;
-
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, sending elements that other peer is missing\n");
+  case PHASE_EXPECT_IBF:
+    {
+      struct GNUNET_MQ_Envelope *ev;
 
-    /* send all the elements that did not come from the remote peer */
-    GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
-                                             &send_missing_elements_iter,
-                                             op);
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "got FULL DONE, sending elements that other peer is missing\n");
 
-    ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
-    GNUNET_MQ_send (op->mq, ev);
-    op->state->phase = PHASE_DONE;
+      /* send all the elements that did not come from the remote peer */
+      GNUNET_CONTAINER_multihashmap32_iterate (op->state->key_to_element,
+                                               &send_missing_elements_iter,
+                                               op);
 
-    /* we now wait until the other peer shuts the tunnel down*/
+      ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE);
+      GNUNET_MQ_send (op->mq, ev);
+      op->state->phase = PHASE_DONE;
+      /* we now wait until the other peer shuts the tunnel down*/
+    }
+    break;
+  case PHASE_FULL_SENDING:
+    {
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "got FULL DONE, finishing\n");
+      /* We sent the full set, and got the response for that.  We're done. */
+      op->state->phase = PHASE_DONE;
+      GNUNET_CADET_receive_done (op->channel);
+      send_done_and_destroy (op);
+      return;
+    }
+    break;
+  default:
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                "Handle full done phase is %u\n",
+                (unsigned) op->state->phase);
+    GNUNET_break_op (0);
+    fail_union_operation (op);
+    return;
   }
-  else if (PHASE_FULL_SENDING == op->state->phase)
+  GNUNET_CADET_receive_done (op->channel);
+}
+
+
+/**
+ * Check a demand by the other peer for elements based on a list
+ * of `struct GNUNET_HashCode`s.
+ *
+ * @parem cls closure, a set union operation
+ * @param mh the demand message
+ * @return #GNUNET_OK if @a mh is well-formed
+ */
+int
+check_union_p2p_demand (void *cls,
+                        const struct GNUNET_MessageHeader *mh)
+{
+  struct Operation *op = cls;
+  unsigned int num_hashes;
+
+  if (OT_UNION != op->type)
   {
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "got FULL DONE, finishing\n");
-    /* We sent the full set, and got the response for that.  We're done. */
-    op->state->phase = PHASE_DONE;
-    send_done_and_destroy (op);
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
   }
-  else
+  num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
+    / sizeof (struct GNUNET_HashCode);
+  if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
+      != num_hashes * sizeof (struct GNUNET_HashCode))
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handle full done phase is %u\n", (unsigned) op->state->phase);
     GNUNET_break_op (0);
-    fail_union_operation (op);
-    return;
+    return GNUNET_SYSERR;
   }
+  return GNUNET_OK;
 }
 
 
 /**
  * Handle a demand by the other peer for elements based on a list
- * of GNUNET_HashCode-s.
+ * of `struct GNUNET_HashCode`s.
  *
  * @parem cls closure, a set union operation
  * @param mh the demand message
  */
-static void
-handle_p2p_demand (void *cls,
-                   const struct GNUNET_MessageHeader *mh)
+void
+handle_union_p2p_demand (void *cls,
+                         const struct GNUNET_MessageHeader *mh)
 {
   struct Operation *op = cls;
   struct ElementEntry *ee;
@@ -1767,19 +1900,12 @@ handle_p2p_demand (void *cls,
 
   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
     / sizeof (struct GNUNET_HashCode);
-  if ((ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
-      != num_hashes * sizeof (struct GNUNET_HashCode))
-  {
-    GNUNET_break_op (0);
-    fail_union_operation (op);
-    return;
-  }
-
   for (hash = (const struct GNUNET_HashCode *) &mh[1];
        num_hashes > 0;
        hash++, num_hashes--)
   {
-    ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements, hash);
+    ee = GNUNET_CONTAINER_multihashmap_get (op->spec->set->content->elements,
+                                            hash);
     if (NULL == ee)
     {
       /* Demand for non-existing element. */
@@ -1823,31 +1949,35 @@ handle_p2p_demand (void *cls,
         break;
     }
   }
+  GNUNET_CADET_receive_done (op->channel);
 }
 
 
 /**
- * Handle offers (of GNUNET_HashCode-s) and
- * respond with demands (of GNUNET_HashCode-s).
+ * Check offer (of `struct GNUNET_HashCode`s).
  *
  * @param cls the union operation
  * @param mh the message
+ * @return #GNUNET_OK if @a mh is well-formed
  */
-static void
-handle_p2p_offer (void *cls,
-                    const struct GNUNET_MessageHeader *mh)
+int
+check_union_p2p_offer (void *cls,
+                        const struct GNUNET_MessageHeader *mh)
 {
   struct Operation *op = cls;
-  const struct GNUNET_HashCode *hash;
   unsigned int num_hashes;
 
+  if (OT_UNION != op->type)
+  {
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  }
   /* look up elements and send them */
   if ( (op->state->phase != PHASE_INVENTORY_PASSIVE) &&
        (op->state->phase != PHASE_INVENTORY_ACTIVE))
   {
     GNUNET_break_op (0);
-    fail_union_operation (op);
-    return;
+    return GNUNET_SYSERR;
   }
   num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
     / sizeof (struct GNUNET_HashCode);
@@ -1855,10 +1985,29 @@ handle_p2p_offer (void *cls,
       != num_hashes * sizeof (struct GNUNET_HashCode))
   {
     GNUNET_break_op (0);
-    fail_union_operation (op);
-    return;
+    return GNUNET_SYSERR;
   }
+  return GNUNET_OK;
+}
 
+
+/**
+ * Handle offers (of `struct GNUNET_HashCode`s) and
+ * respond with demands (of `struct GNUNET_HashCode`s).
+ *
+ * @param cls the union operation
+ * @param mh the message
+ */
+void
+handle_union_p2p_offer (void *cls,
+                        const struct GNUNET_MessageHeader *mh)
+{
+  struct Operation *op = cls;
+  const struct GNUNET_HashCode *hash;
+  unsigned int num_hashes;
+
+  num_hashes = (ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader))
+    / sizeof (struct GNUNET_HashCode);
   for (hash = (const struct GNUNET_HashCode *) &mh[1];
        num_hashes > 0;
        hash++, num_hashes--)
@@ -1897,6 +2046,7 @@ handle_p2p_offer (void *cls,
     *(struct GNUNET_HashCode *) &demands[1] = *hash;
     GNUNET_MQ_send (op->mq, ev);
   }
+  GNUNET_CADET_receive_done (op->channel);
 }
 
 
@@ -1906,16 +2056,22 @@ handle_p2p_offer (void *cls,
  * @param cls the union operation
  * @param mh the message
  */
-static void
-handle_p2p_done (void *cls,
-                 const struct GNUNET_MessageHeader *mh)
+void
+handle_union_p2p_done (void *cls,
+                       const struct GNUNET_MessageHeader *mh)
 {
   struct Operation *op = cls;
 
-  if (op->state->phase == PHASE_INVENTORY_PASSIVE)
+  if (OT_UNION != op->type)
   {
+    GNUNET_break_op (0);
+    fail_union_operation (op);
+    return;
+  }
+  switch (op->state->phase)
+  {
+  case PHASE_INVENTORY_PASSIVE:
     /* We got all requests, but still have to send our elements in response. */
-
     op->state->phase = PHASE_FINISH_WAITING;
 
     LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -1929,11 +2085,10 @@ handle_p2p_done (void *cls,
      * all our demands are satisfied, so that the active
      * peer can quit if we gave him everything.
      */
+    GNUNET_CADET_receive_done (op->channel);
     maybe_finish (op);
     return;
-  }
-  if (op->state->phase == PHASE_INVENTORY_ACTIVE)
-  {
+  case PHASE_INVENTORY_ACTIVE:
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "got DONE (as active partner), waiting to finish\n");
     /* All demands of the other peer are satisfied,
@@ -1944,11 +2099,14 @@ handle_p2p_done (void *cls,
      * to the other peer once our demands are met.
      */
     op->state->phase = PHASE_FINISH_CLOSING;
+    GNUNET_CADET_receive_done (op->channel);
     maybe_finish (op);
     return;
+  default:
+    GNUNET_break_op (0);
+    fail_union_operation (op);
+    return;
   }
-  GNUNET_break_op (0);
-  fail_union_operation (op);
 }
 
 
@@ -2119,62 +2277,6 @@ union_set_destroy (struct SetState *set_state)
 
 
 /**
- * Dispatch messages for a union operation.
- *
- * @param op the state of the union evaluate operation
- * @param mh the received message
- * @return #GNUNET_SYSERR if the tunnel should be disconnected,
- *         #GNUNET_OK otherwise
- */
-int
-union_handle_p2p_message (struct Operation *op,
-                          const struct GNUNET_MessageHeader *mh)
-{
-  //LOG (GNUNET_ERROR_TYPE_DEBUG,
-  //            "received p2p message (t: %u, s: %u)\n",
-  //            ntohs (mh->type),
-  //            ntohs (mh->size));
-  switch (ntohs (mh->type))
-  {
-    case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF:
-      return handle_p2p_ibf (op, mh);
-    case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE:
-      return handle_p2p_strata_estimator (op, mh, GNUNET_NO);
-    case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SEC:
-      return handle_p2p_strata_estimator (op, mh, GNUNET_YES);
-    case GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS:
-      handle_p2p_elements (op, mh);
-      break;
-    case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_ELEMENT:
-      handle_p2p_full_element (op, mh);
-      break;
-    case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_INQUIRY:
-      handle_p2p_inquiry (op, mh);
-      break;
-    case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DONE:
-      handle_p2p_done (op, mh);
-      break;
-    case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_OFFER:
-      handle_p2p_offer (op, mh);
-      break;
-    case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_DEMAND:
-      handle_p2p_demand (op, mh);
-      break;
-    case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_FULL_DONE:
-      handle_p2p_full_done (op, mh);
-      break;
-    case GNUNET_MESSAGE_TYPE_SET_UNION_P2P_REQUEST_FULL:
-      handle_p2p_request_full (op, mh);
-      break;
-    default:
-      /* Something wrong with cadet's message handlers? */
-      GNUNET_assert (0);
-  }
-  return GNUNET_OK;
-}
-
-
-/**
  * Handler for peer-disconnects, notifies the client
  * about the aborted operation in case the op was not concluded.
  *
@@ -2240,7 +2342,6 @@ _GSS_union_vt ()
 {
   static const struct SetVT union_vt = {
     .create = &union_set_create,
-    .msg_handler = &union_handle_p2p_message,
     .add = &union_add,
     .remove = &union_remove,
     .destroy_set = &union_set_destroy,
diff --git a/src/set/set_api.c b/src/set/set_api.c
index 04a4e4910..bc428f9f6 100644
--- a/src/set/set_api.c
+++ b/src/set/set_api.c
@@ -76,6 +76,8 @@ struct GNUNET_SET_Handle
 
   /**
    * Should the set be destroyed once all operations are gone?
+   * #GNUNET_SYSERR if #GNUNET_SET_destroy() must raise this flag,
+   * #GNUNET_YES if #GNUNET_SET_destroy() did raise this flag.
    */
   int destroy_requested;
 
@@ -345,11 +347,13 @@ handle_iter_done (void *cls,
 
   if (NULL == iter)
     return;
+  set->destroy_requested = GNUNET_SYSERR;
   set->iterator = NULL;
   set->iteration_id++;
   iter (set->iterator_cls,
         NULL);
-
+  if (GNUNET_SYSERR == set->destroy_requested)
+    set->destroy_requested = GNUNET_NO;
   if (GNUNET_YES == set->destroy_requested)
     GNUNET_SET_destroy (set);
 }
@@ -736,7 +740,9 @@ GNUNET_SET_destroy (struct GNUNET_SET_Handle *set)
   /* destroying set while iterator is active is currently
      not supported; we should expand the API to allow
      clients to explicitly cancel the iteration! */
-  if ( (NULL != set->ops_head) || (NULL != set->iterator) )
+  if ( (NULL != set->ops_head) ||
+       (NULL != set->iterator) ||
+       (GNUNET_SYSERR == set->destroy_requested) )
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "Set operations are pending, delaying set destruction\n");
@@ -809,7 +815,7 @@ GNUNET_SET_prepare (const struct GNUNET_PeerIdentity *other_peer,
         msg->force_delta = GNUNET_YES;
         break;
       default:
-        LOG (GNUNET_ERROR_TYPE_ERROR, 
+        LOG (GNUNET_ERROR_TYPE_ERROR,
              "Option with type %d not recognized\n", (int) opt->type);
     }
   }
diff --git a/src/set/test_set_union_copy.c b/src/set/test_set_union_copy.c
index c887a8958..a1eba6311 100644
--- a/src/set/test_set_union_copy.c
+++ b/src/set/test_set_union_copy.c
@@ -122,6 +122,7 @@ check_count_iter (void *cls,
       return GNUNET_NO;
     }
     ci_cls->cont (ci_cls->cont_cls);
+    GNUNET_free (ci_cls);
     return GNUNET_NO;
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,

-- 
To stop receiving notification emails like this one, please contact
address@hidden



reply via email to

[Prev in Thread] Current Thread [Next in Thread]