gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r34438 - gnunet/src/set


From: gnunet
Subject: [GNUnet-SVN] r34438 - gnunet/src/set
Date: Thu, 27 Nov 2014 14:31:52 +0100

Author: grothoff
Date: 2014-11-27 14:31:52 +0100 (Thu, 27 Nov 2014)
New Revision: 34438

Modified:
   gnunet/src/set/gnunet-service-set.c
   gnunet/src/set/set.h
   gnunet/src/set/set_api.c
Log:
use and respect send_more field in IterAckMessage

Modified: gnunet/src/set/gnunet-service-set.c
===================================================================
--- gnunet/src/set/gnunet-service-set.c 2014-11-27 11:56:28 UTC (rev 34437)
+++ gnunet/src/set/gnunet-service-set.c 2014-11-27 13:31:52 UTC (rev 34438)
@@ -176,7 +176,6 @@
   for (op = incoming_head; NULL != op; op = op->next)
     if (op->suggest_id == id)
     {
-      // FIXME: remove this assertion once the corresponding bug is gone!
       GNUNET_assert (GNUNET_YES == op->is_incoming);
       return op;
     }
@@ -325,8 +324,8 @@
   GNUNET_assert (GNUNET_NO == op->is_incoming);
   GNUNET_assert (NULL != op->spec);
   set = op->spec->set;
-  GNUNET_CONTAINER_DLL_remove (op->spec->set->ops_head,
-                               op->spec->set->ops_tail,
+  GNUNET_CONTAINER_DLL_remove (set->ops_head,
+                               set->ops_tail,
                                op);
   op->vt->cancel (op);
   op->vt = NULL;
@@ -511,12 +510,12 @@
 listener_get_by_target (enum GNUNET_SET_OperationType op,
                         const struct GNUNET_HashCode *app_id)
 {
-  struct Listener *l;
+  struct Listener *listener;
 
-  for (l = listeners_head; NULL != l; l = l->next)
-    if ( (l->operation == op) &&
-         (0 == GNUNET_CRYPTO_hash_cmp (app_id, &l->app_id)) )
-      return l;
+  for (listener = listeners_head; NULL != listener; listener = listener->next)
+    if ( (listener->operation == op) &&
+         (0 == GNUNET_CRYPTO_hash_cmp (app_id, &listener->app_id)) )
+      return listener;
   return NULL;
 }
 
@@ -997,7 +996,7 @@
 
 
 /**
- * Called when a client wants to evaluate a set operation with another
+ * Called when a client wants to initiate a set operation with another
  * peer.  Initiates the CADET connection to the listener and sends the
  * request.
  *
@@ -1068,6 +1067,7 @@
                         struct GNUNET_SERVER_Client *client,
                         const struct GNUNET_MessageHeader *m)
 {
+  const struct GNUNET_SET_IterAckMessage *ack;
   struct Set *set;
 
   set = set_get (client);
@@ -1086,9 +1086,18 @@
     GNUNET_SERVER_client_disconnect (client);
     return;
   }
+  ack = (const struct GNUNET_SET_IterAckMessage *) m;
   GNUNET_SERVER_receive_done (client,
                               GNUNET_OK);
-  send_client_element (set);
+  if (ntohl (ack->send_more))
+  {
+    send_client_element (set);
+  }
+  else
+  {
+    GNUNET_CONTAINER_multihashmap_iterator_destroy (set->iter);
+    set->iter = NULL;
+  }
 }
 
 
@@ -1111,19 +1120,17 @@
   struct Operation *op;
   int found;
 
-  // client without a set requested an operation
   set = set_get (client);
   if (NULL == set)
   {
+    /* client without a set requested an operation */
     GNUNET_break (0);
     GNUNET_SERVER_client_disconnect (client);
     return;
   }
-
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "client requested cancel for op %u\n",
+              "Client requested cancel for op %u\n",
               ntohl (msg->request_id));
-
   found = GNUNET_NO;
   for (op = set->ops_head; NULL != op; op = op->next)
   {
@@ -1133,27 +1140,30 @@
       break;
     }
   }
-
-  /* It may happen that the operation was destroyed due to
-   * the other peer disconnecting.  The client may not know about this
-   * yet and try to cancel the (non non-existent) operation.
-   */
-  if (GNUNET_NO != found)
+  if (GNUNET_NO == found)
+  {
+    /* It may happen that the operation was already destroyed due to
+     * the other peer disconnecting.  The client may not know about this
+     * yet and try to cancel the (just barely non-existent) operation.
+     * So this is not a hard error.
+     */
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Client canceled non-existent op\n");
+  }
+  else
+  {
     _GSS_operation_destroy (op,
                             GNUNET_YES);
-  else
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "client canceled non-existent op\n");
-
-
-  GNUNET_SERVER_receive_done (client, GNUNET_OK);
+  }
+  GNUNET_SERVER_receive_done (client,
+                              GNUNET_OK);
 }
 
 
 /**
- * Handle a request from the client to accept
- * a set operation that came from a remote peer.
- * We forward the accept to the associated operation for handling
+ * Handle a request from the client to accept a set operation that
+ * came from a remote peer.  We forward the accept to the associated
+ * operation for handling
  *
  * @param cls unused
  * @param client the client
@@ -1167,28 +1177,27 @@
   struct Set *set;
   const struct GNUNET_SET_AcceptMessage *msg;
   struct Operation *op;
+  struct GNUNET_SET_ResultMessage *result_message;
+  struct GNUNET_MQ_Envelope *ev;
 
   msg = (const struct GNUNET_SET_AcceptMessage *) mh;
-
-  // client without a set requested an operation
   set = set_get (client);
-
   if (NULL == set)
   {
+    /* client without a set requested to accept */
     GNUNET_break (0);
     GNUNET_SERVER_client_disconnect (client);
     return;
   }
-
   op = get_incoming (ntohl (msg->accept_reject_id));
-
-  /* it is not an error if the set op does not exist -- it may
-   * have been destroyed when the partner peer disconnected. */
   if (NULL == op)
   {
-    struct GNUNET_SET_ResultMessage *result_message;
-    struct GNUNET_MQ_Envelope *ev;
-    ev = GNUNET_MQ_msg (result_message, GNUNET_MESSAGE_TYPE_SET_RESULT);
+    /* It is not an error if the set op does not exist -- it may
+     * have been destroyed when the partner peer disconnected. */
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Client accepted request that is no longer active\n");
+    ev = GNUNET_MQ_msg (result_message,
+                        GNUNET_MESSAGE_TYPE_SET_RESULT);
     result_message->request_id = msg->request_id;
     result_message->element_type = 0;
     result_message->result_status = htons (GNUNET_SET_STATUS_FAILURE);
@@ -1198,33 +1207,24 @@
   }
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "client accepting %u\n",
+              "Client accepting request %u\n",
               ntohl (msg->accept_reject_id));
-
   GNUNET_assert (GNUNET_YES == op->is_incoming);
-
-
-  op->spec->set = set;
-
-  GNUNET_assert (GNUNET_YES == op->is_incoming);
   op->is_incoming = GNUNET_NO;
   GNUNET_CONTAINER_DLL_remove (incoming_head,
                                incoming_tail,
                                op);
-
-  GNUNET_assert (NULL != op->spec->set);
-  GNUNET_assert (NULL != op->spec->set->vt);
-
+  op->spec->set = set;
   GNUNET_CONTAINER_DLL_insert (set->ops_head,
                                set->ops_tail,
                                op);
-
   op->spec->client_request_id = ntohl (msg->request_id);
   op->spec->result_mode = ntohl (msg->result_mode);
   op->generation_created = set->current_generation++;
-  op->vt = op->spec->set->vt;
-  set->vt->accept (op);
-  GNUNET_SERVER_receive_done (client, GNUNET_OK);
+  op->vt = set->vt;
+  op->vt->accept (op);
+  GNUNET_SERVER_receive_done (client,
+                              GNUNET_OK);
 }
 
 
@@ -1240,10 +1240,8 @@
 {
   while (NULL != incoming_head)
     incoming_destroy (incoming_head);
-
   while (NULL != listeners_head)
     listener_destroy (listeners_head);
-
   while (NULL != sets_head)
     set_destroy (sets_head);
 
@@ -1280,7 +1278,7 @@
   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
     return;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "remote peer timed out\n");
+              "Remote peer's incoming request timed out\n");
   incoming_destroy (incoming);
 }
 
@@ -1319,7 +1317,7 @@
  * @param port Port this channel is for.
  * @param options Unused.
  * @return initial channel context for the channel
- *         (can be NULL -- that's not an error)
+ *         returns NULL on error
  */
 static void *
 channel_new_cb (void *cls,
@@ -1365,7 +1363,7 @@
  * GNUNET_CADET_channel_destroy() on the channel.
  *
  * The peer_disconnect function is part of a a virtual table set initially 
either
- * when a peer creates a new channel with us (channel_new_cb), or once we 
create
+ * when a peer creates a new channel with us (#channel_new_cb()), or once we 
create
  * a new channel ourselves (evaluate).
  *
  * Once we know the exact type of operation (union/intersection), the vt is
@@ -1384,7 +1382,7 @@
   struct Operation *op = channel_ctx;
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "channel end cb called\n");
+              "channel_end_cb called\n");
   op->channel = NULL;
   /* the vt can be null if a client already requested canceling op. */
   if (NULL != op->vt)
@@ -1393,14 +1391,13 @@
                 "calling peer disconnect due to channel end\n");
     op->vt->peer_disconnect (op);
   }
-
-  if (GNUNET_YES == op->keep)
-    return;
-
-  /* cadet will never call us with the context again! */
-  GNUNET_free (channel_ctx);
+  if (GNUNET_YES != op->keep)
+  {
+    /* cadet will never call us with the context again! */
+    GNUNET_free (op);
+  }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "channel end cb finished\n");
+              "channel_end_cb finished\n");
 }
 
 
@@ -1432,16 +1429,17 @@
   int ret;
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "dispatching cadet message (type: %u)\n",
+              "Dispatching cadet message (type: %u)\n",
               ntohs (message->type));
   /* do this before the handler, as the handler might kill the channel */
   GNUNET_CADET_receive_done (channel);
   if (NULL != op->vt)
-    ret = op->vt->msg_handler (op, message);
+    ret = op->vt->msg_handler (op,
+                               message);
   else
     ret = GNUNET_SYSERR;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "handled cadet message (type: %u)\n",
+              "Handled cadet message (type: %u)\n",
               ntohs (message->type));
   return ret;
 }
@@ -1460,34 +1458,48 @@
      const struct GNUNET_CONFIGURATION_Handle *cfg)
 {
   static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
-    {handle_client_accept, NULL, GNUNET_MESSAGE_TYPE_SET_ACCEPT,
-        sizeof (struct GNUNET_SET_AcceptMessage)},
-    {handle_client_iter_ack, NULL, GNUNET_MESSAGE_TYPE_SET_ITER_ACK, 0},
-    {handle_client_add, NULL, GNUNET_MESSAGE_TYPE_SET_ADD, 0},
-    {handle_client_create_set, NULL, GNUNET_MESSAGE_TYPE_SET_CREATE,
-        sizeof (struct GNUNET_SET_CreateMessage)},
-    {handle_client_iterate, NULL, GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST,
-        sizeof (struct GNUNET_MessageHeader)},
-    {handle_client_evaluate, NULL, GNUNET_MESSAGE_TYPE_SET_EVALUATE, 0},
-    {handle_client_listen, NULL, GNUNET_MESSAGE_TYPE_SET_LISTEN,
-        sizeof (struct GNUNET_SET_ListenMessage)},
-    {handle_client_reject, NULL, GNUNET_MESSAGE_TYPE_SET_REJECT,
-        sizeof (struct GNUNET_SET_RejectMessage)},
-    {handle_client_remove, NULL, GNUNET_MESSAGE_TYPE_SET_REMOVE, 0},
-    {handle_client_cancel, NULL, GNUNET_MESSAGE_TYPE_SET_CANCEL,
-        sizeof (struct GNUNET_SET_CancelMessage)},
-    {NULL, NULL, 0, 0}
+    { &handle_client_accept, NULL,
+      GNUNET_MESSAGE_TYPE_SET_ACCEPT,
+      sizeof (struct GNUNET_SET_AcceptMessage)},
+    { &handle_client_iter_ack, NULL,
+      GNUNET_MESSAGE_TYPE_SET_ITER_ACK,
+      sizeof (struct GNUNET_SET_IterAckMessage) },
+    { &handle_client_add, NULL,
+      GNUNET_MESSAGE_TYPE_SET_ADD,
+      0},
+    { &handle_client_create_set, NULL,
+      GNUNET_MESSAGE_TYPE_SET_CREATE,
+      sizeof (struct GNUNET_SET_CreateMessage)},
+    { &handle_client_iterate, NULL,
+      GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST,
+      sizeof (struct GNUNET_MessageHeader)},
+    { &handle_client_evaluate, NULL,
+      GNUNET_MESSAGE_TYPE_SET_EVALUATE,
+      0},
+    { &handle_client_listen, NULL,
+      GNUNET_MESSAGE_TYPE_SET_LISTEN,
+      sizeof (struct GNUNET_SET_ListenMessage)},
+    { &handle_client_reject, NULL,
+      GNUNET_MESSAGE_TYPE_SET_REJECT,
+      sizeof (struct GNUNET_SET_RejectMessage)},
+    { &handle_client_remove, NULL,
+      GNUNET_MESSAGE_TYPE_SET_REMOVE,
+      0},
+    { &handle_client_cancel, NULL,
+      GNUNET_MESSAGE_TYPE_SET_CANCEL,
+      sizeof (struct GNUNET_SET_CancelMessage)},
+    { NULL, NULL, 0, 0}
   };
   static const struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
-    {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 0},
-    {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF, 0},
-    {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, 0},
-    {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_DONE, 0},
-    {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS, 0},
-    {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE, 0},
-    {dispatch_p2p_message, 
GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO, 0},
-    {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF, 0},
-    {dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF_PART, 
0},
+    { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, 0},
+    { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_IBF, 0},
+    { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS, 0},
+    { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_DONE, 0},
+    { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS, 0},
+    { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_UNION_P2P_SE, 0},
+    { &dispatch_p2p_message, 
GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_ELEMENT_INFO, 0},
+    { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF, 0},
+    { &dispatch_p2p_message, GNUNET_MESSAGE_TYPE_SET_INTERSECTION_P2P_BF_PART, 
0},
     {NULL, 0, 0}
   };
   static const uint32_t cadet_ports[] = {GNUNET_APPLICATION_TYPE_SET, 0};
@@ -1495,11 +1507,15 @@
   configuration = cfg;
   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
                                 &shutdown_task, NULL);
-  GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
-  GNUNET_SERVER_add_handlers (server, server_handlers);
-
-  cadet = GNUNET_CADET_connect (cfg, NULL, channel_new_cb, channel_end_cb,
-                              cadet_handlers, cadet_ports);
+  GNUNET_SERVER_disconnect_notify (server,
+                                   &handle_client_disconnect, NULL);
+  GNUNET_SERVER_add_handlers (server,
+                              server_handlers);
+  cadet = GNUNET_CADET_connect (cfg, NULL,
+                                &channel_new_cb,
+                                &channel_end_cb,
+                                cadet_handlers,
+                                cadet_ports);
   if (NULL == cadet)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
@@ -1517,14 +1533,15 @@
  * @return 0 ok, 1 on error
  */
 int
-main (int argc, char *const *argv)
+main (int argc,
+      char *const *argv)
 {
   int ret;
 
   ret = GNUNET_SERVICE_run (argc, argv, "set",
-                            GNUNET_SERVICE_OPTION_NONE, &run, NULL);
+                            GNUNET_SERVICE_OPTION_NONE,
+                            &run, NULL);
   return (GNUNET_OK == ret) ? 0 : 1;
 }
 
 /* end of gnunet-service-set.c */
-

Modified: gnunet/src/set/set.h
===================================================================
--- gnunet/src/set/set.h        2014-11-27 11:56:28 UTC (rev 34437)
+++ gnunet/src/set/set.h        2014-11-27 13:31:52 UTC (rev 34438)
@@ -207,7 +207,8 @@
   uint32_t request_id GNUNET_PACKED;
 
   /**
-   * Was the evaluation successful?
+   * Was the evaluation successful? Contains
+   * an `enum GNUNET_SET_Status` in NBO.
    */
   uint16_t result_status GNUNET_PACKED;
 

Modified: gnunet/src/set/set_api.c
===================================================================
--- gnunet/src/set/set_api.c    2014-11-27 11:56:28 UTC (rev 34437)
+++ gnunet/src/set/set_api.c    2014-11-27 13:31:52 UTC (rev 34438)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     (C) 2012, 2013 Christian Grothoff (and other contributing authors)
+     (C) 2012-2014 Christian Grothoff (and other contributing authors)
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -17,7 +17,6 @@
      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
      Boston, MA 02111-1307, USA.
 */
-
 /**
  * @file set/set_api.c
  * @brief api for the set service
@@ -44,7 +43,7 @@
   struct GNUNET_CLIENT_Connection *client;
 
   /**
-   * Message queue for 'client'.
+   * Message queue for @e client.
    */
   struct GNUNET_MQ_Handle *mq;
 
@@ -59,30 +58,30 @@
   struct GNUNET_SET_OperationHandle *ops_tail;
 
   /**
-   * Should the set be destroyed once all operations are gone?
+   * Callback for the current iteration over the set,
+   * NULL if no iterator is active.
    */
-  int destroy_requested;
+  GNUNET_SET_ElementIterator iterator;
 
   /**
-   * Has the set become invalid (e.g. service died)?
+   * Closure for @e iterator
    */
-  int invalid;
+  void *iterator_cls;
 
   /**
-   * Callback for the current iteration over the set,
-   * NULL if no iterator is active.
+   * Should the set be destroyed once all operations are gone?
    */
-  GNUNET_SET_ElementIterator iterator;
+  int destroy_requested;
 
   /**
-   * Closure for 'iterator'
+   * Has the set become invalid (e.g. service died)?
    */
-  void *iterator_cls;
+  int invalid;
 };
 
 
 /**
- * Opaque handle to a set operation request from another peer.
+ * Handle for a set operation request from another peer.
  */
 struct GNUNET_SET_Request
 {
@@ -94,15 +93,14 @@
 
   /**
    * Has the request been accepted already?
-   * GNUNET_YES/GNUNET_NO
+   * #GNUNET_YES/#GNUNET_NO
    */
   int accepted;
 };
 
 
 /**
- * Handle to an operation.
- * Only known to the service after commiting
+ * Handle to an operation.  Only known to the service after committing
  * the handle with a set.
  */
 struct GNUNET_SET_OperationHandle
@@ -114,7 +112,7 @@
   GNUNET_SET_ResultIterator result_cb;
 
   /**
-   * Closure for result_cb.
+   * Closure for @e result_cb.
    */
   void *result_cls;
 
@@ -125,11 +123,6 @@
   struct GNUNET_SET_Handle *set;
 
   /**
-   * Request ID to identify the operation within the set.
-   */
-  uint32_t request_id;
-
-  /**
    * Message sent to the server on calling conclude,
    * NULL if conclude has been called.
    */
@@ -150,6 +143,11 @@
    * Handles are kept in a linked list.
    */
   struct GNUNET_SET_OperationHandle *next;
+
+  /**
+   * Request ID to identify the operation within the set.
+   */
+  uint32_t request_id;
 };
 
 
@@ -182,16 +180,11 @@
   GNUNET_SET_ListenCallback listen_cb;
 
   /**
-   * Closure for listen_cb.
+   * Closure for @e listen_cb.
    */
   void *listen_cls;
 
   /**
-   * Operation we listen for.
-   */
-  enum GNUNET_SET_OperationType operation;
-
-  /**
    * Application ID we listen for.
    */
   struct GNUNET_HashCode app_id;
@@ -205,59 +198,78 @@
    * Task for reconnecting when the listener fails.
    */
   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
+
+  /**
+   * Operation we listen for.
+   */
+  enum GNUNET_SET_OperationType operation;
 };
 
 
-/* forward declaration */
-static void
-listen_connect (void *cls,
-                const struct GNUNET_SCHEDULER_TaskContext *tc);
-
-
 /**
- * Handle element for iteration over the set.
+ * Handle element for iteration over the set.  Notifies the
+ * iterator and sends an acknowledgement to the service.
  *
  * @param cls the set
  * @param mh the message
  */
 static void
-handle_iter_element (void *cls, const struct GNUNET_MessageHeader *mh)
+handle_iter_element (void *cls,
+                     const struct GNUNET_MessageHeader *mh)
 {
   struct GNUNET_SET_Handle *set = cls;
+  GNUNET_SET_ElementIterator iter = set->iterator;
   struct GNUNET_SET_Element element;
-  const struct GNUNET_SET_IterResponseMessage *msg =
-    (const struct GNUNET_SET_IterResponseMessage *) mh;
+  const struct GNUNET_SET_IterResponseMessage *msg;
   struct GNUNET_SET_IterAckMessage *ack_msg;
   struct GNUNET_MQ_Envelope *ev;
+  uint16_t msize;
 
-  if (NULL == set->iterator)
-    return;
-
-  element.size = ntohs (mh->size) - sizeof (struct 
GNUNET_SET_IterResponseMessage);
-  element.element_type = htons (msg->element_type);
-  element.data = &msg[1];
-  set->iterator (set->iterator_cls, &element);
-  ev = GNUNET_MQ_msg (ack_msg, GNUNET_MESSAGE_TYPE_SET_ITER_ACK);
-  ack_msg->send_more = htonl (1);
+  msize = ntohs (mh->size);
+  if (msize < sizeof (sizeof (struct GNUNET_SET_IterResponseMessage)))
+  {
+    /* message malformed */
+    GNUNET_break (0);
+    set->iterator = NULL;
+    iter (set->iterator_cls,
+          NULL);
+    iter = NULL;
+  }
+  if (NULL != iter)
+  {
+    msg = (const struct GNUNET_SET_IterResponseMessage *) mh;
+    element.size = msize - sizeof (struct GNUNET_SET_IterResponseMessage);
+    element.element_type = htons (msg->element_type);
+    element.data = &msg[1];
+    iter (set->iterator_cls,
+          &element);
+  }
+  ev = GNUNET_MQ_msg (ack_msg,
+                      GNUNET_MESSAGE_TYPE_SET_ITER_ACK);
+  ack_msg->send_more = htonl ((NULL != iter));
   GNUNET_MQ_send (set->mq, ev);
 }
 
 
 /**
- * Handle element for iteration over the set.
+ * Handle message signalling conclusion of iteration over the set.
+ * Notifies the iterator that we are done.
  *
  * @param cls the set
  * @param mh the message
  */
 static void
-handle_iter_done (void *cls, const struct GNUNET_MessageHeader *mh)
+handle_iter_done (void *cls,
+                  const struct GNUNET_MessageHeader *mh)
 {
   struct GNUNET_SET_Handle *set = cls;
+  GNUNET_SET_ElementIterator iter = set->iterator;
 
-  if (NULL == set->iterator)
+  if (NULL == iter)
     return;
-
-  set->iterator (set->iterator_cls, NULL);
+  set->iterator = NULL;
+  iter (set->iterator_cls,
+        NULL);
 }
 
 
@@ -268,47 +280,53 @@
  * @param mh the message
  */
 static void
-handle_result (void *cls, const struct GNUNET_MessageHeader *mh)
+handle_result (void *cls,
+               const struct GNUNET_MessageHeader *mh)
 {
+  struct GNUNET_SET_Handle *set = cls;
   const struct GNUNET_SET_ResultMessage *msg;
-  struct GNUNET_SET_Handle *set = cls;
   struct GNUNET_SET_OperationHandle *oh;
   struct GNUNET_SET_Element e;
   enum GNUNET_SET_Status result_status;
 
   msg = (const struct GNUNET_SET_ResultMessage *) mh;
-  GNUNET_assert (NULL != set);
   GNUNET_assert (NULL != set->mq);
-
   result_status = ntohs (msg->result_status);
-
-  oh = GNUNET_MQ_assoc_get (set->mq, ntohl (msg->request_id));
-  // 'oh' can be NULL if we canceled the operation, but the service
-  // did not get the cancel message yet.
+  oh = GNUNET_MQ_assoc_get (set->mq,
+                            ntohl (msg->request_id));
   if (NULL == oh)
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ignoring result from canceled 
operation\n");
+    /* 'oh' can be NULL if we canceled the operation, but the service
+       did not get the cancel message yet. */
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Ignoring result from canceled operation\n");
     return;
   }
-  /* status is not STATUS_OK => there's no attached element,
-   * and this is the last result message we get */
   if (GNUNET_SET_STATUS_OK != result_status)
   {
+    /* status is not STATUS_OK => there's no attached element,
+     * and this is the last result message we get */
     GNUNET_MQ_assoc_remove (set->mq, ntohl (msg->request_id));
-    GNUNET_CONTAINER_DLL_remove (oh->set->ops_head, oh->set->ops_tail, oh);
-    if (GNUNET_YES == oh->set->destroy_requested)
-      GNUNET_SET_destroy (oh->set);
+    GNUNET_CONTAINER_DLL_remove (set->ops_head,
+                                 set->ops_tail,
+                                 oh);
+    if ( (GNUNET_YES == set->destroy_requested) &&
+         (NULL == set->ops_head) )
+      GNUNET_SET_destroy (set);
     if (NULL != oh->result_cb)
-      oh->result_cb (oh->result_cls, NULL, result_status);
+      oh->result_cb (oh->result_cls,
+                     NULL,
+                     result_status);
     GNUNET_free (oh);
     return;
   }
-
   e.data = &msg[1];
   e.size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ResultMessage);
   e.element_type = msg->element_type;
   if (NULL != oh->result_cb)
-    oh->result_cb (oh->result_cls, &e, result_status);
+    oh->result_cb (oh->result_cls,
+                   &e,
+                   result_status);
 }
 
 
@@ -360,93 +378,30 @@
 }
 
 
-static void
-handle_client_listener_error (void *cls,
-                              enum GNUNET_MQ_Error error)
-{
-  struct GNUNET_SET_ListenHandle *lh = cls;
-
-  LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "listener broke down, re-connecting\n");
-  GNUNET_CLIENT_disconnect (lh->client);
-  lh->client = NULL;
-  GNUNET_MQ_destroy (lh->mq);
-  lh->mq = NULL;
-  lh->reconnect_task = GNUNET_SCHEDULER_add_delayed (lh->reconnect_backoff,
-                                                     &listen_connect, lh);
-  lh->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (lh->reconnect_backoff);
-}
-
-
 /**
- * Destroy the set handle if no operations are left, mark the set
- * for destruction otherwise.
+ * Destroy the given set operation.
  *
- * @param set set handle to destroy
+ * @param oh set operation to destroy
  */
-static int
-set_destroy (struct GNUNET_SET_Handle *set)
+static void
+set_operation_destroy (struct GNUNET_SET_OperationHandle *oh)
 {
-  if (NULL != set->ops_head)
-  {
-    set->destroy_requested = GNUNET_YES;
-    return GNUNET_NO;
-  }
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "Really destroying set\n");
-  GNUNET_CLIENT_disconnect (set->client);
-  set->client = NULL;
-  GNUNET_MQ_destroy (set->mq);
-  set->mq = NULL;
-  GNUNET_free (set);
-  return GNUNET_YES;
-}
+  struct GNUNET_SET_Handle *set = oh->set;
+  struct GNUNET_SET_OperationHandle *h_assoc;
 
-
-/**
- * Cancel the given set operation.  We need to send an explicit cancel message,
- * as all operations one one set communicate using one handle.
- *
- * In contrast to #GNUNET_SET_operation_cancel(), this function indicates 
whether
- * the set of the operation has been destroyed because all operations are done 
and
- * the set's destruction was requested before.
- *
- * @param oh set operation to cancel
- * @return #GNUNET_YES if the set of the operation was destroyed
- */
-static int
-set_operation_cancel (struct GNUNET_SET_OperationHandle *oh)
-{
-  int ret = GNUNET_NO;
-
   if (NULL != oh->conclude_mqm)
     GNUNET_MQ_discard (oh->conclude_mqm);
-
   /* is the operation already commited? */
-  if (NULL != oh->set)
+  if (NULL != set)
   {
-    struct GNUNET_SET_OperationHandle *h_assoc;
-    struct GNUNET_SET_CancelMessage *m;
-    struct GNUNET_MQ_Envelope *mqm;
-
-    GNUNET_CONTAINER_DLL_remove (oh->set->ops_head,
-                                 oh->set->ops_tail,
+    GNUNET_CONTAINER_DLL_remove (set->ops_head,
+                                 set->ops_tail,
                                  oh);
-    h_assoc = GNUNET_MQ_assoc_remove (oh->set->mq,
+    h_assoc = GNUNET_MQ_assoc_remove (set->mq,
                                       oh->request_id);
-    GNUNET_assert ((h_assoc == NULL) || (h_assoc == oh));
-    mqm = GNUNET_MQ_msg (m, GNUNET_MESSAGE_TYPE_SET_CANCEL);
-    m->request_id = htonl (oh->request_id);
-    GNUNET_MQ_send (oh->set->mq, mqm);
-
-    if (GNUNET_YES == oh->set->destroy_requested)
-    {
-      LOG (GNUNET_ERROR_TYPE_DEBUG,
-           "Destroying set after operation cancel\n");
-      ret = set_destroy (oh->set);
-    }
+    GNUNET_assert ((NULL == h_assoc) || (h_assoc == oh));
   }
   GNUNET_free (oh);
-  return ret;
 }
 
 
@@ -460,27 +415,58 @@
 void
 GNUNET_SET_operation_cancel (struct GNUNET_SET_OperationHandle *oh)
 {
-  (void) set_operation_cancel (oh);
+  struct GNUNET_SET_Handle *set = oh->set;
+  struct GNUNET_SET_CancelMessage *m;
+  struct GNUNET_MQ_Envelope *mqm;
+
+  if (NULL != set)
+  {
+    mqm = GNUNET_MQ_msg (m, GNUNET_MESSAGE_TYPE_SET_CANCEL);
+    m->request_id = htonl (oh->request_id);
+    GNUNET_MQ_send (set->mq, mqm);
+  }
+  set_operation_destroy (oh);
+  if ( (NULL != set) &&
+       (GNUNET_YES == set->destroy_requested) &&
+       (NULL == set->ops_head) )
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Destroying set after operation cancel\n");
+    GNUNET_SET_destroy (set);
+  }
 }
 
 
+/**
+ * We encountered an error communicating with the set service while
+ * performing a set operation. Report to the application.
+ *
+ * @param cls the `struct GNUNET_SET_Handle`
+ * @param error error code
+ */
 static void
-handle_client_set_error (void *cls, enum GNUNET_MQ_Error error)
+handle_client_set_error (void *cls,
+                         enum GNUNET_MQ_Error error)
 {
   struct GNUNET_SET_Handle *set = cls;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "handling client set error\n");
-
+       "Handling client set error\n");
   while (NULL != set->ops_head)
   {
     if (NULL != set->ops_head->result_cb)
-      set->ops_head->result_cb (set->ops_head->result_cls, NULL,
+      set->ops_head->result_cb (set->ops_head->result_cls,
+                                NULL,
                                 GNUNET_SET_STATUS_FAILURE);
-    if (GNUNET_YES == set_operation_cancel (set->ops_head))
-      return; /* stop if the set is destroyed */
+    set_operation_destroy (set->ops_head);
   }
   set->invalid = GNUNET_YES;
+  if (GNUNET_YES == set->destroy_requested)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Destroying set after operation failure\n");
+    GNUNET_SET_destroy (set);
+  }
 }
 
 
@@ -500,9 +486,11 @@
                    enum GNUNET_SET_OperationType op)
 {
   static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
-    {handle_result, GNUNET_MESSAGE_TYPE_SET_RESULT, 0},
-    {handle_iter_element, GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT, 0},
-    {handle_iter_done, GNUNET_MESSAGE_TYPE_SET_ITER_DONE, 0},
+    { &handle_result, GNUNET_MESSAGE_TYPE_SET_RESULT, 0},
+    { &handle_iter_element, GNUNET_MESSAGE_TYPE_SET_ITER_ELEMENT, 0},
+    { &handle_iter_done,
+      GNUNET_MESSAGE_TYPE_SET_ITER_DONE,
+      sizeof (struct GNUNET_MessageHeader) },
     GNUNET_MQ_HANDLERS_END
   };
   struct GNUNET_SET_Handle *set;
@@ -511,12 +499,17 @@
 
   set = GNUNET_new (struct GNUNET_SET_Handle);
   set->client = GNUNET_CLIENT_connect ("set", cfg);
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "set client created\n");
-  GNUNET_assert (NULL != set->client);
-  set->mq = GNUNET_MQ_queue_for_connection_client (set->client, mq_handlers,
-                                                   handle_client_set_error, 
set);
+  if (NULL == set->client)
+  {
+    GNUNET_free (set);
+    return NULL;
+  }
+  set->mq = GNUNET_MQ_queue_for_connection_client (set->client,
+                                                   mq_handlers,
+                                                   &handle_client_set_error, 
set);
   GNUNET_assert (NULL != set->mq);
-  mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_CREATE);
+  mqm = GNUNET_MQ_msg (msg,
+                       GNUNET_MESSAGE_TYPE_SET_CREATE);
   msg->operation = htonl (op);
   GNUNET_MQ_send (set->mq, mqm);
   return set;
@@ -524,10 +517,10 @@
 
 
 /**
- * Add an element to the given set.
- * After the element has been added (in the sense of being
- * transmitted to the set service), cont will be called.
- * Calls to add_element can be queued
+ * Add an element to the given set.  After the element has been added
+ * (in the sense of being transmitted to the set service), @a cont
+ * will be called.  Multiple calls to GNUNET_SET_add_element() can be
+ * queued.
  *
  * @param set set to add element to
  * @param element element to add to the set
@@ -551,21 +544,24 @@
       cont (cont_cls);
     return GNUNET_SYSERR;
   }
-
-  mqm = GNUNET_MQ_msg_extra (msg, element->size, GNUNET_MESSAGE_TYPE_SET_ADD);
+  mqm = GNUNET_MQ_msg_extra (msg, element->size,
+                             GNUNET_MESSAGE_TYPE_SET_ADD);
   msg->element_type = element->element_type;
-  memcpy (&msg[1], element->data, element->size);
-  GNUNET_MQ_notify_sent (mqm, cont, cont_cls);
+  memcpy (&msg[1],
+          element->data,
+          element->size);
+  GNUNET_MQ_notify_sent (mqm,
+                         cont, cont_cls);
   GNUNET_MQ_send (set->mq, mqm);
   return GNUNET_OK;
 }
 
 
 /**
- * Remove an element to the given set.
- * After the element has been removed (in the sense of the
- * request being transmitted to the set service), cont will be called.
- * Calls to remove_element can be queued
+ * Remove an element to the given set.  After the element has been
+ * removed (in the sense of the request being transmitted to the set
+ * service), @a cont will be called.  Multiple calls to
+ * GNUNET_SET_remove_element() can be queued
  *
  * @param set set to remove element from
  * @param element element to remove from the set
@@ -589,27 +585,49 @@
       cont (cont_cls);
     return GNUNET_SYSERR;
   }
-
   mqm = GNUNET_MQ_msg_extra (msg,
                              element->size,
                              GNUNET_MESSAGE_TYPE_SET_REMOVE);
   msg->element_type = element->element_type;
-  memcpy (&msg[1], element->data, element->size);
-  GNUNET_MQ_notify_sent (mqm, cont, cont_cls);
+  memcpy (&msg[1],
+          element->data,
+          element->size);
+  GNUNET_MQ_notify_sent (mqm,
+                         cont, cont_cls);
   GNUNET_MQ_send (set->mq, mqm);
   return GNUNET_OK;
 }
 
 
 /**
- * Destroy the set handle, and free all associated resources.
+ * Destroy the set handle if no operations are left, mark the set
+ * for destruction otherwise.
  *
  * @param set set handle to destroy
  */
 void
 GNUNET_SET_destroy (struct GNUNET_SET_Handle *set)
 {
-  (void) set_destroy (set);
+  if (NULL != set->ops_head)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Set operations are pending, delaying set destruction\n");
+    set->destroy_requested = GNUNET_YES;
+    return;
+  }
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Really destroying set\n");
+  if (NULL != set->client)
+  {
+    GNUNET_CLIENT_disconnect (set->client);
+    set->client = NULL;
+  }
+  if (NULL != set->mq)
+  {
+    GNUNET_MQ_destroy (set->mq);
+    set->mq = NULL;
+  }
+  GNUNET_free (set);
 }
 
 
@@ -656,43 +674,76 @@
 
 
 /**
- * Connect to the set service in order to listen
- * for request.
+ * Connect to the set service in order to listen for requests.
  *
- * @param cls the listen handle to connect
+ * @param cls the `struct GNUNET_SET_ListenHandle *` to connect
  * @param tc task context if invoked as a task, NULL otherwise
  */
 static void
 listen_connect (void *cls,
+                const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+/**
+ * Our connection with the set service encountered an error,
+ * re-initialize with exponential back-off.
+ *
+ * @param cls the `struct GNUNET_SET_ListenHandle *`
+ * @param error reason for the disconnect
+ */
+static void
+handle_client_listener_error (void *cls,
+                              enum GNUNET_MQ_Error error)
+{
+  struct GNUNET_SET_ListenHandle *lh = cls;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Listener broke down (%d), re-connecting\n",
+       (int) error);
+  GNUNET_CLIENT_disconnect (lh->client);
+  lh->client = NULL;
+  GNUNET_MQ_destroy (lh->mq);
+  lh->mq = NULL;
+  lh->reconnect_task = GNUNET_SCHEDULER_add_delayed (lh->reconnect_backoff,
+                                                     &listen_connect, lh);
+  lh->reconnect_backoff = GNUNET_TIME_STD_BACKOFF (lh->reconnect_backoff);
+}
+
+
+/**
+ * Connect to the set service in order to listen for requests.
+ *
+ * @param cls the `struct GNUNET_SET_ListenHandle *` to connect
+ * @param tc task context if invoked as a task, NULL otherwise
+ */
+static void
+listen_connect (void *cls,
                 const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
-  struct GNUNET_MQ_Envelope *mqm;
-  struct GNUNET_SET_ListenMessage *msg;
-  struct GNUNET_SET_ListenHandle *lh = cls;
   static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
-    {handle_request, GNUNET_MESSAGE_TYPE_SET_REQUEST},
+    { &handle_request, GNUNET_MESSAGE_TYPE_SET_REQUEST },
     GNUNET_MQ_HANDLERS_END
   };
+  struct GNUNET_SET_ListenHandle *lh = cls;
+  struct GNUNET_MQ_Envelope *mqm;
+  struct GNUNET_SET_ListenMessage *msg;
 
-  if ((tc != NULL) &&(tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
+  if ( (NULL != tc) &&
+       (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) )
   {
-    LOG (GNUNET_ERROR_TYPE_DEBUG, "listener not reconnecting due to 
shutdown\n");
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+         "Listener not reconnecting due to shutdown\n");
     return;
   }
-
   lh->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
 
   GNUNET_assert (NULL == lh->client);
   lh->client = GNUNET_CLIENT_connect ("set", lh->cfg);
   if (NULL == lh->client)
-  {
-    LOG (GNUNET_ERROR_TYPE_ERROR,
-         "could not connect to set (wrong configuration?), giving up 
listening\n");
     return;
-  }
   GNUNET_assert (NULL == lh->mq);
   lh->mq = GNUNET_MQ_queue_for_connection_client (lh->client, mq_handlers,
-                                                  
handle_client_listener_error, lh);
+                                                  
&handle_client_listener_error, lh);
   mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_LISTEN);
   msg->operation = htonl (lh->operation);
   msg->app_id = lh->app_id;
@@ -709,7 +760,7 @@
  * @param app_id id of the application that handles set operation requests
  * @param listen_cb called for each incoming request matching the operation
  *                  and application id
- * @param listen_cls handle for listen_cb
+ * @param listen_cls handle for @a listen_cb
  * @return a handle that can be used to cancel the listen operation
  */
 struct GNUNET_SET_ListenHandle *
@@ -729,6 +780,11 @@
   lh->app_id = *app_id;
   lh->reconnect_backoff = GNUNET_TIME_UNIT_MILLISECONDS;
   listen_connect (lh, NULL);
+  if (NULL == lh->client)
+  {
+    GNUNET_free (lh);
+    return NULL;
+  }
   return lh;
 }
 
@@ -741,8 +797,8 @@
 void
 GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh)
 {
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "canceling listener\n");
-  /* listener's connection may have failed, thus mq/client could be NULL */
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Canceling listener\n");
   if (NULL != lh->mq)
   {
     GNUNET_MQ_destroy (lh->mq);
@@ -786,21 +842,16 @@
   struct GNUNET_SET_OperationHandle *oh;
   struct GNUNET_SET_AcceptMessage *msg;
 
-  GNUNET_assert (NULL != request);
   GNUNET_assert (GNUNET_NO == request->accepted);
   request->accepted = GNUNET_YES;
-
+  mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_ACCEPT);
+  msg->accept_reject_id = htonl (request->accept_id);
+  msg->result_mode = htonl (result_mode);
   oh = GNUNET_new (struct GNUNET_SET_OperationHandle);
   oh->result_cb = result_cb;
   oh->result_cls = result_cls;
-
-  mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_ACCEPT);
-  msg->accept_reject_id = htonl (request->accept_id);
-  msg->result_mode = htonl (result_mode);
-
   oh->conclude_mqm = mqm;
   oh->request_id_addr = &msg->request_id;
-
   return oh;
 }
 
@@ -840,9 +891,9 @@
 
 
 /**
- * Iterate over all elements in the given set.
- * Note that this operation involves transferring every element of the set
- * from the service to the client, and is thus costly.
+ * Iterate over all elements in the given set.  Note that this
+ * operation involves transferring every element of the set from the
+ * service to the client, and is thus costly.
  *
  * @param set the set to iterate over
  * @param iter the iterator to call for each element
@@ -858,15 +909,13 @@
 {
   struct GNUNET_MQ_Envelope *ev;
 
-
   GNUNET_assert (NULL != iter);
-
   if (GNUNET_YES == set->invalid)
     return GNUNET_SYSERR;
   if (NULL != set->iterator)
     return GNUNET_NO;
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "iterating set\n");
+       "Iterating over set\n");
   set->iterator = iter;
   set->iterator_cls = iter_cls;
   ev = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ITER_REQUEST);




reply via email to

[Prev in Thread] Current Thread [Next in Thread]