gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r27860 - in gnunet/src: consensus include set util


From: gnunet
Subject: [GNUnet-SVN] r27860 - in gnunet/src: consensus include set util
Date: Wed, 10 Jul 2013 03:31:13 +0200

Author: dold
Date: 2013-07-10 03:31:13 +0200 (Wed, 10 Jul 2013)
New Revision: 27860

Removed:
   gnunet/src/consensus/gnunet-consensus-start-peers.c
Modified:
   gnunet/src/consensus/Makefile.am
   gnunet/src/consensus/consensus_api.c
   gnunet/src/consensus/gnunet-service-consensus.c
   gnunet/src/include/gnunet_mq_lib.h
   gnunet/src/include/gnunet_set_service.h
   gnunet/src/set/gnunet-service-set.c
   gnunet/src/set/gnunet-service-set.h
   gnunet/src/set/gnunet-service-set_union.c
   gnunet/src/set/gnunet-set-profiler.c
   gnunet/src/set/set.h
   gnunet/src/set/set_api.c
   gnunet/src/util/mq.c
Log:
- set service working
- set profiler


Modified: gnunet/src/consensus/Makefile.am
===================================================================
--- gnunet/src/consensus/Makefile.am    2013-07-10 01:26:04 UTC (rev 27859)
+++ gnunet/src/consensus/Makefile.am    2013-07-10 01:31:13 UTC (rev 27860)
@@ -16,8 +16,7 @@
 endif
 
 bin_PROGRAMS = \
- gnunet-consensus \
- gnunet-consensus-start-peers
+ gnunet-consensus
 
 libexec_PROGRAMS = \
  gnunet-service-consensus
@@ -41,17 +40,6 @@
 gnunet_consensus_DEPENDENCIES = \
   libgnunetconsensus.la
 
-gnunet_consensus_start_peers_SOURCES = \
- gnunet-consensus-start-peers.c
-gnunet_consensus_start_peers_LDADD = \
-  $(top_builddir)/src/util/libgnunetutil.la \
-  $(top_builddir)/src/testbed/libgnunettestbed.la \
-  $(top_builddir)/src/consensus/libgnunetconsensus.la \
-  $(GN_LIBINTL)
-gnunet_consensus_start_peers_DEPENDENCIES = \
-  libgnunetconsensus.la
-
-
 gnunet_service_consensus_SOURCES = \
  gnunet-service-consensus.c
 gnunet_service_consensus_LDADD = \

Modified: gnunet/src/consensus/consensus_api.c
===================================================================
--- gnunet/src/consensus/consensus_api.c        2013-07-10 01:26:04 UTC (rev 
27859)
+++ gnunet/src/consensus/consensus_api.c        2013-07-10 01:31:13 UTC (rev 
27860)
@@ -205,7 +205,7 @@
 
   consensus->client = GNUNET_CLIENT_connect ("consensus", cfg);
   consensus->mq = GNUNET_MQ_queue_for_connection_client (consensus->client,
-                                                         mq_handlers, 
consensus);
+                                                         mq_handlers, NULL, 
consensus);
 
   GNUNET_assert (consensus->client != NULL);
 

Deleted: gnunet/src/consensus/gnunet-consensus-start-peers.c
===================================================================
--- gnunet/src/consensus/gnunet-consensus-start-peers.c 2013-07-10 01:26:04 UTC 
(rev 27859)
+++ gnunet/src/consensus/gnunet-consensus-start-peers.c 2013-07-10 01:31:13 UTC 
(rev 27860)
@@ -1,186 +0,0 @@
-
-/*
-      This file is part of GNUnet
-      (C) 2012 Christian Grothoff (and other contributing authors)
-
-      GNUnet is free software; you can redistribute it and/or modify
-      it under the terms of the GNU General Public License as published
-      by the Free Software Foundation; either version 2, or (at your
-      option) any later version.
-
-      GNUnet is distributed in the hope that it will be useful, but
-      WITHOUT ANY WARRANTY; without even the implied warranty of
-      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-      General Public License for more details.
-
-      You should have received a copy of the GNU General Public License
-      along with GNUnet; see the file COPYING.  If not, write to the
-      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
-      Boston, MA 02111-1307, USA.
- */
-
-/**
- * @file consensus/gnunet-consensus-start-peers.c
- * @brief Starts peers with testebed on localhost,
- *        prints their configuration files and waits for ^C.
- * @author Florian Dold
- */
-#include "platform.h"
-#include "gnunet_util_lib.h"
-#include "gnunet_testbed_service.h"
-
-
-static char *config_template_file;
-static unsigned int num_peers_requested = 2;
-static struct GNUNET_TESTBED_Peer **peers;
-
-
-/**
- * Callback to be called when the requested peer information is available
- *
- * @param cb_cls the closure from GNUNET_TETSBED_peer_get_information()
- * @param op the operation this callback corresponds to
- * @param pinfo the result; will be NULL if the operation has failed
- * @param emsg error message if the operation has failed; will be NULL if the
- *          operation is successfull
- */
-static void
-peer_info_cb (void *cb_cls,
-              struct GNUNET_TESTBED_Operation
-              *op,
-              const struct
-              GNUNET_TESTBED_PeerInformation
-              *pinfo,
-              const char *emsg)
-{
-  GNUNET_assert (NULL == emsg);
-  if (pinfo->pit == GNUNET_TESTBED_PIT_IDENTITY)
-  {
-    struct GNUNET_CRYPTO_HashAsciiEncoded enc;
-    GNUNET_CRYPTO_hash_to_enc (&pinfo->result.id->hashPubKey, &enc);
-    printf("peer %td identity:\n", ((struct GNUNET_TESTBED_Peer **) cb_cls) - 
&peers[0]);
-    printf("%s\n", (char *)&enc);
-  }
-  else if (pinfo->pit == GNUNET_TESTBED_PIT_CONFIGURATION)
-  {
-    char *tmpfilename;
-    if (NULL == (tmpfilename = GNUNET_DISK_mktemp ("gnunet-consensus")))
-    {
-      GNUNET_break (0);
-      GNUNET_SCHEDULER_shutdown ();
-      return;
-    }
-    if (GNUNET_SYSERR == 
-        GNUNET_CONFIGURATION_write (pinfo->result.cfg,
-                                    tmpfilename))
-    {
-      GNUNET_break (0);
-      return;
-    }
-    printf("peer %td config file:\n", ((struct GNUNET_TESTBED_Peer **) cb_cls) 
- &peers[0]);
-    printf("%s\n", tmpfilename);
-  }
-  else
-  {
-    GNUNET_assert (0);
-  }
-}
-
-
-
-/**
- * Signature of the event handler function called by the
- * respective event controller.
- *
- * @param cls closure
- * @param event information about the event
- */
-static void
-controller_cb(void *cls,
-              const struct GNUNET_TESTBED_EventInformation *event)
-{
-  GNUNET_assert (0);
-}
-
-
-
-
-/**
- * Signature of a main function for a testcase.
- *
- * @param cls closure
- * @param num_peers number of peers in 'peers'
- * @param started_peers handle to peers being run in the testbed.  NULL upon
- *          timeout (see GNUNET_TESTBED_test_run()).
- * @param links_succeeded the number of overlay link connection attempts that
- *          succeeded
- * @param links_failed the number of overlay link connection attempts that
- *          failed
- */
-static void
-test_master (void *cls,
-             unsigned int num_peers,
-             struct GNUNET_TESTBED_Peer **started_peers,
-             unsigned int links_succeeded,
-             unsigned int links_failed)
-{
-  int i;
-
-  printf("started %d peers\n", num_peers);
-  peers = started_peers;
-
-  for (i = 0; i < num_peers; i++)
-  {
-    GNUNET_TESTBED_peer_get_information (peers[i],
-                                         GNUNET_TESTBED_PIT_IDENTITY,
-                                         peer_info_cb,
-                                         &peers[i]);
-    GNUNET_TESTBED_peer_get_information (peers[i],
-                                         GNUNET_TESTBED_PIT_CONFIGURATION,
-                                         peer_info_cb,
-                                         &peers[i]);
-  }
-}
-
-
-static void
-run (void *cls, char *const *args, const char *cfgfile,
-     const struct GNUNET_CONFIGURATION_Handle *config)
-{
-  if (NULL == config_template_file)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "no template file specified\n");
-    return;
-  }
-
-  (void) GNUNET_TESTBED_test_run ("gnunet-consensus-start-peers",
-                                  config_template_file,
-                                  num_peers_requested,
-                                  0,
-                                  controller_cb,
-                                  NULL,
-                                  test_master,
-                                  NULL);
-}
-
-
-int
-main (int argc, char **argv)
-{
-  static const struct GNUNET_GETOPT_CommandLineOption options[] = {
-      { 't', "config-template", "TEMPLATE",
-        gettext_noop ("start peers with the given template configuration"),
-        GNUNET_YES, &GNUNET_GETOPT_set_string, &config_template_file },
-      { 'n', "num-peers", "NUM",
-        gettext_noop ("number of peers to start"),
-        GNUNET_YES, &GNUNET_GETOPT_set_uint, &num_peers_requested },
-      GNUNET_GETOPT_OPTION_END
-   };
-
-  /* run without scheduler, as test_run already does this */
-  GNUNET_PROGRAM_run2 (argc, argv, "gnunet-consensus-start-peers",
-                     "help",
-                     options, &run, NULL, GNUNET_YES);
-  return 0;
-}
-

Modified: gnunet/src/consensus/gnunet-service-consensus.c
===================================================================
--- gnunet/src/consensus/gnunet-service-consensus.c     2013-07-10 01:26:04 UTC 
(rev 27859)
+++ gnunet/src/consensus/gnunet-service-consensus.c     2013-07-10 01:31:13 UTC 
(rev 27860)
@@ -266,8 +266,6 @@
 }
 
 
-
-
 /**
  * Destroy a session, free all resources associated with it.
  * 

Modified: gnunet/src/include/gnunet_mq_lib.h
===================================================================
--- gnunet/src/include/gnunet_mq_lib.h  2013-07-10 01:26:04 UTC (rev 27859)
+++ gnunet/src/include/gnunet_mq_lib.h  2013-07-10 01:31:13 UTC (rev 27860)
@@ -328,12 +328,14 @@
  *
  * @param connection the client connection
  * @param handlers handlers for receiving messages
+ * @param error_handler error handler
  * @param cls closure for the handlers
  * @return the message queue
  */
 struct GNUNET_MQ_Handle *
 GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection 
*connection,
                                        const struct GNUNET_MQ_MessageHandler 
*handlers,
+                                       GNUNET_MQ_ErrorHandler error_handler,
                                        void *cls);
 
 

Modified: gnunet/src/include/gnunet_set_service.h
===================================================================
--- gnunet/src/include/gnunet_set_service.h     2013-07-10 01:26:04 UTC (rev 
27859)
+++ gnunet/src/include/gnunet_set_service.h     2013-07-10 01:31:13 UTC (rev 
27860)
@@ -74,9 +74,16 @@
 enum GNUNET_SET_OperationType
 {
   /**
+   * A purely local set that does not support any
+   * operation.
+   */
+  GNUNET_SET_OPERATION_NONE,
+
+  /**
    * Set intersection, only return elements that are in both sets.
    */
   GNUNET_SET_OPERATION_INTERSECTION,
+
   /**
    * Set union, return all elements that are in at least one of the sets.
    */
@@ -116,6 +123,7 @@
   GNUNET_SET_STATUS_DONE
 };
 
+
 /**
  * The way results are given to the client.
  */
@@ -137,6 +145,7 @@
   GNUNET_SET_RESULT_REMOVED
 };
 
+
 /**
  * Element stored in a set.
  */
@@ -182,18 +191,19 @@
 
 /**
  * Called when another peer wants to do a set operation with the
- * local peer.
+ * local peer. If a listen error occurs, the 'request' is NULL.
  *
  * @param cls closure
  * @param other_peer the other peer
  * @param context_msg message with application specific information from
  *        the other peer
  * @param request request from the other peer, use GNUNET_SET_accept
+ *        Will be NULL if the listener failed.
  *        to accept it, otherwise the request will be refused
- *        Note that we don't use a return value here, as it is also
- *        necessary to specify the set we want to do the operation with,
- *        whith sometimes can be derived from the context message.
- *        Also necessary to specify the timeout.
+ *        Note that we can't just return value from the listen callback,
+ *        as it is also necessary to specify the set we want to do the
+ *        operation with, whith sometimes can be derived from the context
+ *        message. It's necessary to specify the timeout.
  */
 typedef void
 (*GNUNET_SET_ListenCallback) (void *cls,

Modified: gnunet/src/set/gnunet-service-set.c
===================================================================
--- gnunet/src/set/gnunet-service-set.c 2013-07-10 01:26:04 UTC (rev 27859)
+++ gnunet/src/set/gnunet-service-set.c 2013-07-10 01:31:13 UTC (rev 27860)
@@ -46,11 +46,14 @@
 
   /**
    * Detail information about the operation.
+   * NULL as long as we did not receive the operation
+   * request from the remote peer.
    */
   struct OperationSpecification *spec;
 
   /**
-   * The identity of the requesting peer.
+   * The identity of the requesting peer.  Needs to
+   * be stored here as the op spec might not have been created yet.
    */
   struct GNUNET_PeerIdentity peer;
 
@@ -83,17 +86,55 @@
 
 
 /**
+ * A listener is inhabited by a client, and
+ * waits for evaluation requests from remote peers.
+ */
+struct Listener
+{
+  /**
+   * Listeners are held in a doubly linked list.
+   */
+  struct Listener *next;
+
+  /**
+   * Listeners are held in a doubly linked list.
+   */
+  struct Listener *prev;
+
+  /**
+   * Client that owns the listener.
+   * Only one client may own a listener.
+   */
+  struct GNUNET_SERVER_Client *client;
+
+  /**
+   * Message queue for the client
+   */
+  struct GNUNET_MQ_Handle *client_mq;
+
+  /**
+   * The type of the operation.
+   */
+  enum GNUNET_SET_OperationType operation;
+
+  /**
+   * Application ID for the operation, used to distinguish
+   * multiple operations of the same type with the same peer.
+   */
+  struct GNUNET_HashCode app_id;
+};
+
+
+/**
  * Configuration of our local peer.
- * (Not declared 'static' as also needed in gnunet-service-set_union.c)
  */
-const struct GNUNET_CONFIGURATION_Handle *configuration;
+static const struct GNUNET_CONFIGURATION_Handle *configuration;
 
 /**
  * Handle to the mesh service, used
  * to listen for and connect to remote peers.
- * (Not declared 'static' as also needed in gnunet-service-set_union.c)
  */
-struct GNUNET_MESH_Handle *mesh;
+static struct GNUNET_MESH_Handle *mesh;
 
 /**
  * Sets are held in a doubly linked list.
@@ -204,8 +245,9 @@
    * The client's destroy callback will destroy the listener again. */
   if (NULL != listener->client)
   {
-    GNUNET_SERVER_client_disconnect (listener->client);
+    struct GNUNET_SERVER_Client *client = listener->client;
     listener->client = NULL;
+    GNUNET_SERVER_client_disconnect (client);
     return;
   }
   if (NULL != listener->client_mq)
@@ -230,22 +272,19 @@
    * The client's destroy callback will destroy the set again. */
   if (NULL != set->client)
   {
-    GNUNET_SERVER_client_disconnect (set->client);
+    struct GNUNET_SERVER_Client *client = set->client;
     set->client = NULL;
+    GNUNET_SERVER_client_disconnect (client);
     return;
   }
-  switch (set->operation)
+  if (NULL != set->client_mq)
   {
-    case GNUNET_SET_OPERATION_INTERSECTION:
-      GNUNET_assert (0);
-      break;
-    case GNUNET_SET_OPERATION_UNION:
-      _GSS_union_set_destroy (set);
-      break;
-    default:
-      GNUNET_assert (0);
-      break;
+    GNUNET_MQ_destroy (set->client_mq);
+    set->client_mq = NULL;
   }
+  GNUNET_assert (NULL != set->state);
+  set->vt->destroy_set (set->state);
+  set->state = NULL;
   GNUNET_CONTAINER_DLL_remove (sets_head, sets_tail, set);
   GNUNET_free (set);
 }
@@ -264,6 +303,8 @@
   struct Set *set;
   struct Listener *listener;
 
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client disconnected, cleaning up\n");
+
   set = set_get (client);
   if (NULL != set)
   {
@@ -287,6 +328,7 @@
 static void
 incoming_destroy (struct Incoming *incoming)
 {
+  GNUNET_CONTAINER_DLL_remove (incoming_head, incoming_tail, incoming);
   if (NULL != incoming->tunnel)
   {
     struct GNUNET_MESH_Tunnel *t = incoming->tunnel;
@@ -294,7 +336,6 @@
     GNUNET_MESH_tunnel_destroy (t);
     return;
   }
-  GNUNET_CONTAINER_DLL_remove (incoming_head, incoming_tail, incoming);
   GNUNET_free (incoming);
 }
 
@@ -338,7 +379,7 @@
   mqm = GNUNET_MQ_msg_nested_mh (cmsg, GNUNET_MESSAGE_TYPE_SET_REQUEST,
                                  incoming->spec->context_msg);
   GNUNET_assert (NULL != mqm);
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "suggesting request with accept id 
%u\n", incoming->suggest_id);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "suggesting request with accept id 
%u\n", incoming->suggest_id);
   cmsg->accept_id = htonl (incoming->suggest_id);
   cmsg->peer_id = incoming->spec->peer;
   GNUNET_MQ_send (listener->client_mq, mqm);
@@ -350,33 +391,28 @@
  * Handle a request for a set operation from
  * another peer.
  *
- * @param cls the incoming socket
- * @param tunnel the tunnel that sent the message
- * @param tunnel_ctx the tunnel context
- * @param mh the message
+ * @param op the operation state
+ * @param mh the received message
+ * @return GNUNET_OK if the tunnel should be kept alive,
+ *         GNUNET_SYSERR to destroy the tunnel
  */
 static int
-handle_p2p_operation_request (void *cls,
-                              struct GNUNET_MESH_Tunnel *tunnel,
-                              void **tunnel_ctx,
-                              const struct GNUNET_MessageHeader *mh)
+handle_incoming_msg (struct OperationState *op,
+                     const struct GNUNET_MessageHeader *mh)
 {
-  struct TunnelContext *tc = *tunnel_ctx;
-  struct Incoming *incoming;
+  struct Incoming *incoming = (struct Incoming *) op;
   const struct OperationRequestMessage *msg = (const struct 
OperationRequestMessage *) mh;
   struct Listener *listener;
   struct OperationSpecification *spec;
 
-  if (CONTEXT_INCOMING != tc->type)
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got op request\n");
+
+  if (GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST != ntohs (mh->type))
   {
-    /* unexpected request */
     GNUNET_break_op (0);
-    /* kill the tunnel, cleaner will be called */
     return GNUNET_SYSERR;
   }
 
-  incoming = tc->data.incoming;
-
   if (NULL != incoming->spec)
   {
     /* double operation request */
@@ -385,8 +421,9 @@
   }
 
   spec = GNUNET_new (struct OperationSpecification);
-  spec->context_msg = 
-      GNUNET_copy_message (GNUNET_MQ_extract_nested_mh (msg));
+  spec->context_msg = GNUNET_MQ_extract_nested_mh (msg);
+  if (NULL != spec->context_msg)
+    spec->context_msg = GNUNET_copy_message (spec->context_msg);
   spec->operation = ntohl (msg->operation);
   spec->app_id = msg->app_id;
   spec->salt = ntohl (msg->salt);
@@ -401,12 +438,12 @@
     return GNUNET_SYSERR;
   }
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received P2P operation request (op %u, 
app %s)\n",
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received P2P operation request (op %u, 
app %s)\n",
               ntohs (msg->operation), GNUNET_h2s (&msg->app_id));
   listener = listener_get_by_target (ntohs (msg->operation), &msg->app_id);
   if (NULL == listener)
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "no listener matches incoming request, waiting with 
timeout\n");
     return GNUNET_OK;
   }
@@ -430,7 +467,7 @@
   struct GNUNET_SET_CreateMessage *msg = (struct GNUNET_SET_CreateMessage *) m;
   struct Set *set;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client created new set (operation 
%u)\n",
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client created new set (operation 
%u)\n",
               ntohs (msg->operation));
 
   if (NULL != set_get (client))
@@ -441,14 +478,15 @@
   }
 
   set = NULL;
+  set = GNUNET_new (struct Set);
 
   switch (ntohs (msg->operation))
   {
     case GNUNET_SET_OPERATION_INTERSECTION:
-      //set = _GSS_intersection_set_create ();
+      // FIXME
       break;
     case GNUNET_SET_OPERATION_UNION:
-      set = _GSS_union_set_create ();
+      set->vt = _GSS_union_vt ();
       break;
     default:
       GNUNET_break (0);
@@ -456,8 +494,7 @@
       return;
   }
 
-  GNUNET_assert (NULL != set);
-
+  set->state = set->vt->create ();
   set->client = client;
   set->client_mq = GNUNET_MQ_queue_for_server_client (client);
   GNUNET_CONTAINER_DLL_insert (sets_head, sets_tail, set);
@@ -493,7 +530,7 @@
   listener->app_id = msg->app_id;
   listener->operation = ntohs (msg->operation);
   GNUNET_CONTAINER_DLL_insert_tail (listeners_head, listeners_tail, listener);
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "new listener created (op %u, app %s)\n",
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "new listener created (op %u, app 
%s)\n",
               listener->operation, GNUNET_h2s (&listener->app_id));
   for (incoming = incoming_head; NULL != incoming; incoming = incoming->next)
   {
@@ -511,46 +548,6 @@
 
 
 /**
- * Called when a client wants to remove an element
- * from the set it inhabits.
- *
- * @param cls unused
- * @param client client that sent the message
- * @param m message sent by the client
- */
-static void
-handle_client_remove (void *cls,
-                      struct GNUNET_SERVER_Client *client,
-                      const struct GNUNET_MessageHeader *m)
-{
-  struct Set *set;
-
-  set = set_get (client);
-  if (NULL == set)
-  {
-    GNUNET_break (0);
-    GNUNET_SERVER_client_disconnect (client);
-    return;
-  }
-  switch (set->operation)
-  {
-    case GNUNET_SET_OPERATION_UNION:
-      _GSS_union_remove ((struct GNUNET_SET_ElementMessage *) m, set);
-      break;
-    case GNUNET_SET_OPERATION_INTERSECTION:
-      //_GSS_intersection_remove ((struct GNUNET_SET_ElementMessage *) m, set);
-      break;
-    default:
-      GNUNET_assert (0);
-      break;
-  }
-
-  GNUNET_SERVER_receive_done (client, GNUNET_OK);
-}
-
-
-
-/**
  * Called when the client wants to reject an operation
  * request from another peer.
  *
@@ -574,13 +571,12 @@
     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
     return;
   }
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer request rejected by client\n");
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "peer request rejected by client\n");
   GNUNET_MESH_tunnel_destroy (incoming->tunnel);
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
 
 
-
 /**
  * Called when a client wants to add an element to a
  * set it inhabits.
@@ -590,11 +586,13 @@
  * @param m message sent by the client
  */
 static void
-handle_client_add (void *cls,
-                   struct GNUNET_SERVER_Client *client,
-                   const struct GNUNET_MessageHeader *m)
+handle_client_add_remove (void *cls,
+                          struct GNUNET_SERVER_Client *client,
+                          const struct GNUNET_MessageHeader *m)
 {
   struct Set *set;
+  const struct GNUNET_SET_ElementMessage *msg;
+  struct GNUNET_SET_Element el;
 
   set = set_get (client);
   if (NULL == set)
@@ -603,19 +601,14 @@
     GNUNET_SERVER_client_disconnect (client);
     return;
   }
-  switch (set->operation)
-  {
-    case GNUNET_SET_OPERATION_UNION:
-      _GSS_union_add ((struct GNUNET_SET_ElementMessage *) m, set);
-      break;
-    case GNUNET_SET_OPERATION_INTERSECTION:
-      //_GSS_intersection_add ((struct GNUNET_SET_ElementMessage *) m, set);
-      break;
-    default:
-      GNUNET_assert (0);
-      break;
-  }
-
+  msg = (const struct GNUNET_SET_ElementMessage *) m;
+  el.size = ntohs (m->size) - sizeof *msg;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client ins/rem element of size %u\n", 
el.size);
+  el.data = &msg[1];
+  if (GNUNET_MESSAGE_TYPE_SET_REMOVE == ntohs (m->type))
+    set->vt->remove (set->state, &el);
+  else
+    set->vt->add (set->state, &el);
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
 
@@ -653,27 +646,15 @@
   spec->app_id = msg->app_id;
   spec->salt = ntohl (msg->salt);
   spec->peer = msg->target_peer;
+  spec->set = set;
+  spec->client_request_id = ntohl (msg->request_id);
 
   tunnel = GNUNET_MESH_tunnel_create (mesh, tc, &msg->target_peer,
                                       GNUNET_APPLICATION_TYPE_SET,
                                       GNUNET_YES,
                                       GNUNET_YES);
 
-  switch (set->operation)
-  {
-    case GNUNET_SET_OPERATION_INTERSECTION:
-      tc->type = CONTEXT_OPERATION_INTERSECTION;
-      //_GSS_intersection_evaluate ((struct GNUNET_SET_EvaluateMessage *) m, 
set);
-      break;
-    case GNUNET_SET_OPERATION_UNION:
-      tc->type = CONTEXT_OPERATION_UNION;
-      tc->data.union_op =
-          _GSS_union_evaluate (spec, tunnel);
-      break;
-    default:
-      GNUNET_assert (0);
-      break;
-  }
+  set->vt->evaluate (spec, tunnel, tc);
 
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
@@ -705,6 +686,35 @@
  * @param mh the message
  */
 static void
+handle_client_cancel (void *cls,
+                      struct GNUNET_SERVER_Client *client,
+                      const struct GNUNET_MessageHeader *mh)
+{
+  const struct GNUNET_SET_CancelMessage *msg =
+      (const struct GNUNET_SET_CancelMessage *) mh;
+  struct Set *set;
+
+  set = set_get (client);
+  if (NULL == set)
+  {
+    GNUNET_break (0);
+    GNUNET_SERVER_client_disconnect (client);
+    return;
+  }
+  /* FIXME: maybe cancel should return success/error code? */
+  set->vt->cancel (set->state, ntohl (msg->request_id));
+}
+
+
+/**
+ * Handle a request from the client to accept
+ * a set operation that came from a remote peer.
+ *
+ * @param cls unused
+ * @param client the client
+ * @param mh the message
+ */
+static void
 handle_client_accept (void *cls,
                       struct GNUNET_SERVER_Client *client,
                       const struct GNUNET_MessageHeader *mh)
@@ -712,12 +722,10 @@
   struct Set *set;
   struct Incoming *incoming;
   struct GNUNET_SET_AcceptRejectMessage *msg = (struct 
GNUNET_SET_AcceptRejectMessage *) mh;
-  struct OperationSpecification *spec;
-  struct TunnelContext *tc;
 
   incoming = get_incoming (ntohl (msg->accept_reject_id));
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client accepting %u\n", ntohl 
(msg->accept_reject_id));
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client accepting %u\n", ntohl 
(msg->accept_reject_id));
 
   if (NULL == incoming)
   {
@@ -736,24 +744,9 @@
     return;
   }
 
-  spec = GNUNET_new (struct OperationSpecification);
-  tc = incoming->tc;
-
-  switch (set->operation)
-  {
-    case GNUNET_SET_OPERATION_INTERSECTION:
-      tc->type = CONTEXT_OPERATION_INTERSECTION;
-      // _GSS_intersection_accept (msg, set, incoming);
-      break;
-    case GNUNET_SET_OPERATION_UNION:
-      tc->type = CONTEXT_OPERATION_UNION;
-      tc->data.union_op = _GSS_union_accept (spec, incoming->tunnel);
-      break;
-    default:
-      GNUNET_assert (0);
-      break;
-  }
-
+  incoming->spec->set = set;
+  incoming->spec->client_request_id = ntohl (msg->request_id);
+  set->vt->accept (incoming->spec, incoming->tunnel, incoming->tc);
   /* tunnel ownership goes to operation */
   incoming->tunnel = NULL;
   incoming_destroy (incoming);
@@ -771,12 +764,6 @@
 shutdown_task (void *cls,
                const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
-  if (NULL != mesh)
-  {
-    GNUNET_MESH_disconnect (mesh);
-    mesh = NULL;
-  }
-
   while (NULL != incoming_head)
     incoming_destroy (incoming_head);
 
@@ -786,11 +773,19 @@
   while (NULL != sets_head)
     set_destroy (sets_head);
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handled shutdown request\n");
+
+  /* it's important to destroy mesh at the end, as tunnels
+   * must be destroyed first! */
+  if (NULL != mesh)
+  {
+    GNUNET_MESH_disconnect (mesh);
+    mesh = NULL;
+  }
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "handled shutdown request\n");
 }
 
 
-
 /**
  * Signature of the main function of a task.
  *
@@ -803,11 +798,22 @@
 {
   struct Incoming *incoming = cls;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "remote peer timed out\n");
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "remote peer timed out\n");
   incoming_destroy (incoming);
 }
 
 
+static void
+handle_incoming_disconnect (struct OperationState *op_state)
+{
+  struct Incoming *incoming = (struct Incoming *) op_state;
+  if (NULL == incoming->tunnel)
+    return;
+
+  incoming_destroy (incoming);
+}
+
+
 /**
  * Method called whenever another peer has added us to a tunnel
  * the other peer initiated.
@@ -830,23 +836,25 @@
                uint32_t port)
 {
   struct Incoming *incoming;
-  struct TunnelContext *tc;
+  static const struct SetVT incoming_vt = {
+    .msg_handler = handle_incoming_msg,
+    .peer_disconnect = handle_incoming_disconnect
+  };
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "new incoming tunnel\n");
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "new incoming tunnel\n");
 
   GNUNET_assert (port == GNUNET_APPLICATION_TYPE_SET);
-  tc = GNUNET_new (struct TunnelContext);
   incoming = GNUNET_new (struct Incoming);
   incoming->peer = *initiator;
   incoming->tunnel = tunnel;
-  incoming->tc = tc;
-  tc->data.incoming = incoming;
-  tc->type = CONTEXT_INCOMING;
+  incoming->tc = GNUNET_new (struct TunnelContext);;
+  incoming->tc->vt = &incoming_vt;
+  incoming->tc->op = (struct OperationState *) incoming;
   incoming->timeout_task = 
       GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, 
incoming_timeout_cb, incoming);
   GNUNET_CONTAINER_DLL_insert_tail (incoming_head, incoming_tail, incoming);
 
-  return tc;
+  return incoming->tc;
 }
 
 
@@ -867,27 +875,46 @@
 {
   struct TunnelContext *ctx = tunnel_ctx;
 
-  switch (ctx->type)
-  {
-    case CONTEXT_INCOMING:
-      incoming_destroy (ctx->data.incoming);
-      break;
-    case CONTEXT_OPERATION_UNION:
-      _GSS_union_operation_destroy (ctx->data.union_op);
-      break;
-    case CONTEXT_OPERATION_INTERSECTION:
-      GNUNET_assert (0);
-      /* FIXME: cfuchs */
-      break;
-    default:
-      GNUNET_assert (0);
-  }
-
+  ctx->vt->peer_disconnect (ctx->op);
+  /* mesh will never call us with the context again! */
   GNUNET_free (tunnel_ctx);
 }
 
 
 /**
+ * Functions with this signature are called whenever a message is
+ * received.
+ * 
+ * Each time the function must call GNUNET_MESH_receive_done on the tunnel
+ * in order to receive the next message. This doesn't need to be immediate:
+ * can be delayed if some processing is done on the message.
+ *
+ * @param cls Closure (set from GNUNET_MESH_connect).
+ * @param tunnel Connection to the other end.
+ * @param tunnel_ctx Place to store local state associated with the tunnel.
+ * @param message The actual message.
+ * 
+ * @return GNUNET_OK to keep the tunnel open,
+ *         GNUNET_SYSERR to close it (signal serious error).
+ */
+static int
+dispatch_p2p_message (void *cls,
+                      struct GNUNET_MESH_Tunnel *tunnel,
+                      void **tunnel_ctx,
+                      const struct GNUNET_MessageHeader *message)
+{
+  struct TunnelContext *tc = *tunnel_ctx;
+  int ret;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "dispatching mesh message\n");
+  ret = tc->vt->msg_handler (tc->op, message);
+  GNUNET_MESH_receive_done (tunnel);
+
+  return ret;
+}
+
+
+/**
  * Function called by the service's run
  * method to run service-specific setup code.
  *
@@ -900,31 +927,29 @@
      const struct GNUNET_CONFIGURATION_Handle *cfg)
 {
   static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
-    {handle_client_accept, NULL, GNUNET_MESSAGE_TYPE_SET_ACCEPT, 0},
+    {handle_client_accept, NULL, GNUNET_MESSAGE_TYPE_SET_ACCEPT,
+        sizeof (struct GNUNET_SET_AcceptRejectMessage)},
     {handle_client_ack, NULL, GNUNET_MESSAGE_TYPE_SET_ACK, 0},
-    {handle_client_add, NULL, GNUNET_MESSAGE_TYPE_SET_ADD, 0},
-    {handle_client_create, NULL, GNUNET_MESSAGE_TYPE_SET_CREATE, 0},
+    {handle_client_add_remove, NULL, GNUNET_MESSAGE_TYPE_SET_ADD, 0},
+    {handle_client_create, NULL, GNUNET_MESSAGE_TYPE_SET_CREATE,
+        sizeof (struct GNUNET_SET_CreateMessage)},
     {handle_client_evaluate, NULL, GNUNET_MESSAGE_TYPE_SET_EVALUATE, 0},
-    {handle_client_listen, NULL, GNUNET_MESSAGE_TYPE_SET_LISTEN, 0},
-    {handle_client_reject, NULL, GNUNET_MESSAGE_TYPE_SET_REJECT, 0},
-    {handle_client_remove, NULL, GNUNET_MESSAGE_TYPE_SET_REMOVE, 0},
+    {handle_client_listen, NULL, GNUNET_MESSAGE_TYPE_SET_LISTEN,
+        sizeof (struct GNUNET_SET_ListenMessage)},
+    {handle_client_reject, NULL, GNUNET_MESSAGE_TYPE_SET_REJECT,
+        sizeof (struct GNUNET_SET_AcceptRejectMessage)},
+    {handle_client_add_remove, NULL, GNUNET_MESSAGE_TYPE_SET_REMOVE, 0},
+    {handle_client_cancel, NULL, GNUNET_MESSAGE_TYPE_SET_REMOVE,
+        sizeof (struct GNUNET_SET_CancelMessage)},
     {NULL, NULL, 0, 0}
   };
   static const struct GNUNET_MESH_MessageHandler mesh_handlers[] = {
-    {handle_p2p_operation_request,
-      GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 0},
-    /* messages for the union operation */
-    {_GSS_union_handle_p2p_message,
-      GNUNET_MESSAGE_TYPE_SET_P2P_IBF, 0},
-    {_GSS_union_handle_p2p_message,
-      GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, 0},
-    {_GSS_union_handle_p2p_message,
-      GNUNET_MESSAGE_TYPE_SET_P2P_DONE, 0},
-    {_GSS_union_handle_p2p_message,
-      GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS, 0},
-    {_GSS_union_handle_p2p_message,
-      GNUNET_MESSAGE_TYPE_SET_P2P_SE, 0},
-    /* FIXME: messages for intersection operation */
+    {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 0},
+    {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_IBF, 0},
+    {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, 0},
+    {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_DONE, 0},
+    {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS, 0},
+    {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_SE, 0},
     {NULL, 0, 0}
   };
   static const uint32_t mesh_ports[] = {GNUNET_APPLICATION_TYPE_SET, 0};
@@ -943,7 +968,7 @@
     return;
   }
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "service started\n");
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "started\n");
 }
 
 

Modified: gnunet/src/set/gnunet-service-set.h
===================================================================
--- gnunet/src/set/gnunet-service-set.h 2013-07-10 01:26:04 UTC (rev 27859)
+++ gnunet/src/set/gnunet-service-set.h 2013-07-10 01:31:13 UTC (rev 27860)
@@ -38,69 +38,27 @@
 #include "set.h"
 
 
-/* FIXME: cfuchs */
-struct IntersectionState;
-
-
-/* FIXME: cfuchs */
-struct IntersectionOperation;
-
-
 /**
- * Extra state required for set union.
+ * Implementation-specific set state.
+ * Used as opaque pointer, and specified further
+ * in the respective implementation.
  */
-struct UnionState;
+struct SetState;
 
-/**
- * State of a union operation being evaluated.
- */
-struct UnionEvaluateOperation;
 
-
-
 /**
- * A set that supports a specific operation
- * with other peers.
+ * Implementation-specific set operation.
+ * Used as opaque pointer, and specified further
+ * in the respective implementation.
  */
-struct Set
-{
-  /**
-   * Client that owns the set.
-   * Only one client may own a set.
-   */
-  struct GNUNET_SERVER_Client *client;
+struct OperationState;
 
-  /**
-   * Message queue for the client
-   */
-  struct GNUNET_MQ_Handle *client_mq;
 
-  /**
-   * Type of operation supported for this set
-   */
-  uint32_t operation; // use enum from API
+/* forward declarations */
+struct Set;
+struct TunnelContext;
 
-  /**
-   * Sets are held in a doubly linked list.
-   */
-  struct Set *next;
 
-  /**
-   * Sets are held in a doubly linked list.
-   */
-  struct Set *prev;
-
-  /**
-   * Appropriate state for each type of
-   * operation.
-   */
-  union {
-    struct IntersectionState *i;
-    struct UnionState *u;
-  } state;
-};
-
-
 /**
  * Detail information about an operation.
  */
@@ -146,96 +104,169 @@
 
 
 /**
- * A listener is inhabited by a client, and
- * waits for evaluation requests from remote peers.
+ * Signature of functions that create the implementation-specific
+ * state for a set supporting a specific operation.
+ *
+ * @return a set state specific to the supported operation
  */
-struct Listener
+typedef struct SetState *(*CreateImpl) (void);
+
+
+/**
+ * Signature of functions that implement the add/remove functionality
+ * for a set supporting a specific operation.
+ *
+ * @param set implementation-specific set state
+ * @param msg element message from the client
+ */
+typedef void (*AddRemoveImpl) (struct SetState *state, const struct 
GNUNET_SET_Element *element);
+
+
+/**
+ * Signature of functions that handle disconnection
+ * of the remote peer.
+ *
+ * @param op the set operation, contains implementation-specific data
+ */
+typedef void (*PeerDisconnectImpl) (struct OperationState *op);
+
+
+/**
+ * Signature of functions that implement the destruction of the
+ * implementation-specific set state.
+ *
+ * @param state the set state, contains implementation-specific data
+ */
+typedef void (*DestroySetImpl) (struct SetState *state);
+
+
+/**
+ * Signature of functions that implement the creation of set operations
+ * (currently evaluate and accept).
+ *
+ * @param spec specification of the set operation to be created
+ * @param tunnel the tunnel with the other peer
+ * @param tc tunnel context
+ */
+typedef void (*OpCreateImpl) (struct OperationSpecification *spec,
+                              struct GNUNET_MESH_Tunnel *tunnel,
+                              struct TunnelContext *tc);
+
+
+/**
+ * Signature of functions that implement the message handling for
+ * the different set operations.
+ *
+ * @param op operation state
+ * @param msg received message
+ * @return GNUNET_OK on success, GNUNET_SYSERR to
+ *         destroy the operation and the tunnel
+ */
+typedef int (*MsgHandlerImpl) (struct OperationState *op,
+                               const struct GNUNET_MessageHeader *msg);
+
+typedef void (*CancelImpl) (struct SetState *set,
+                            uint32_t request_id);
+
+
+/**
+ * Dispatch table for a specific set operation.
+ * Every set operation has to implement the callback
+ * in this struct.
+ */
+struct SetVT
 {
   /**
-   * Listeners are held in a doubly linked list.
+   * Callback for the set creation.
    */
-  struct Listener *next;
+  CreateImpl create;
 
   /**
-   * Listeners are held in a doubly linked list.
+   * Callback for element insertion
    */
-  struct Listener *prev;
+  AddRemoveImpl add;
 
   /**
-   * Client that owns the listener.
-   * Only one client may own a listener.
+   * Callback for element removal.
    */
-  struct GNUNET_SERVER_Client *client;
+  AddRemoveImpl remove;
 
   /**
-   * Message queue for the client
+   * Callback for accepting a set operation request
    */
-  struct GNUNET_MQ_Handle *client_mq;
+  OpCreateImpl accept;
 
   /**
-   * The type of the operation.
+   * Callback for starting evaluation with a remote peer.
    */
-  enum GNUNET_SET_OperationType operation;
+  OpCreateImpl evaluate;
 
   /**
-   * Application ID for the operation, used to distinguish
-   * multiple operations of the same type with the same peer.
+   * Callback for destruction of the set state.
    */
-  struct GNUNET_HashCode app_id;
-};
+  DestroySetImpl destroy_set;
 
+  /**
+   * Callback for handling operation-specific messages.
+   */
+  MsgHandlerImpl msg_handler;
 
-/**
- * Peer that has connected to us, but is not yet evaluating a set operation.
- * Once the peer has sent a request, and the client has
- * accepted or rejected it, this information will be deleted.
- */
-struct Incoming;
+  /**
+   * Callback for handling the remote peer's
+   * disconnect.
+   */
+  PeerDisconnectImpl peer_disconnect;
 
+  /**
+   * Callback for canceling an operation by
+   * its ID.
+   */
+  CancelImpl cancel;
+};
 
+
 /**
- * Different types a tunnel can be.
+ * A set that supports a specific operation
+ * with other peers.
  */
-enum TunnelContextType 
+struct Set
 {
   /**
-   * Tunnel is waiting for a set request from the tunnel,
-   * or for the ack/nack of the client for a received request.
+   * Client that owns the set.
+   * Only one client may own a set.
    */
-  CONTEXT_INCOMING,
+  struct GNUNET_SERVER_Client *client;
 
   /**
-   * The tunnel performs a union operation.
+   * Message queue for the client
    */
-  CONTEXT_OPERATION_UNION,
+  struct GNUNET_MQ_Handle *client_mq;
 
   /**
-   * The tunnel performs an intersection operation.
+   * Type of operation supported for this set
    */
-  CONTEXT_OPERATION_INTERSECTION,
-};
+  enum GNUNET_SET_OperationType operation;
 
+  /**
+   * Virtual table for this set.
+   * Determined by the operation type of this set.
+   */
+  const struct SetVT *vt;
 
-/**
- * State associated with the tunnel, dependent on
- * tunnel type.
- */
-union TunnelContextData
-{
   /**
-   * Valid for tag 'CONTEXT_INCOMING'
+   * Sets are held in a doubly linked list.
    */
-  struct Incoming *incoming;
+  struct Set *next;
 
   /**
-   * Valid for tag 'CONTEXT_OPERATION_UNION'
+   * Sets are held in a doubly linked list.
    */
-  struct UnionEvaluateOperation *union_op;
+  struct Set *prev;
 
   /**
-   * Valid for tag 'CONTEXT_OPERATION_INTERSECTION'
+   * Implementation-specific state.
    */
-  struct IntersectionEvaluateOperation *intersection_op;
+  struct SetState *state;
 };
 
 
@@ -246,119 +277,24 @@
 struct TunnelContext
 {
   /**
-   * Type of the tunnel.
+   * V-Table for the operation belonging
+   * to the tunnel contest.
    */
-  enum TunnelContextType type;
+  const struct SetVT *vt;
 
   /**
-   * State associated with the tunnel, dependent on
-   * tunnel type.
+   * Implementation-specific operation state.
    */
-  union TunnelContextData data;
+  struct OperationState *op;
 };
 
 
-
 /**
- * Configuration of the local peer.
+ * Get the table with implementing functions for
+ * set union.
  */
-extern const struct GNUNET_CONFIGURATION_Handle *configuration;
+const struct SetVT *
+_GSS_union_vt (void);
 
-/**
- * Handle to the mesh service.
- */
-extern struct GNUNET_MESH_Handle *mesh;
 
-
-/**
- * Create a new set supporting the union operation
- *
- * @return the newly created set
- */
-struct Set *
-_GSS_union_set_create (void);
-
-
-/**
- * Evaluate a union operation with
- * a remote peer.
- *
- * @param spec specification of the operation the evaluate
- * @param tunnel tunnel already connected to the partner peer
- * @return a handle to the operation
- */
-struct UnionEvaluateOperation *
-_GSS_union_evaluate (struct OperationSpecification *spec,
-                     struct GNUNET_MESH_Tunnel *tunnel);
-
-
-/**
- * Add the element from the given element message to the set.
- *
- * @param m message with the element
- * @param set set to add the element to
- */
-void
-_GSS_union_add (struct GNUNET_SET_ElementMessage *m, struct Set *set);
-
-
-/**
- * Remove the element given in the element message from the set.
- * Only marks the element as removed, so that older set operations can still 
exchange it.
- *
- * @param m message with the element
- * @param set set to remove the element from
- */
-void
-_GSS_union_remove (struct GNUNET_SET_ElementMessage *m, struct Set *set);
-
-
-/**
- * Destroy a set that supports the union operation
- *
- * @param set the set to destroy, must be of type GNUNET_SET_OPERATION_UNION
- */
-void
-_GSS_union_set_destroy (struct Set *set);
-
-
-/**
- * Accept an union operation request from a remote peer
- *
- * @param spec all necessary information about the operation
- * @param tunnel open tunnel to the partner's peer
- * @return operation
- */
-struct UnionEvaluateOperation *
-_GSS_union_accept (struct OperationSpecification *spec,
-                   struct GNUNET_MESH_Tunnel *tunnel);
-
-
-/**
- * Destroy a union operation, and free all resources
- * associated with it.
- *
- * @param eo the union operation to destroy
- */
-void
-_GSS_union_operation_destroy (struct UnionEvaluateOperation *eo);
-
-
-/**
- * Dispatch messages for a union operation.
- *
- * @param cls closure
- * @param tunnel mesh tunnel
- * @param tunnel_ctx tunnel context
- * @param mh message to process
- * @return GNUNET_SYSERR if the tunnel should be disconnected,
- *         GNUNET_OK otherwise
- */
-int
-_GSS_union_handle_p2p_message (void *cls,
-                               struct GNUNET_MESH_Tunnel *tunnel,
-                               void **tunnel_ctx,
-                               const struct GNUNET_MessageHeader *mh);
-
-
 #endif

Modified: gnunet/src/set/gnunet-service-set_union.c
===================================================================
--- gnunet/src/set/gnunet-service-set_union.c   2013-07-10 01:26:04 UTC (rev 
27859)
+++ gnunet/src/set/gnunet-service-set_union.c   2013-07-10 01:31:13 UTC (rev 
27860)
@@ -61,7 +61,7 @@
 
 
 /**
- * Current phase we are in for a union operation
+ * Current phase we are in for a union operation.
  */
 enum UnionOperationPhase
 {
@@ -100,7 +100,7 @@
  * State of an evaluate operation
  * with another peer.
  */
-struct UnionEvaluateOperation
+struct OperationState
 {
   /**
    * Tunnel to the remote peer.
@@ -154,23 +154,29 @@
    * was created.
    */
   unsigned int generation_created;
+
+  /**
+   * Set state of the set that this operation
+   * belongs to.
+   */
+  struct SetState *set_state;
   
   /**
    * Evaluate operations are held in
    * a linked list.
    */
-  struct UnionEvaluateOperation *next;
+  struct OperationState *next;
   
    /**
-   * Evaluate operations are held in
-   * a linked list.
-   */
-  struct UnionEvaluateOperation *prev;
+    * Evaluate operations are held in
+    * a linked list.
+    */
+  struct OperationState *prev;
 };
 
 
 /**
- * Information about the element in a set.
+ * Information about an element element in the set.
  * All elements are stored in a hash-table
  * from their hash-code to their 'struct Element',
  * so that the remove and add operations are reasonably
@@ -218,7 +224,8 @@
 
 
 /**
- * Entries in the key-to-element map of the union set.
+ * The key entry is used to associate an ibf key with
+ * an element.
  */
 struct KeyEntry
 {
@@ -239,6 +246,7 @@
   struct KeyEntry *next_colliding;
 };
 
+
 /**
  * Used as a closure for sending elements
  * with a specific IBF key.
@@ -255,14 +263,14 @@
    * Operation for which the elements
    * should be sent.
    */
-  struct UnionEvaluateOperation *eo;
+  struct OperationState *eo;
 };
 
 
 /**
  * Extra state required for efficient set union.
  */
-struct UnionState
+struct SetState
 {
   /**
    * The strata estimator is only generated once for
@@ -281,13 +289,13 @@
    * Evaluate operations are held in
    * a linked list.
    */
-  struct UnionEvaluateOperation *ops_head;
+  struct OperationState *ops_head;
 
   /**
    * Evaluate operations are held in
    * a linked list.
    */
-  struct UnionEvaluateOperation *ops_tail;
+  struct OperationState *ops_tail;
 
   /**
    * Current generation, that is, number of
@@ -321,23 +329,6 @@
 
 
 /**
- * Destroy the elements belonging to a union set.
- *
- * @param us union state that contains the elements
- */
-static void
-destroy_elements (struct UnionState *us)
-{
-  if (NULL == us->elements)
-    return;
-  GNUNET_CONTAINER_multihashmap_iterate (us->elements, 
destroy_elements_iterator, NULL);
-  GNUNET_CONTAINER_multihashmap_destroy (us->elements);
-  us->elements = NULL;
-}
-
-
-
-/**
  * Iterator over hash map entries.
  *
  * @param cls closure
@@ -358,6 +349,11 @@
   {
     struct KeyEntry *k_tmp = k;
     k = k->next_colliding;
+    if (GNUNET_YES == k_tmp->element->remote)
+    {
+      GNUNET_free (k_tmp->element);
+      k_tmp->element = NULL;
+    }
     GNUNET_free (k_tmp);
   }
   return GNUNET_YES;
@@ -370,20 +366,24 @@
  *
  * @param eo the union operation to destroy
  */
-void
-_GSS_union_operation_destroy (struct UnionEvaluateOperation *eo)
+static void
+union_operation_destroy (struct OperationState *eo)
 {
-  struct UnionState *st = eo->spec->set->state.u;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op\n");
-
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op\n");
+  GNUNET_CONTAINER_DLL_remove (eo->set_state->ops_head,
+                               eo->set_state->ops_tail,
+                               eo);
+  if (NULL != eo->mq)
+  {
+    GNUNET_MQ_destroy (eo->mq);
+    eo->mq = NULL;
+  }
   if (NULL != eo->tunnel)
   {
     struct GNUNET_MESH_Tunnel *t = eo->tunnel;
     eo->tunnel = NULL;
     GNUNET_MESH_tunnel_destroy (t);
   }
-  
   if (NULL != eo->remote_ibf)
   {
     ibf_destroy (eo->remote_ibf);
@@ -405,16 +405,20 @@
     GNUNET_CONTAINER_multihashmap32_destroy (eo->key_to_element);
     eo->key_to_element = NULL;
   }
-
-  GNUNET_CONTAINER_DLL_remove (st->ops_head,
-                               st->ops_tail,
-                               eo);
+  if (NULL != eo->spec)
+  {
+    if (NULL != eo->spec->context_msg)
+    {
+      GNUNET_free (eo->spec->context_msg);
+      eo->spec->context_msg = NULL;
+    }
+    GNUNET_free (eo->spec);
+    eo->spec = NULL;
+  }
   GNUNET_free (eo);
 
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union op done\n");
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op done\n");
-
-
   /* FIXME: do a garbage collection of the set generations */
 }
 
@@ -426,7 +430,7 @@
  * @param eo the union operation to fail
  */
 static void
-fail_union_operation (struct UnionEvaluateOperation *eo)
+fail_union_operation (struct OperationState *eo)
 {
   struct GNUNET_MQ_Envelope *ev;
   struct GNUNET_SET_ResultMessage *msg;
@@ -434,8 +438,9 @@
   ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
   msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
   msg->request_id = htonl (eo->spec->client_request_id);
+  msg->element_type = htons (0);
   GNUNET_MQ_send (eo->spec->set->client_mq, ev);
-  _GSS_union_operation_destroy (eo);
+  union_operation_destroy (eo);
 }
 
 
@@ -467,13 +472,13 @@
  * @param eo operation with the other peer
  */
 static void
-send_operation_request (struct UnionEvaluateOperation *eo)
+send_operation_request (struct OperationState *eo)
 {
   struct GNUNET_MQ_Envelope *ev;
   struct OperationRequestMessage *msg;
 
   ev = GNUNET_MQ_msg_nested_mh (msg, 
GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST,
-                                 eo->spec->context_msg);
+                                eo->spec->context_msg);
 
   if (NULL == ev)
   {
@@ -484,6 +489,7 @@
   }
   msg->operation = htons (GNUNET_SET_OPERATION_UNION);
   msg->app_id = eo->spec->app_id;
+  msg->salt = htonl (eo->spec->salt);
   GNUNET_MQ_send (eo->mq, ev);
 
   if (NULL != eo->spec->context_msg)
@@ -492,7 +498,7 @@
     eo->spec->context_msg = NULL;
   }
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sent op request\n");
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent op request\n");
 }
 
 
@@ -532,13 +538,13 @@
 
 /**
  * Insert an element into the union operation's
- * key-to-element mapping
+ * key-to-element mapping. Takes ownership of 'ee'.
  *
  * @param eo the union operation
  * @param ee the element entry
  */
 static void
-insert_element (struct UnionEvaluateOperation *eo, struct ElementEntry *ee)
+insert_element (struct OperationState *eo, struct ElementEntry *ee)
 {
   int ret;
   struct IBF_Key ibf_key;
@@ -595,7 +601,7 @@
                               const struct GNUNET_HashCode *key,
                               void *value)
 {
-  struct UnionEvaluateOperation *eo = cls;
+  struct OperationState *eo = cls;
   struct ElementEntry *e = value;
 
   /* make sure that the element belongs to the set at the time
@@ -605,6 +611,8 @@
          (e->generation_removed < eo->generation_created)))
     return GNUNET_YES;
 
+  e->remote = GNUNET_NO;
+
   insert_element (eo, e);
   return GNUNET_YES;
 }
@@ -618,15 +626,15 @@
  * @param size size of the ibf to create
  */
 static void
-prepare_ibf (struct UnionEvaluateOperation *eo, uint16_t size)
+prepare_ibf (struct OperationState *eo, uint16_t size)
 {
   if (NULL == eo->key_to_element)
   {
     unsigned int len;
-    len = GNUNET_CONTAINER_multihashmap_size 
(eo->spec->set->state.u->elements);
+    len = GNUNET_CONTAINER_multihashmap_size (eo->set_state->elements);
     eo->key_to_element = GNUNET_CONTAINER_multihashmap32_create (len + 1);
-    GNUNET_CONTAINER_multihashmap_iterate (eo->spec->set->state.u->elements,
-                                             init_key_to_element_iterator, eo);
+    GNUNET_CONTAINER_multihashmap_iterate (eo->set_state->elements,
+                                           init_key_to_element_iterator, eo);
   }
   if (NULL != eo->local_ibf)
     ibf_destroy (eo->local_ibf);
@@ -643,14 +651,14 @@
  * @param ibf_order order of the ibf to send, size=2^order
  */
 static void
-send_ibf (struct UnionEvaluateOperation *eo, uint16_t ibf_order)
+send_ibf (struct OperationState *eo, uint16_t ibf_order)
 {
   unsigned int buckets_sent = 0;
   struct InvertibleBloomFilter *ibf;
 
   prepare_ibf (eo, 1<<ibf_order);
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending ibf of size %u\n", 
1<<ibf_order);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending ibf of size %u\n", 
1<<ibf_order);
 
   ibf = eo->local_ibf;
 
@@ -667,11 +675,14 @@
 
     ev = GNUNET_MQ_msg_extra (msg, buckets_in_message * IBF_BUCKET_SIZE,
                                GNUNET_MESSAGE_TYPE_SET_P2P_IBF);
+    msg->reserved = 0;
     msg->order = ibf_order;
     msg->offset = htons (buckets_sent);
     ibf_write_slice (ibf, buckets_sent,
                      buckets_in_message, &msg[1]);
     buckets_sent += buckets_in_message;
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ibf chunk size %u, %u/%u sent\n",
+                buckets_in_message, buckets_sent, 1<<ibf_order);
     GNUNET_MQ_send (eo->mq, ev);
   }
 
@@ -685,18 +696,18 @@
  * @param eo the union operation with the remote peer
  */
 static void
-send_strata_estimator (struct UnionEvaluateOperation *eo)
+send_strata_estimator (struct OperationState *eo)
 {
   struct GNUNET_MQ_Envelope *ev;
   struct GNUNET_MessageHeader *strata_msg;
-  struct UnionState *st = eo->spec->set->state.u;
 
   ev = GNUNET_MQ_msg_header_extra (strata_msg,
-                                    SE_STRATA_COUNT * IBF_BUCKET_SIZE * 
SE_IBF_SIZE,
-                                    GNUNET_MESSAGE_TYPE_SET_P2P_SE);
-  strata_estimator_write (st->se, &strata_msg[1]);
+                                   SE_STRATA_COUNT * IBF_BUCKET_SIZE * 
SE_IBF_SIZE,
+                                   GNUNET_MESSAGE_TYPE_SET_P2P_SE);
+  strata_estimator_write (eo->set_state->se, &strata_msg[1]);
   GNUNET_MQ_send (eo->mq, ev);
   eo->phase = PHASE_EXPECT_IBF;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sent SE, expecting IBF\n");
 }
 
 
@@ -730,7 +741,7 @@
 static void
 handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
 {
-  struct UnionEvaluateOperation *eo = cls;
+  struct OperationState *eo = cls;
   struct StrataEstimator *remote_se;
   int diff;
 
@@ -746,7 +757,7 @@
   strata_estimator_read (&mh[1], remote_se);
   GNUNET_assert (NULL != eo->se);
   diff = strata_estimator_difference (remote_se, eo->se);
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got se, diff=%d\n", diff);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got se, diff=%d\n", diff);
   strata_estimator_destroy (remote_se);
   strata_estimator_destroy (eo->se);
   eo->se = NULL;
@@ -769,7 +780,7 @@
 {
   struct SendElementClosure *sec = cls;
   struct IBF_Key ibf_key = sec->ibf_key;
-  struct UnionEvaluateOperation *eo = sec->eo;
+  struct OperationState *eo = sec->eo;
   struct KeyEntry *ke = value;
 
   if (ke->ibf_key.key_val != ibf_key.key_val)
@@ -789,7 +800,7 @@
       continue;
     }
     memcpy (&mh[1], element->data, element->size);
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending element to client\n");
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending element to client\n");
     GNUNET_MQ_send (eo->mq, ev);
     ke = ke->next_colliding;
   }
@@ -804,7 +815,7 @@
  * @param ibf_key IBF key of interest
  */
 static void
-send_elements_for_key (struct UnionEvaluateOperation *eo, struct IBF_Key 
ibf_key)
+send_elements_for_key (struct OperationState *eo, struct IBF_Key ibf_key)
 {
   struct SendElementClosure send_cls;
 
@@ -822,7 +833,7 @@
  * @param eo union operation
  */
 static void
-decode_and_send (struct UnionEvaluateOperation *eo)
+decode_and_send (struct OperationState *eo)
 {
   struct IBF_Key key;
   int side;
@@ -833,6 +844,9 @@
   prepare_ibf (eo, eo->remote_ibf->size);
   diff_ibf = ibf_dup (eo->local_ibf);
   ibf_subtract (diff_ibf, eo->remote_ibf);
+  
+  ibf_destroy (eo->remote_ibf);
+  eo->remote_ibf = NULL;
 
   while (1)
   {
@@ -864,7 +878,7 @@
     {
       struct GNUNET_MQ_Envelope *ev;
 
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values, sending 
DONE\n");
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmitted all values, sending 
DONE\n");
       ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
       GNUNET_MQ_send (eo->mq, ev);
       break;
@@ -899,7 +913,7 @@
 static void
 handle_p2p_ibf (void *cls, const struct GNUNET_MessageHeader *mh)
 {
-  struct UnionEvaluateOperation *eo = cls;
+  struct OperationState *eo = cls;
   struct IBFMessage *msg = (struct IBFMessage *) mh;
   unsigned int buckets_in_message;
 
@@ -908,8 +922,9 @@
   {
     eo->phase = PHASE_EXPECT_IBF_CONT;
     GNUNET_assert (NULL == eo->remote_ibf);
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "creating new ibf of order %u\n", 
1<<msg->order);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "creating new ibf of size %u\n", 
1<<msg->order);
     eo->remote_ibf = ibf_create (1<<msg->order, SE_IBF_HASH_NUM);
+    eo->ibf_buckets_received = 0;
     if (0 != ntohs (msg->offset))
     {
       GNUNET_break (0);
@@ -929,6 +944,13 @@
 
   buckets_in_message = (ntohs (msg->header.size) - sizeof *msg) / 
IBF_BUCKET_SIZE;
 
+  if (0 == buckets_in_message)
+  {
+    GNUNET_break_op (0);
+    fail_union_operation (eo);
+    return;
+  }
+
   if ((ntohs (msg->header.size) - sizeof *msg) != buckets_in_message * 
IBF_BUCKET_SIZE)
   {
     GNUNET_break (0);
@@ -942,7 +964,7 @@
   if (eo->ibf_buckets_received == eo->remote_ibf->size)
   {
 
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received full strata estimator\n");
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "received full ibf\n");
     eo->phase = PHASE_EXPECT_ELEMENTS;
     decode_and_send (eo);
   }
@@ -957,12 +979,13 @@
  * @param element element to send
  */
 static void
-send_client_element (struct UnionEvaluateOperation *eo,
+send_client_element (struct OperationState *eo,
                      struct GNUNET_SET_Element *element)
 {
   struct GNUNET_MQ_Envelope *ev;
   struct GNUNET_SET_ResultMessage *rm;
 
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending el of size %u\n", 
element->size);
   GNUNET_assert (0 != eo->spec->client_request_id);
   ev = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
   if (NULL == ev)
@@ -973,6 +996,7 @@
   }
   rm->result_status = htons (GNUNET_SET_STATUS_OK);
   rm->request_id = htonl (eo->spec->client_request_id);
+  rm->element_type = element->type;
   memcpy (&rm[1], element->data, element->size);
   GNUNET_MQ_send (eo->spec->set->client_mq, ev);
 }
@@ -987,7 +1011,7 @@
  * @param eo union operation
  */
 static void
-send_client_done_and_destroy (struct UnionEvaluateOperation *eo)
+send_client_done_and_destroy (struct OperationState *eo)
 {
   struct GNUNET_MQ_Envelope *ev;
   struct GNUNET_SET_ResultMessage *rm;
@@ -995,6 +1019,7 @@
   ev = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
   rm->request_id = htonl (eo->spec->client_request_id);
   rm->result_status = htons (GNUNET_SET_STATUS_DONE);
+  rm->element_type = htons (0);
   GNUNET_MQ_send (eo->spec->set->client_mq, ev);
 
 }
@@ -1009,11 +1034,11 @@
 static void
 handle_p2p_elements (void *cls, const struct GNUNET_MessageHeader *mh)
 {
-  struct UnionEvaluateOperation *eo = cls;
+  struct OperationState *eo = cls;
   struct ElementEntry *ee;
   uint16_t element_size;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got element from peer\n");
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got element from peer\n");
 
   if ( (eo->phase != PHASE_EXPECT_ELEMENTS) &&
        (eo->phase != PHASE_EXPECT_ELEMENTS_AND_REQUESTS) )
@@ -1025,13 +1050,12 @@
   element_size = ntohs (mh->size) - sizeof (struct GNUNET_MessageHeader);
   ee = GNUNET_malloc (sizeof *eo + element_size);
   memcpy (&ee[1], &mh[1], element_size);
+  ee->element.size = element_size;
   ee->element.data = &ee[1];
   ee->remote = GNUNET_YES;
 
   insert_element (eo, ee);
   send_client_element (eo, &ee->element);
-
-  GNUNET_free (ee);
 }
 
 
@@ -1044,7 +1068,7 @@
 static void
 handle_p2p_element_requests (void *cls, const struct GNUNET_MessageHeader *mh)
 {
-  struct UnionEvaluateOperation *eo = cls;
+  struct OperationState *eo = cls;
   struct IBF_Key *ibf_key;
   unsigned int num_keys;
 
@@ -1082,7 +1106,7 @@
 static void
 peer_done_sent_cb (void *cls)
 {
-  struct UnionEvaluateOperation *eo = cls;
+  struct OperationState *eo = cls;
 
   send_client_done_and_destroy (eo);
 }
@@ -1097,14 +1121,14 @@
 static void
 handle_p2p_done (void *cls, const struct GNUNET_MessageHeader *mh)
 {
-  struct UnionEvaluateOperation *eo = cls;
+  struct OperationState *eo = cls;
 
   if (eo->phase == PHASE_EXPECT_ELEMENTS_AND_REQUESTS)
   {
     /* we got all requests, but still have to send our elements as response */
     struct GNUNET_MQ_Envelope *ev;
 
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got DONE, sending final DONE after 
elements\n");
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got DONE, sending final DONE after 
elements\n");
     eo->phase = PHASE_FINISHED;
     ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_DONE);
     GNUNET_MQ_notify_sent (ev, peer_done_sent_cb, eo);
@@ -1113,7 +1137,7 @@
   }
   if (eo->phase == PHASE_EXPECT_ELEMENTS)
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got final DONE\n");
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "got final DONE\n");
     eo->phase = PHASE_FINISHED;
     send_client_done_and_destroy (eo);
     return;
@@ -1129,34 +1153,38 @@
  *
  * @param spec specification of the operation the evaluate
  * @param tunnel tunnel already connected to the partner peer
+ * @param tc tunnel context, passed here so all new incoming
+ *        messages are directly going to the union operations
  * @return a handle to the operation
  */
-struct UnionEvaluateOperation *
-_GSS_union_evaluate (struct OperationSpecification *spec,
-                     struct GNUNET_MESH_Tunnel *tunnel)
+static void
+union_evaluate (struct OperationSpecification *spec,
+                struct GNUNET_MESH_Tunnel *tunnel,
+                struct TunnelContext *tc)
 {
-  struct UnionEvaluateOperation *eo;
-  struct UnionState *st = spec->set->state.u;
+  struct OperationState *eo;
 
-  eo = GNUNET_new (struct UnionEvaluateOperation);
-  eo->se = strata_estimator_dup (spec->set->state.u->se);
+  eo = GNUNET_new (struct OperationState);
+  tc->vt = _GSS_union_vt ();
+  tc->op = eo;
+  eo->se = strata_estimator_dup (spec->set->state->se);
+  eo->set_state = spec->set->state;
   eo->spec = spec;
   eo->tunnel = tunnel;
+  eo->mq = GNUNET_MESH_mq_create (tunnel);
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, 
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
              "evaluating union operation, (app %s)\n", 
               GNUNET_h2s (&eo->spec->app_id));
 
   /* we started the operation, thus we have to send the operation request */
   eo->phase = PHASE_EXPECT_SE;
 
-  GNUNET_CONTAINER_DLL_insert (st->ops_head,
-                               st->ops_tail,
+  GNUNET_CONTAINER_DLL_insert (eo->set_state->ops_head,
+                               eo->set_state->ops_tail,
                                eo);
 
   send_operation_request (eo);
-
-  return eo;
 }
 
 
@@ -1165,30 +1193,34 @@
  *
  * @param spec all necessary information about the operation
  * @param tunnel open tunnel to the partner's peer
+ * @param tc tunnel context, passed here so all new incoming
+ *        messages are directly going to the union operations
  * @return operation
  */
-struct UnionEvaluateOperation *
-_GSS_union_accept (struct OperationSpecification *spec,
-                   struct GNUNET_MESH_Tunnel *tunnel)
+static void
+union_accept (struct OperationSpecification *spec,
+              struct GNUNET_MESH_Tunnel *tunnel,
+              struct TunnelContext *tc)
 {
-  struct UnionEvaluateOperation *eo;
-  struct UnionState *st = spec->set->state.u;
+  struct OperationState *eo;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "accepting set union operation\n");
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "accepting set union operation\n");
 
-  eo = GNUNET_new (struct UnionEvaluateOperation);
-  eo->generation_created = st->current_generation++;
+  eo = GNUNET_new (struct OperationState);
+  tc->vt = _GSS_union_vt ();
+  tc->op = eo;
+  eo->set_state = spec->set->state;
+  eo->generation_created = eo->set_state->current_generation++;
   eo->spec = spec;
   eo->tunnel = tunnel;
-  eo->se = strata_estimator_dup (st->se);
+  eo->mq = GNUNET_MESH_mq_create (tunnel);
+  eo->se = strata_estimator_dup (eo->set_state->se);
   /* transfer ownership of mq and socket from incoming to eo */
-  GNUNET_CONTAINER_DLL_insert (st->ops_head,
-                               st->ops_tail,
+  GNUNET_CONTAINER_DLL_insert (eo->set_state->ops_head,
+                               eo->set_state->ops_tail,
                                eo);
   /* kick off the operation */
   send_strata_estimator (eo);
-
-  return eo;
 }
 
 
@@ -1197,111 +1229,101 @@
  *
  * @return the newly created set
  */
-struct Set *
-_GSS_union_set_create (void)
+static struct SetState *
+union_set_create (void)
 {
-  struct Set *set;
+  struct SetState *set_state;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "union set created\n");
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "union set created\n");
   
-  set = GNUNET_malloc (sizeof (struct Set) + sizeof (struct UnionState));
-  set->state.u = (struct UnionState *) &set[1];
-  set->operation = GNUNET_SET_OPERATION_UNION;
+  set_state = GNUNET_new (struct SetState);
   /* keys of the hash map are stored in the element entrys, thus we do not
    * want the hash map to copy them */
-  set->state.u->elements = GNUNET_CONTAINER_multihashmap_create (1, 
GNUNET_YES);
-  set->state.u->se = strata_estimator_create (SE_STRATA_COUNT,
+  set_state->elements = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
+  set_state->se = strata_estimator_create (SE_STRATA_COUNT,
                                               SE_IBF_SIZE, SE_IBF_HASH_NUM);  
-  return set;
+  return set_state;
 }
 
 
 /**
  * Add the element from the given element message to the set.
  *
- * @param m message with the element
- * @param set set to add the element to
+ * @param set_state state of the set want to add to
+ * @param element the element to add to the set
  */
-void
-_GSS_union_add (struct GNUNET_SET_ElementMessage *m, struct Set *set)
+static void
+union_add (struct SetState *set_state, const struct GNUNET_SET_Element 
*element)
 {
   struct ElementEntry *ee;
   struct ElementEntry *ee_dup;
-  uint16_t element_size;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "adding element\n");
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "adding union element of size %u\n", 
element->size);
 
-  GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation);
-  element_size = ntohs (m->header.size) - sizeof *m;
-  ee = GNUNET_malloc (element_size + sizeof *ee);
-  ee->element.size = element_size;
-  memcpy (&ee[1], &m[1], element_size);
+  ee = GNUNET_malloc (element->size + sizeof *ee);
+  ee->element.size = element->size;
+  memcpy (&ee[1], element->data, element->size);
   ee->element.data = &ee[1];
-  ee->generation_added = set->state.u->current_generation;
-  GNUNET_CRYPTO_hash (ee->element.data, element_size, &ee->element_hash);
-  ee_dup = GNUNET_CONTAINER_multihashmap_get (set->state.u->elements, 
&ee->element_hash);
+  ee->generation_added = set_state->current_generation;
+  ee->remote = GNUNET_NO;
+  GNUNET_CRYPTO_hash (ee->element.data, element->size, &ee->element_hash);
+  ee_dup = GNUNET_CONTAINER_multihashmap_get (set_state->elements,
+                                              &ee->element_hash);
   if (NULL != ee_dup)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "element inserted twice, ignoring\n");
     GNUNET_free (ee);
     return;
   }
-  GNUNET_CONTAINER_multihashmap_put (set->state.u->elements, 
&ee->element_hash, ee,
+  GNUNET_CONTAINER_multihashmap_put (set_state->elements, &ee->element_hash, 
ee,
                                      
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
-  strata_estimator_insert (set->state.u->se, get_ibf_key (&ee->element_hash, 
0));
+  strata_estimator_insert (set_state->se, get_ibf_key (&ee->element_hash, 0));
 }
 
 
 /**
  * Destroy a set that supports the union operation
  *
- * @param set the set to destroy, must be of type GNUNET_SET_OPERATION_UNION
+ * @param set_state the set to destroy
  */
-void
-_GSS_union_set_destroy (struct Set *set)
+static void
+union_set_destroy (struct SetState *set_state)
 {
-  GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation);
-  if (NULL != set->client)
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "destroying union set\n");
+  /* important to destroy operations before the rest of the set */
+  while (NULL != set_state->ops_head)
+    union_operation_destroy (set_state->ops_head);
+  if (NULL != set_state->se)
   {
-    GNUNET_SERVER_client_drop (set->client);
-    set->client = NULL;
+    strata_estimator_destroy (set_state->se);
+    set_state->se = NULL;
   }
-  if (NULL != set->client_mq)
+  if (NULL != set_state->elements)
   {
-    GNUNET_MQ_destroy (set->client_mq);
-    set->client_mq = NULL;
+    GNUNET_CONTAINER_multihashmap_iterate (set_state->elements,
+                                           destroy_elements_iterator, NULL);
+    GNUNET_CONTAINER_multihashmap_destroy (set_state->elements);
+    set_state->elements = NULL;
   }
 
-  if (NULL != set->state.u->se)
-  {
-    strata_estimator_destroy (set->state.u->se);
-    set->state.u->se = NULL;
-  }
-
-  destroy_elements (set->state.u);
-
-  while (NULL != set->state.u->ops_head)
-  {
-    _GSS_union_operation_destroy (set->state.u->ops_head);
-  }
+  GNUNET_free (set_state);
 }
 
 /**
  * Remove the element given in the element message from the set.
  * Only marks the element as removed, so that older set operations can still 
exchange it.
  *
- * @param m message with the element
- * @param set set to remove the element from
+ * @param set_state state of the set to remove from
+ * @param element set element to remove
  */
-void
-_GSS_union_remove (struct GNUNET_SET_ElementMessage *m, struct Set *set)
+static void
+union_remove (struct SetState *set_state, const struct GNUNET_SET_Element 
*element)
 {
   struct GNUNET_HashCode hash;
   struct ElementEntry *ee;
 
-  GNUNET_assert (GNUNET_SET_OPERATION_UNION == set->operation);
-  GNUNET_CRYPTO_hash (&m[1], ntohs (m->header.size), &hash);
-  ee = GNUNET_CONTAINER_multihashmap_get (set->state.u->elements, &hash);
+  GNUNET_CRYPTO_hash (element->data, element->size, &hash);
+  ee = GNUNET_CONTAINER_multihashmap_get (set_state->elements, &hash);
   if (NULL == ee)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to remove 
non-existing element\n");
@@ -1313,36 +1335,22 @@
     return;
   }
   ee->removed = GNUNET_YES;
-  ee->generation_removed = set->state.u->current_generation;
+  ee->generation_removed = set_state->current_generation;
 }
 
 
 /**
  * Dispatch messages for a union operation.
  *
- * @param cls closure
- * @param tunnel mesh tunnel
- * @param tunnel_ctx tunnel context
- * @param mh message to process
+ * @param eo the state of the union evaluate operation
+ * @param mh the received message
  * @return GNUNET_SYSERR if the tunnel should be disconnected,
  *         GNUNET_OK otherwise
  */
 int
-_GSS_union_handle_p2p_message (void *cls,
-                               struct GNUNET_MESH_Tunnel *tunnel,
-                               void **tunnel_ctx,
-                               const struct GNUNET_MessageHeader *mh)
+union_handle_p2p_message (struct OperationState *eo,
+                          const struct GNUNET_MessageHeader *mh)
 {
-  struct TunnelContext *tc = *tunnel_ctx;
-  struct UnionEvaluateOperation *eo;
-
-  if (CONTEXT_OPERATION_UNION != tc->type)
-  {
-    return GNUNET_SYSERR;
-  }
-
-  eo = tc->data.union_op;
-
   switch (ntohs (mh->type))
   {
     case GNUNET_MESSAGE_TYPE_SET_P2P_IBF:
@@ -1366,3 +1374,60 @@
   }
   return GNUNET_OK;
 }
+
+
+static void
+union_peer_disconnect (struct OperationState *op)
+{
+  /* Are we already disconnected? */
+  if (NULL == op->tunnel)
+    return;
+  op->tunnel = NULL;
+  if (NULL != op->mq)
+  {
+    GNUNET_MQ_destroy (op->mq);
+    op->mq = NULL;
+  }
+  if (PHASE_FINISHED != op->phase)
+  {
+    struct GNUNET_MQ_Envelope *ev;
+    struct GNUNET_SET_ResultMessage *msg;
+    ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
+    msg->request_id = htonl (op->spec->client_request_id);
+    msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
+    msg->element_type = htons (0);
+    GNUNET_MQ_send (op->spec->set->client_mq, ev);
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "other peer disconnected 
prematurely\n");
+  }
+  else
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "other peer disconnected 
(finished)\n");
+  }
+  union_operation_destroy (op);
+}
+
+
+static void
+union_op_cancel (struct SetState *set_state, uint32_t op_id)
+{
+  /* FIXME: implement */
+}
+
+
+const struct SetVT *
+_GSS_union_vt (void)
+{
+  static const struct SetVT union_vt = {
+    .create = union_set_create,
+    .msg_handler = union_handle_p2p_message,
+    .add = union_add,
+    .remove = union_remove,
+    .destroy_set = union_set_destroy,
+    .evaluate = union_evaluate,
+    .accept = union_accept,
+    .peer_disconnect = union_peer_disconnect,
+    .cancel = union_op_cancel
+  };
+
+  return &union_vt;
+}

Modified: gnunet/src/set/gnunet-set-profiler.c
===================================================================
--- gnunet/src/set/gnunet-set-profiler.c        2013-07-10 01:26:04 UTC (rev 
27859)
+++ gnunet/src/set/gnunet-set-profiler.c        2013-07-10 01:31:13 UTC (rev 
27860)
@@ -16,7 +16,7 @@
       along with GNUnet; see the file COPYING.  If not, write to the
       Free Software Foundation, Inc., 59 Temple Place - Suite 330,
       Boston, MA 02111-1307, USA.
- */
+*/
 
 /**
  * @file set/gnunet-set-profiler.c
@@ -42,41 +42,25 @@
 
 const static struct GNUNET_CONFIGURATION_Handle *config;
 
-struct GNUNET_CONTAINER_MultiHashMap *map_a;
-struct GNUNET_CONTAINER_MultiHashMap *map_b;
-struct GNUNET_CONTAINER_MultiHashMap *map_c;
+struct SetInfo
+{
+  char *id;
+  struct GNUNET_SET_Handle *set;
+  struct GNUNET_SET_OperationHandle *oh;
+  struct GNUNET_CONTAINER_MultiHashMap *sent;
+  struct GNUNET_CONTAINER_MultiHashMap *received;
+  int done;
+} info1, info2;
 
+struct GNUNET_CONTAINER_MultiHashMap *common_sent;
 
-/**
- * Elements that set a received, should match map_c
- * in the end.
- */
-struct GNUNET_CONTAINER_MultiHashMap *map_a_received;
-
-/**
- * Elements that set b received, should match map_c
- * in the end.
- */
-struct GNUNET_CONTAINER_MultiHashMap *map_b_received;
-
-struct GNUNET_SET_Handle *set_a;
-struct GNUNET_SET_Handle *set_b;
-
 struct GNUNET_HashCode app_id;
 
 struct GNUNET_PeerIdentity local_peer;
 
 struct GNUNET_SET_ListenHandle *set_listener;
 
-struct GNUNET_SET_OperationHandle *set_oh1;
-struct GNUNET_SET_OperationHandle *set_oh2;
 
-
-int a_done;
-int b_done;
-
-
-
 static int
 map_remove_iterator (void *cls,
                      const struct GNUNET_HashCode *key,
@@ -85,66 +69,69 @@
   struct GNUNET_CONTAINER_MultiHashMap *m = cls;
   int ret;
 
+  GNUNET_assert (NULL != key);
+
   ret = GNUNET_CONTAINER_multihashmap_remove (m, key, NULL);
-  GNUNET_assert (GNUNET_OK == ret);
+  if (GNUNET_OK != ret)
+    printf ("spurious element\n");
   return GNUNET_YES;
 
 }
 
-
 static void
-set_result_cb_1 (void *cls,
-                 const struct GNUNET_SET_Element *element,
-                 enum GNUNET_SET_Status status)
+check_all_done (void)
 {
-  GNUNET_assert (GNUNET_NO == a_done);
-  GNUNET_assert (element->size == sizeof (struct GNUNET_HashCode));
-  switch (status)
-  {
-    case GNUNET_SET_STATUS_DONE:
-    case GNUNET_SET_STATUS_HALF_DONE:
-      a_done = GNUNET_YES;
-      GNUNET_CONTAINER_multihashmap_iterate (map_c, map_remove_iterator, 
map_a_received);
-      GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (map_a_received));
-      return;
-    case GNUNET_SET_STATUS_FAILURE:
-      GNUNET_assert (0);
-      return;
-    case GNUNET_SET_STATUS_OK:
-      break;
-    default:
-      GNUNET_assert (0);
-  }
-  GNUNET_CONTAINER_multihashmap_put (map_a_received,
-                                     element->data, NULL,
-                                     
GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
+  if (info1.done == GNUNET_NO || info2.done == GNUNET_NO)
+    return;
+
+  GNUNET_CONTAINER_multihashmap_iterate (info1.received, map_remove_iterator, 
info2.sent);
+  GNUNET_CONTAINER_multihashmap_iterate (info2.received, map_remove_iterator, 
info1.sent);
+
+  printf ("set a: %d missing elements\n", GNUNET_CONTAINER_multihashmap_size 
(info1.sent));
+  printf ("set b: %d missing elements\n", GNUNET_CONTAINER_multihashmap_size 
(info2.sent));
+
+  GNUNET_SCHEDULER_shutdown ();
 }
 
 
 static void
-set_result_cb_2 (void *cls,
+set_result_cb (void *cls,
                  const struct GNUNET_SET_Element *element,
                  enum GNUNET_SET_Status status)
 {
-  GNUNET_assert (GNUNET_NO == b_done);
-  GNUNET_assert (element->size == sizeof (struct GNUNET_HashCode));
+  struct SetInfo *info = cls;
+
+  GNUNET_assert (GNUNET_NO == info->done);
   switch (status)
   {
     case GNUNET_SET_STATUS_DONE:
     case GNUNET_SET_STATUS_HALF_DONE:
-      b_done = GNUNET_YES;
-      GNUNET_CONTAINER_multihashmap_iterate (map_c, map_remove_iterator, 
map_b_received);
-      GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (map_b_received));
+      info->done = GNUNET_YES;
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set %s done\n", info->id);
+      check_all_done ();
+      info->oh = NULL;
       return;
     case GNUNET_SET_STATUS_FAILURE:
-      GNUNET_assert (0);
+      info->oh = NULL;
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "failure\n");
+      GNUNET_SCHEDULER_shutdown ();
       return;
     case GNUNET_SET_STATUS_OK:
       break;
     default:
       GNUNET_assert (0);
   }
-  GNUNET_CONTAINER_multihashmap_put (map_b_received,
+
+  if (element->size != sizeof (struct GNUNET_HashCode))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "wrong element size: %u\n", 
element->size);
+    GNUNET_assert (0);
+  }
+
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set %s: got element (%s)\n",
+              info->id, GNUNET_h2s (element->data));
+  GNUNET_assert (NULL != element->data);
+  GNUNET_CONTAINER_multihashmap_put (info->received,
                                      element->data, NULL,
                                      
GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
 }
@@ -156,11 +143,16 @@
                const struct GNUNET_MessageHeader *context_msg,
                struct GNUNET_SET_Request *request)
 {
-  GNUNET_assert (NULL == set_oh2);
+  if (NULL == request)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "listener failed\n");
+    return;
+  }
+  GNUNET_assert (NULL == info2.oh);
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set listen cb called\n");
-  set_oh2 = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED,
-                               set_result_cb_2, NULL);
-  GNUNET_SET_commit (set_oh2, set_b);
+  info2.oh = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED,
+                               set_result_cb, &info2);
+  GNUNET_SET_commit (info2.oh, info2.set);
 }
 
 
@@ -185,6 +177,37 @@
 
 
 static void
+handle_shutdown (void *cls,
+                 const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  if (NULL != set_listener)
+  {
+    GNUNET_SET_listen_cancel (set_listener);
+    set_listener = NULL;
+  }
+  if (NULL != info1.oh)
+  {
+    GNUNET_SET_operation_cancel (info1.oh);
+    info1.oh = NULL;
+  }
+  if (NULL != info2.oh)
+  {
+    GNUNET_SET_operation_cancel (info2.oh);
+    info2.oh = NULL;
+  }
+  if (NULL != info1.set)
+  {
+    GNUNET_SET_destroy (info1.set);
+    info1.set = NULL;
+  }
+  if (NULL != info2.set)
+  {
+    GNUNET_SET_destroy (info2.set);
+    info2.set = NULL;
+  }
+}
+
+static void
 run (void *cls, char *const *args, const char *cfgfile,
      const struct GNUNET_CONFIGURATION_Handle *cfg)
 {
@@ -195,63 +218,41 @@
 
   if (GNUNET_OK != GNUNET_CRYPTO_get_host_identity (cfg, &local_peer))
   {
-    GNUNET_assert (0);
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "could not retrieve host identity\n");
+    ret = 0;
     return;
   }
+
+  GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, handle_shutdown, 
NULL);
+
+  info1.id = "a";
+  info2.id = "b";
   
-  map_a = GNUNET_CONTAINER_multihashmap_create (num_a, GNUNET_NO);
-  map_b = GNUNET_CONTAINER_multihashmap_create (num_b, GNUNET_NO);
-  map_c = GNUNET_CONTAINER_multihashmap_create (num_c, GNUNET_NO);
+  info1.sent = GNUNET_CONTAINER_multihashmap_create (num_a, GNUNET_NO);
+  info2.sent = GNUNET_CONTAINER_multihashmap_create (num_b, GNUNET_NO);
+  common_sent = GNUNET_CONTAINER_multihashmap_create (num_c, GNUNET_NO);
 
+  info1.received = GNUNET_CONTAINER_multihashmap_create (num_a, GNUNET_NO);
+  info2.received = GNUNET_CONTAINER_multihashmap_create (num_b, GNUNET_NO);
+
   for (i = 0; i < num_a; i++)
   {
     GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash);
-    if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_a, &hash))
-    {
-      i--;
-      continue;
-    }
-    GNUNET_CONTAINER_multihashmap_put (map_a, &hash, &hash,
-                                       
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
+    GNUNET_CONTAINER_multihashmap_put (info1.sent, &hash, NULL,
+                                       
GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
   }
 
   for (i = 0; i < num_b; i++)
   {
     GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash);
-    if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_a, &hash))
-    {
-      i--;
-      continue;
-    }
-    if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_b, &hash))
-    {
-      i--;
-      continue;
-    }
-    GNUNET_CONTAINER_multihashmap_put (map_b, &hash, NULL,
+    GNUNET_CONTAINER_multihashmap_put (info2.sent, &hash, NULL,
                                        
GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
   }
 
   for (i = 0; i < num_c; i++)
   {
     GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash);
-    if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_a, &hash))
-    {
-      i--;
-      continue;
-    }
-    if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_b, &hash))
-    {
-      i--;
-      continue;
-    }
-    if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (map_c, &hash))
-    {
-      i--;
-      continue;
-    }
-    GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash);
-    GNUNET_CONTAINER_multihashmap_put (map_c, &hash, NULL,
+    GNUNET_CONTAINER_multihashmap_put (common_sent, &hash, NULL,
                                        
GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
   }
 
@@ -259,20 +260,22 @@
   app_id = hash;
 
   /* FIXME: also implement intersection etc. */
-  set_a = GNUNET_SET_create (config, GNUNET_SET_OPERATION_UNION);
-  set_b = GNUNET_SET_create (config, GNUNET_SET_OPERATION_UNION);
+  info1.set = GNUNET_SET_create (config, GNUNET_SET_OPERATION_UNION);
+  info2.set = GNUNET_SET_create (config, GNUNET_SET_OPERATION_UNION);
 
-  GNUNET_CONTAINER_multihashmap_iterate (map_a, set_insert_iterator, set_a);
-  GNUNET_CONTAINER_multihashmap_iterate (map_b, set_insert_iterator, set_b);
-  GNUNET_CONTAINER_multihashmap_iterate (map_c, set_insert_iterator, set_a);
-  GNUNET_CONTAINER_multihashmap_iterate (map_c, set_insert_iterator, set_b);
+  GNUNET_CONTAINER_multihashmap_iterate (info1.sent, set_insert_iterator, 
info1.set);
+  GNUNET_CONTAINER_multihashmap_iterate (info2.sent, set_insert_iterator, 
info2.set);
+  GNUNET_CONTAINER_multihashmap_iterate (common_sent, set_insert_iterator, 
info1.set);
+  GNUNET_CONTAINER_multihashmap_iterate (common_sent, set_insert_iterator, 
info2.set);
 
   set_listener = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION,
                                     &app_id, set_listen_cb, NULL);
 
-  set_oh1 = GNUNET_SET_prepare (&local_peer, &app_id, NULL, salt, 
GNUNET_SET_RESULT_ADDED,
-                       set_result_cb_1, NULL);
-  GNUNET_SET_commit (set_oh1, set_a);
+  info1.oh = GNUNET_SET_prepare (&local_peer, &app_id, NULL, salt, 
GNUNET_SET_RESULT_ADDED,
+                       set_result_cb, &info1);
+  GNUNET_SET_commit (info1.oh, info1.set);
+  GNUNET_SET_destroy (info1.set);
+  info1.set = NULL;
 }
 
 

Modified: gnunet/src/set/set.h
===================================================================
--- gnunet/src/set/set.h        2013-07-10 01:26:04 UTC (rev 27859)
+++ gnunet/src/set/set.h        2013-07-10 01:31:13 UTC (rev 27860)
@@ -194,6 +194,24 @@
 };
 
 
+/**
+ * Sent to the service by the client
+ * in order to cancel a set operation.
+ */
+struct GNUNET_SET_CancelMessage
+{
+  /**
+   * Type: GNUNET_MESSAGE_TYPE_SET_CANCEL
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * ID of the request we want to cancel.
+   */
+  uint32_t request_id GNUNET_PACKED;
+};
+
+
 GNUNET_NETWORK_STRUCT_END
 
 #endif

Modified: gnunet/src/set/set_api.c
===================================================================
--- gnunet/src/set/set_api.c    2013-07-10 01:26:04 UTC (rev 27859)
+++ gnunet/src/set/set_api.c    2013-07-10 01:31:13 UTC (rev 27860)
@@ -33,7 +33,6 @@
 
 #define LOG(kind,...) GNUNET_log_from (kind, "set-api",__VA_ARGS__)
 
-
 /**
  * Opaque handle to a set.
  */
@@ -42,20 +41,47 @@
   struct GNUNET_CLIENT_Connection *client;
   struct GNUNET_MQ_Handle *mq;
   unsigned int messages_since_ack;
+  struct GNUNET_SET_OperationHandle *ops_head;
+  struct GNUNET_SET_OperationHandle *ops_tail;
+  int destroy_requested;
 };
 
+
 /**
  * Opaque handle to a set operation request from another peer.
  */
 struct GNUNET_SET_Request
 {
+  /**
+   * Id of the request, used to identify the request when
+   * accepting/rejecting it.
+   */
   uint32_t accept_id;
+
+  /**
+   * Has the request been accepted already?
+   * GNUNET_YES/GNUNET_NO
+   */
   int accepted;
 };
 
+
+/**
+ * Handle to an operation.
+ * Only known to the service after commiting
+ * the handle with a set.
+ */
 struct GNUNET_SET_OperationHandle
 {
+  /**
+   * Function to be called when we have a result,
+   * or an error.
+   */
   GNUNET_SET_ResultIterator result_cb;
+
+  /**
+   * Closure for result_cb.
+   */
   void *result_cls;
 
   /**
@@ -80,6 +106,17 @@
    * used to patch the request id into the message when the set is known.
    */
   uint32_t *request_id_addr;
+
+  /**
+   * Handles are kept in a linked list.
+   */
+  struct GNUNET_SET_OperationHandle *prev;
+
+  /**
+   * Handles are kept in a linked list.
+   */
+  struct GNUNET_SET_OperationHandle *next;
+
 };
 
 
@@ -88,9 +125,25 @@
  */
 struct GNUNET_SET_ListenHandle
 {
+  /**
+   * Connection to the service.
+   */
   struct GNUNET_CLIENT_Connection *client;
+
+  /**
+   * Message queue for the client.
+   */
   struct GNUNET_MQ_Handle* mq;
+
+  /**
+   * Function to call on a new incoming request,
+   * or on error.
+   */
   GNUNET_SET_ListenCallback listen_cb;
+
+  /**
+   * Closure for listen_cb.
+   */
   void *listen_cls;
 };
 
@@ -108,11 +161,13 @@
   struct GNUNET_SET_Handle *set = cls;
   struct GNUNET_SET_OperationHandle *oh;
   struct GNUNET_SET_Element e;
+  enum GNUNET_SET_Status result_status;
 
-
   GNUNET_assert (NULL != set);
   GNUNET_assert (NULL != set->mq);
 
+  result_status = ntohs (msg->result_status);
+
   if (set->messages_since_ack >= GNUNET_SET_ACK_WINDOW/2)
   {
     struct GNUNET_MQ_Envelope *mqm;
@@ -123,11 +178,14 @@
   GNUNET_assert (NULL != oh);
   /* status is not STATUS_OK => there's no attached element,
    * and this is the last result message we get */
-  if (htons (msg->result_status) != GNUNET_SET_STATUS_OK)
+  if (GNUNET_SET_STATUS_OK != result_status)
   {
     GNUNET_MQ_assoc_remove (set->mq, ntohl (msg->request_id));
+    GNUNET_CONTAINER_DLL_remove (oh->set->ops_head, oh->set->ops_tail, oh);
+    if (GNUNET_YES == oh->set->destroy_requested)
+      GNUNET_SET_destroy (oh->set);
     if (NULL != oh->result_cb)
-      oh->result_cb (oh->result_cls, NULL, htons (msg->result_status));
+      oh->result_cb (oh->result_cls, NULL, result_status);
     GNUNET_free (oh);
     return;
   }
@@ -136,7 +194,7 @@
   e.size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ResultMessage);
   e.type = msg->element_type;
   if (NULL != oh->result_cb)
-    oh->result_cb (oh->result_cls, &e, htons (msg->result_status));
+    oh->result_cb (oh->result_cls, &e, result_status);
 }
 
 /**
@@ -153,7 +211,7 @@
   struct GNUNET_SET_Request *req;
   struct GNUNET_MessageHeader *context_msg;
 
-  LOG (GNUNET_ERROR_TYPE_INFO, "processing request\n");
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "processing operation request\n");
   req = GNUNET_new (struct GNUNET_SET_Request);
   req->accept_id = ntohl (msg->accept_id);
   context_msg = GNUNET_MQ_extract_nested_mh (msg);
@@ -171,16 +229,42 @@
     amsg->accept_reject_id = msg->accept_id;
     GNUNET_MQ_send (lh->mq, mqm);
     GNUNET_free (req);
-    LOG (GNUNET_ERROR_TYPE_INFO, "rejecting request\n");
+    LOG (GNUNET_ERROR_TYPE_DEBUG, "rejecting request\n");
   }
 
-  LOG (GNUNET_ERROR_TYPE_INFO, "processed op request from service\n");
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "processed op request from service\n");
 
   /* the accept-case is handled in GNUNET_SET_accept,
    * as we have the accept message available there */
 }
 
 
+static void
+handle_client_listener_error (void *cls, enum GNUNET_MQ_Error error)
+{
+  struct GNUNET_SET_ListenHandle *lh = cls;
+
+  lh->listen_cb (lh->listen_cls, NULL, NULL, NULL);
+}
+
+
+static void
+handle_client_set_error (void *cls, enum GNUNET_MQ_Error error)
+{
+  struct GNUNET_SET_Handle *set = cls;
+
+  while (NULL != set->ops_head)
+  {
+    if (NULL != set->ops_head->result_cb)
+      set->ops_head->result_cb (set->ops_head->result_cls, NULL,
+                                GNUNET_SET_STATUS_FAILURE);
+    GNUNET_SET_operation_cancel (set->ops_head);
+  }
+
+  /* FIXME: there should be a set error handler */
+}
+
+
 /**
  * Create an empty set, supporting the specified operation.
  *
@@ -200,15 +284,16 @@
   struct GNUNET_MQ_Envelope *mqm;
   struct GNUNET_SET_CreateMessage *msg;
   static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
-    {handle_result, GNUNET_MESSAGE_TYPE_SET_RESULT},
+    {handle_result, GNUNET_MESSAGE_TYPE_SET_RESULT, 0},
     GNUNET_MQ_HANDLERS_END
   };
 
   set = GNUNET_new (struct GNUNET_SET_Handle);
   set->client = GNUNET_CLIENT_connect ("set", cfg);
-  LOG (GNUNET_ERROR_TYPE_INFO, "set client created\n");
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "set client created\n");
   GNUNET_assert (NULL != set->client);
-  set->mq = GNUNET_MQ_queue_for_connection_client (set->client, mq_handlers, 
set);
+  set->mq = GNUNET_MQ_queue_for_connection_client (set->client, mq_handlers,
+                                                   handle_client_set_error, 
set);
   GNUNET_assert (NULL != set->mq);
   mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_CREATE);
   msg->operation = htons (op);
@@ -279,6 +364,11 @@
 void
 GNUNET_SET_destroy (struct GNUNET_SET_Handle *set)
 {
+  if (NULL != set->ops_head)
+  {
+    set->destroy_requested = GNUNET_YES;
+    return;
+  }
   GNUNET_CLIENT_disconnect (set->client);
   set->client = NULL;
   GNUNET_MQ_destroy (set->mq);
@@ -332,7 +422,6 @@
   return oh;
 }
 
-
 /**
  * Wait for set operation requests for the given application id
  * 
@@ -365,7 +454,8 @@
   lh->listen_cb = listen_cb;
   lh->listen_cls = listen_cls;
   GNUNET_assert (NULL != lh->client);
-  lh->mq = GNUNET_MQ_queue_for_connection_client (lh->client, mq_handlers, lh);
+  lh->mq = GNUNET_MQ_queue_for_connection_client (lh->client, mq_handlers,
+                                                  
handle_client_listener_error, lh);
   mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_LISTEN);
   msg->operation = htons (operation);
   msg->app_id = *app_id;
@@ -413,6 +503,7 @@
   struct GNUNET_SET_OperationHandle *oh;
   struct GNUNET_SET_AcceptRejectMessage *msg;
 
+  GNUNET_assert (NULL != request);
   GNUNET_assert (GNUNET_NO == request->accepted);
   request->accepted = GNUNET_YES;
 
@@ -432,6 +523,9 @@
 
 /**
  * Cancel the given set operation.
+ * We need to send an explicit cancel message, as
+ * all operations communicate with the set's client
+ * handle.
  *
  * @param oh set operation to cancel
  */
@@ -441,17 +535,20 @@
   struct GNUNET_MQ_Envelope *mqm;
   struct GNUNET_SET_OperationHandle *h_assoc;
 
-  if (NULL != oh->set)
-  {
-    h_assoc = GNUNET_MQ_assoc_remove (oh->set->mq, oh->request_id);
-    GNUNET_assert (h_assoc == oh);
-    mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_CANCEL);
-    GNUNET_MQ_send (oh->set->mq, mqm);
-  }
+  GNUNET_assert (NULL != oh->set);
 
+  GNUNET_CONTAINER_DLL_remove (oh->set->ops_head, oh->set->ops_tail, oh);
+  h_assoc = GNUNET_MQ_assoc_remove (oh->set->mq, oh->request_id);
+  GNUNET_assert (h_assoc == oh);
+  mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_CANCEL);
+  GNUNET_MQ_send (oh->set->mq, mqm);
+
   if (NULL != oh->conclude_mqm)
     GNUNET_MQ_discard (oh->conclude_mqm);
 
+  if (GNUNET_YES == oh->set->destroy_requested)
+    GNUNET_SET_destroy (oh->set);
+
   GNUNET_free (oh);
 }
 
@@ -469,14 +566,15 @@
  */
 void
 GNUNET_SET_commit (struct GNUNET_SET_OperationHandle *oh,
-                    struct GNUNET_SET_Handle *set)
+                   struct GNUNET_SET_Handle *set)
 {
   GNUNET_assert (NULL == oh->set);
   GNUNET_assert (NULL != oh->conclude_mqm);
   oh->set = set;
-  oh->request_id = GNUNET_MQ_assoc_add (oh->set->mq, oh);
+  GNUNET_CONTAINER_DLL_insert (set->ops_head, set->ops_tail, oh);
+  oh->request_id = GNUNET_MQ_assoc_add (set->mq, oh);
   *oh->request_id_addr = htonl (oh->request_id);
-  GNUNET_MQ_send (oh->set->mq, oh->conclude_mqm);
+  GNUNET_MQ_send (set->mq, oh->conclude_mqm);
   oh->conclude_mqm = NULL;
   oh->request_id_addr = NULL;
 }

Modified: gnunet/src/util/mq.c
===================================================================
--- gnunet/src/util/mq.c        2013-07-10 01:26:04 UTC (rev 27859)
+++ gnunet/src/util/mq.c        2013-07-10 01:31:13 UTC (rev 27860)
@@ -612,6 +612,7 @@
 struct GNUNET_MQ_Handle *
 GNUNET_MQ_queue_for_connection_client (struct GNUNET_CLIENT_Connection 
*connection,
                                        const struct GNUNET_MQ_MessageHandler 
*handlers,
+                                       GNUNET_MQ_ErrorHandler error_handler,
                                        void *cls)
 {
   struct GNUNET_MQ_Handle *mq;
@@ -621,6 +622,7 @@
 
   mq = GNUNET_new (struct GNUNET_MQ_Handle);
   mq->handlers = handlers;
+  mq->error_handler = error_handler;
   mq->handlers_cls = cls;
   state = GNUNET_new (struct ClientConnectionState);
   state->connection = connection;
@@ -708,18 +710,29 @@
 void
 GNUNET_MQ_destroy (struct GNUNET_MQ_Handle *mq)
 {
-  /* FIXME: destroy all pending messages in the queue */
-
   if (NULL != mq->destroy_impl)
   {
     mq->destroy_impl (mq, mq->impl_state);
   }
 
+  while (NULL != mq->envelope_head)
+  {
+    struct GNUNET_MQ_Envelope *ev;
+    ev = mq->envelope_head;
+    GNUNET_MQ_discard (ev);
+    GNUNET_CONTAINER_DLL_remove (mq->envelope_head, mq->envelope_tail, ev);
+  }
+
+  if (NULL != mq->current_envelope)
+  {
+    GNUNET_MQ_discard (mq->current_envelope);
+    mq->current_envelope = NULL;
+  }
+
   GNUNET_free (mq);
 }
 
 
-
 struct GNUNET_MessageHeader *
 GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, uint16_t 
base_size)
 {




reply via email to

[Prev in Thread] Current Thread [Next in Thread]