gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] [gnunet] branch master updated (8b48d9288 -> 8e157def2)


From: gnunet
Subject: [GNUnet-SVN] [gnunet] branch master updated (8b48d9288 -> 8e157def2)
Date: Fri, 25 Jan 2019 18:35:08 +0100

This is an automated email from the git hooks/post-receive script.

grothoff pushed a change to branch master
in repository gnunet.

    from 8b48d9288 fixed up documentation for ascension
     new 7536db9fc fix mddl insert after macro
     new 8e157def2 add prototypes for handlers for incoming messages

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/include/gnunet_container_lib.h |   2 +-
 src/include/gnunet_mq_lib.h        |  45 +++
 src/include/gnunet_protocols.h     |   5 +
 src/transport/gnunet-service-tng.c | 565 ++++++++++++++++++++++++++++++++-----
 src/util/mq.c                      |  44 ++-
 5 files changed, 590 insertions(+), 71 deletions(-)

diff --git a/src/include/gnunet_container_lib.h 
b/src/include/gnunet_container_lib.h
index e7bd4113d..fee851e1c 100644
--- a/src/include/gnunet_container_lib.h
+++ b/src/include/gnunet_container_lib.h
@@ -2011,7 +2011,7 @@ GNUNET_CONTAINER_multihashmap32_iterator_destroy (struct 
GNUNET_CONTAINER_MultiH
   if (NULL == (element)->next_##mdll) \
     (tail) = (element); \
   else \
-    (element)->next->prev_##mdll = (element); } while (0)
+    (element)->next_##mdll->prev_##mdll = (element); } while (0)
 
 
 /**
diff --git a/src/include/gnunet_mq_lib.h b/src/include/gnunet_mq_lib.h
index 3f67dc365..2a459636a 100644
--- a/src/include/gnunet_mq_lib.h
+++ b/src/include/gnunet_mq_lib.h
@@ -527,6 +527,51 @@ struct GNUNET_MQ_MessageHandler
   }
 
 
+/**
+ * Insert code for a "check_" function that verifies that
+ * a given variable-length message received over the network
+ * is followed by another variable-length message that fits
+ * exactly with the given size.  If the message @a m
+ * is not followed by another `struct GNUNET_MessageHeader`
+ * with a size that adds up to the total size, an error is logged
+ * and the enclosing function returns #GNUNET_NO.
+ *
+ * @param m an IPC message with proper type to determine
+ *  the size, starting with a `struct GNUNET_MessageHeader`
+ */
+#define GNUNET_MQ_check_boxed_message(m)                \
+  {                                                     \
+    const struct GNUNET_MessageHeader *inbox =          \
+      (const struct GNUNET_MessageHeader *) &m[1];      \
+    const struct GNUNET_MessageHeader *hdr =            \
+      (const struct GNUNET_MessageHeader *) m;          \
+    uint16_t slen = ntohs (hdr->size) - sizeof (*m);    \
+    if ( (slen < sizeof (struct GNUNET_MessageHeader))||\
+         (slen != ntohs (inbox->size)) )                \
+    {                                                   \
+      GNUNET_break (0);                                 \
+      return GNUNET_NO;                                 \
+    }                                                   \
+  }
+
+
+/**
+ * Call the message handler that was registered
+ * for the type of the given message in the given @a handlers list.
+ *
+ * This function is intended to be used for the implementation
+ * of message queues.
+ *
+ * @param handlers a set of handlers
+ * @param mh message to dispatch
+ * @return #GNUNET_OK on success, #GNUNET_NO if no handler matched,
+ *         #GNUNET_SYSERR if message was rejected by check function
+ */
+int
+GNUNET_MQ_handle_message (const struct GNUNET_MQ_MessageHandler *handlers,
+                          const struct GNUNET_MessageHeader *mh);
+
+
 /**
  * Create a new envelope.
  *
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h
index 8593005d7..a8d716b3f 100644
--- a/src/include/gnunet_protocols.h
+++ b/src/include/gnunet_protocols.h
@@ -3124,6 +3124,11 @@ extern "C"
  */ 
 #define GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX 1219
 
+/**
+ * Transport affirming receipt of an ephemeral key.
+ */ 
+#define GNUNET_MESSAGE_TYPE_TRANSPORT_EPHEMERAL_CONFIRMATION 1220
+
 /**
  * Message sent to indicate to the transport that a monitor
  * wants to observe certain events.
diff --git a/src/transport/gnunet-service-tng.c 
b/src/transport/gnunet-service-tng.c
index 8febbdfff..3cccf5173 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -33,7 +33,7 @@
  *       transport-to-transport traffic)
  *
  * Implement:
- * - manage fragmentation/defragmentation, retransmission, track RTT, loss, 
etc.
+ * - manage defragmentation, retransmission, track RTT, loss, etc.
  *
  * Easy:
  * - use ATS bandwidth allocation callback and schedule transmissions!
@@ -165,8 +165,8 @@ struct TransportBackchannelEncapsulationMessage
 
 
 /**
- * Message by which a peer confirms that it is using an
- * ephemeral key.
+ * Body by which a peer confirms that it is using an ephemeral
+ * key.
  */
 struct EphemeralConfirmation
 {
@@ -191,6 +191,37 @@ struct EphemeralConfirmation
 };
 
 
+/**
+ * Message by which a peer confirms that it is using an ephemeral
+ * key.
+ */
+struct EphemeralConfirmationMessage
+{
+
+  /**
+   * Message header, type is 
#GNUNET_MESSAGE_TYPE_TRANSPORT_EPHEMERAL_CONFIRMATION
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Must be zero.
+   */  
+  uint32_t reserved;
+  
+  /**
+   * How long is this signature over the ephemeral key
+   * valid?
+   */
+  struct GNUNET_TIME_AbsoluteNBO ephemeral_validity;
+
+  /**
+   * Ephemeral key setup by the sender for @e target, used
+   * to encrypt the payload.
+   */
+  struct GNUNET_CRYPTO_EcdhePublicKey ephemeral_key;
+};
+
+
 /**
  * Plaintext of the variable-size payload that is encrypted
  * within a `struct TransportBackchannelEncapsulationMessage`
@@ -863,7 +894,12 @@ struct PendingMessage
    * Kept in a MDLL of messages from this @a cpm  (if @e pmt is 
#PMT_FRAGMENT_BOX)
    */
   struct PendingMessage *prev_frag;
-    
+
+  /**
+   * This message, reliability boxed. Only possibly available if @e pmt is 
#PMT_CORE.
+   */ 
+  struct PendingMessage *bpm;
+  
   /**
    * Target of the request.
    */
@@ -1797,6 +1833,36 @@ free_fragment_tree (struct PendingMessage *root)
 }
 
 
+/**
+ * Release memory associated with @a pm and remove @a pm from associated
+ * data structures.  @a pm must be a top-level pending message and not
+ * a fragment in the tree.  The entire tree is freed (if applicable).
+ *
+ * @param pm the pending message to free
+ */
+static void
+free_pending_message (struct PendingMessage *pm)
+{
+  struct TransportClient *tc = pm->client;
+  struct Neighbour *target = pm->target;
+
+  if (NULL != tc)
+  {
+    GNUNET_CONTAINER_MDLL_remove (client,
+                                 tc->details.core.pending_msg_head,
+                                 tc->details.core.pending_msg_tail,
+                                 pm);
+  }
+  GNUNET_CONTAINER_MDLL_remove (neighbour,
+                               target->pending_msg_head,
+                               target->pending_msg_tail,
+                               pm);
+  free_fragment_tree (pm);
+  GNUNET_free_non_null (pm->bpm);
+  GNUNET_free (pm);
+}
+
+
 /**
  * Send a response to the @a pm that we have processed a
  * "send" request with status @a success. We
@@ -1829,17 +1895,8 @@ client_send_response (struct PendingMessage *pm,
     som->peer = target->pid;
     GNUNET_MQ_send (tc->mq,
                    env);
-    GNUNET_CONTAINER_MDLL_remove (client,
-                                 tc->details.core.pending_msg_head,
-                                 tc->details.core.pending_msg_tail,
-                                 pm);
   }
-  GNUNET_CONTAINER_MDLL_remove (neighbour,
-                               target->pending_msg_head,
-                               target->pending_msg_tail,
-                               pm);
-  free_fragment_tree (pm);
-  GNUNET_free (pm);
+  free_pending_message (pm);
 }
 
 
@@ -2175,37 +2232,292 @@ handle_del_address (void *cls,
 }
 
 
+/**
+ * Context from #handle_incoming_msg().  Closure for many
+ * message handlers below.
+ */
+struct CommunicatorMessageContext
+{
+  /**
+   * Which communicator provided us with the message.
+   */
+  struct TransportClient *tc;
+
+  /**
+   * Additional information for flow control and about the sender.
+   */
+  struct GNUNET_TRANSPORT_IncomingMessage im;
+};
+
+
+/**
+ * Send ACK to communicator (if requested) and free @a cmc.
+ *
+ * @param cmc context for which we are done handling the message
+ */
+static void
+finish_cmc_handling (struct CommunicatorMessageContext *cmc)
+{
+  // FIXME: if (0 != ntohl (im->fc_on)) => send ACK when done to communicator 
for flow control!
+  GNUNET_SERVICE_client_continue (cmc->tc->client);
+
+  GNUNET_free (cmc);
+}
+
+
+/**
+ * Communicator gave us an unencapsulated message to pass
+ * as-is to CORE.  Process the request.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (must call 
#finish_cmc_handling() when done)
+ * @param mh the message that was received
+ */
+static void
+handle_raw_message (void *cls,
+                   const struct GNUNET_MessageHeader *mh)
+{
+  struct CommunicatorMessageContext *cmc = cls;
+  
+  // FIXME: do work!
+  finish_cmc_handling (cmc);
+}
+
+
+/**
+ * Communicator gave us a fragment box.  Check the message.
+ *
+ * @param cls a `struct CommunicatorMessageContext`
+ * @param fb the send message that was sent
+ * @return #GNUNET_YES if message is well-formed
+ */
+static int
+check_fragment_box (void *cls,
+                   const struct TransportFragmentBox *fb)
+{
+  // FIXME! check that off + size-of-payload <= total-length!
+  return GNUNET_YES;
+}
+
+
+/**
+ * Communicator gave us a fragment.  Process the request.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (must call 
#finish_cmc_handling() when done)
+ * @param fb the message that was received
+ */
+static void
+handle_fragment_box (void *cls,
+                    const struct TransportFragmentBox *fb)
+{
+  struct CommunicatorMessageContext *cmc = cls;
+  
+  // FIXME: do work!
+  finish_cmc_handling (cmc);
+}
+
+
+/**
+ * Communicator gave us a fragment acknowledgement.  Process the request.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (must call 
#finish_cmc_handling() when done)
+ * @param fa the message that was received
+ */
+static void
+handle_fragment_ack (void *cls,
+                    const struct TransportFragmentAckMessage *fa)
+{
+  struct CommunicatorMessageContext *cmc = cls;
+  
+  // FIXME: do work!
+  finish_cmc_handling (cmc);
+}
+
+
+/**
+ * Communicator gave us a reliability box.  Check the message.
+ *
+ * @param cls a `struct CommunicatorMessageContext`
+ * @param rb the send message that was sent
+ * @return #GNUNET_YES if message is well-formed
+ */
+static int
+check_reliability_box (void *cls,
+                      const struct TransportReliabilityBox *rb)
+{
+  GNUNET_MQ_check_boxed_message (rb);
+  return GNUNET_YES;
+}
+
+
+/**
+ * Communicator gave us a reliability box.  Process the request.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (must call 
#finish_cmc_handling() when done)
+ * @param rb the message that was received
+ */
+static void
+handle_reliability_box (void *cls,
+                       const struct TransportReliabilityBox *rb)
+{
+  struct CommunicatorMessageContext *cmc = cls;
+  
+  // FIXME: do work!
+  finish_cmc_handling (cmc);
+}
+
+
+/**
+ * Communicator gave us a reliability ack.  Process the request.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (must call 
#finish_cmc_handling() when done)
+ * @param ra the message that was received
+ */
+static void
+handle_reliability_ack (void *cls,
+                       const struct TransportReliabilityAckMessage *ra)
+{
+  struct CommunicatorMessageContext *cmc = cls;
+  
+  // FIXME: do work!
+  finish_cmc_handling (cmc);
+}
+
+
+/**
+ * Communicator gave us a backchannel encapsulation.  Check the message.
+ *
+ * @param cls a `struct CommunicatorMessageContext`
+ * @param be the send message that was sent
+ * @return #GNUNET_YES if message is well-formed
+ */
+static int
+check_backchannel_encapsulation (void *cls,
+                                const struct 
TransportBackchannelEncapsulationMessage *be)
+{
+  // FIXME: do work!
+  return GNUNET_YES;
+}
+
+
+/**
+ * Communicator gave us a backchannel encapsulation.  Process the request.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (must call 
#finish_cmc_handling() when done)
+ * @param be the message that was received
+ */
+static void
+handle_backchannel_encapsulation (void *cls,
+                                 const struct 
TransportBackchannelEncapsulationMessage *be)
+{
+  struct CommunicatorMessageContext *cmc = cls;
+  
+  // FIXME: do work!
+  finish_cmc_handling (cmc);
+}
+
+
+/**
+ * Communicator gave us an ephemeral confirmation.  Process the request.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (must call 
#finish_cmc_handling() when done)
+ * @param ec the message that was received
+ */
+static void
+handle_ephemeral_confirmation (void *cls,
+                              const struct EphemeralConfirmationMessage *ec)
+{
+  struct CommunicatorMessageContext *cmc = cls;
+  
+  // FIXME: do work!
+  finish_cmc_handling (cmc);
+}
+
+
+/**
+ * Communicator gave us a DV learn message.  Check the message.
+ *
+ * @param cls a `struct CommunicatorMessageContext`
+ * @param dvl the send message that was sent
+ * @return #GNUNET_YES if message is well-formed
+ */
+static int
+check_dv_learn (void *cls,
+               const struct TransportDVLearn *dvl)
+{
+  // FIXME: do work!
+  return GNUNET_YES;
+}
+
+
+/**
+ * Communicator gave us a DV learn message.  Process the request.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (must call 
#finish_cmc_handling() when done)
+ * @param dvl the message that was received
+ */
+static void
+handle_dv_learn (void *cls,
+                const struct TransportDVLearn *dvl)
+{
+  struct CommunicatorMessageContext *cmc = cls;
+  
+  // FIXME: do work!
+  finish_cmc_handling (cmc);
+}
+
+
+/**
+ * Communicator gave us a DV box.  Check the message.
+ *
+ * @param cls a `struct CommunicatorMessageContext`
+ * @param dvb the send message that was sent
+ * @return #GNUNET_YES if message is well-formed
+ */
+static int
+check_dv_box (void *cls,
+             const struct TransportDVBox *dvb)
+{
+  // FIXME: do work!
+  return GNUNET_YES;
+}
+
+
+/**
+ * Communicator gave us a DV box.  Process the request.
+ *
+ * @param cls a `struct CommunicatorMessageContext` (must call 
#finish_cmc_handling() when done)
+ * @param dvb the message that was received
+ */
+static void
+handle_dv_box (void *cls,
+              const struct TransportDVBox *dvb)
+{
+  struct CommunicatorMessageContext *cmc = cls;
+  
+  // FIXME: do work!
+  finish_cmc_handling (cmc);
+}
+
+
 /**
  * Client notified us about transmission from a peer.  Process the request.
  *
- * @param cls the client
+ * @param cls a `struct TransportClient` which sent us the message
  * @param obm the send message that was sent
+ * @return #GNUNET_YES if message is well-formed
  */
 static int
 check_incoming_msg (void *cls,
                     const struct GNUNET_TRANSPORT_IncomingMessage *im)
 {
   struct TransportClient *tc = cls;
-  uint16_t size;
-  const struct GNUNET_MessageHeader *obmm;
 
   if (CT_COMMUNICATOR != tc->type)
   {
     GNUNET_break (0);
     return GNUNET_SYSERR;
   }
-  size = ntohs (im->header.size) - sizeof (*im);
-  if (size < sizeof (struct GNUNET_MessageHeader))
-  {
-    GNUNET_break (0);
-    return GNUNET_SYSERR;
-  }
-  obmm = (const struct GNUNET_MessageHeader *) &im[1];
-  if (size != ntohs (obmm->size))
-  {
-    GNUNET_break (0);
-    return GNUNET_SYSERR;
-  }
+  GNUNET_MQ_check_boxed_message (im);
   return GNUNET_OK;
 }
 
@@ -2213,7 +2525,6 @@ check_incoming_msg (void *cls,
 /**
  * Incoming meessage.  Process the request.
  *
- * @param cls the client
  * @param im the send message that was received
  */
 static void
@@ -2221,8 +2532,61 @@ handle_incoming_msg (void *cls,
                      const struct GNUNET_TRANSPORT_IncomingMessage *im)
 {
   struct TransportClient *tc = cls;
+  struct CommunicatorMessageContext *cmc = GNUNET_new (struct 
CommunicatorMessageContext);
+  struct GNUNET_MQ_MessageHandler handlers[] = {
+    GNUNET_MQ_hd_var_size (fragment_box,
+                          GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT,
+                          struct TransportFragmentBox,
+                          &cmc),
+    GNUNET_MQ_hd_fixed_size (fragment_ack,
+                            GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK,
+                            struct TransportFragmentAckMessage,
+                            &cmc),
+    GNUNET_MQ_hd_var_size (reliability_box,
+                          GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX,
+                          struct TransportReliabilityBox,
+                          &cmc),
+    GNUNET_MQ_hd_fixed_size (reliability_ack,
+                            GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK,
+                            struct TransportReliabilityAckMessage,
+                            &cmc),
+    GNUNET_MQ_hd_var_size (backchannel_encapsulation,
+                          
GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION,
+                          struct TransportBackchannelEncapsulationMessage,
+                          &cmc),
+    GNUNET_MQ_hd_fixed_size (ephemeral_confirmation,
+                            
GNUNET_MESSAGE_TYPE_TRANSPORT_EPHEMERAL_CONFIRMATION,
+                            struct EphemeralConfirmationMessage,
+                            &cmc),
+    GNUNET_MQ_hd_var_size (dv_learn,
+                          GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN,
+                          struct TransportDVLearn,
+                          &cmc),
+    GNUNET_MQ_hd_var_size (dv_box,
+                          GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX,
+                          struct TransportDVBox,
+                          &cmc),
+    GNUNET_MQ_handler_end()
+  };
+  int ret;
 
-  GNUNET_SERVICE_client_continue (tc->client);
+  cmc->tc = tc;
+  cmc->im = *im;
+  ret = GNUNET_MQ_handle_message (handlers,
+                                 (const struct GNUNET_MessageHeader *) &im[1]);
+  if (GNUNET_SYSERR == ret)
+  {
+    GNUNET_break (0);
+    GNUNET_SERVICE_client_drop (tc->client);
+    GNUNET_free (cmc);
+    return;
+  }
+  if (GNUNET_NO == ret)
+  {
+    /* unencapsulated 'raw' message */
+    handle_raw_message (&cmc,
+                       (const struct GNUNET_MessageHeader *) &im[1]);
+  }
 }
 
 
@@ -2268,6 +2632,23 @@ tracker_update_in_cb (void *cls)
 }
 
 
+/**
+ * If necessary, generates the UUID for a @a pm
+ *
+ * @param pm pending message to generate UUID for.
+ */
+static void
+set_pending_message_uuid (struct PendingMessage *pm)
+{
+  if (pm->msg_uuid_set)
+    return;
+  GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
+                             &pm->msg_uuid,
+                             sizeof (pm->msg_uuid));
+  pm->msg_uuid_set = GNUNET_YES;
+}
+
+
 /**
  * Fragment the given @a pm to the given @a mtu.  Adds 
  * additional fragments to the neighbour as well. If the
@@ -2284,13 +2665,7 @@ fragment_message (struct PendingMessage *pm,
 {
   struct PendingMessage *ff;
 
-  if (GNUNET_NO == pm->msg_uuid_set)
-  {
-    GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
-                               &pm->msg_uuid,
-                               sizeof (pm->msg_uuid));
-    pm->msg_uuid_set = GNUNET_YES;
-  }
+  set_pending_message_uuid (pm);
   
   /* This invariant is established in #handle_add_queue_message() */
   GNUNET_assert (mtu > sizeof (struct TransportFragmentBox));
@@ -2390,24 +2765,50 @@ fragment_message (struct PendingMessage *pm,
 static struct PendingMessage *
 reliability_box_message (struct PendingMessage *pm)
 {
-  if (PMT_CORE != pm->pmt) 
-  {
-    /* already fragmented or reliability boxed, or control message: do nothing 
*/
-    return pm;
-  }
-  
-  if (0) // FIXME
+  struct TransportReliabilityBox rbox;
+  struct PendingMessage *bpm;
+  char *msg;
+
+  if (PMT_CORE != pm->pmt)
+    return pm;  /* already fragmented or reliability boxed, or control 
message: do nothing */
+  if (NULL != pm->bpm)
+    return pm->bpm; /* already computed earlier: do nothing */
+  GNUNET_assert (NULL == pm->head_frag);
+  if (pm->bytes_msg + sizeof (rbox) > UINT16_MAX) 
   {
     /* failed hard */
-    // FIMXE: bitch
+    GNUNET_break (0);
     client_send_response (pm,
                          GNUNET_NO,
                          0);
     return NULL;
   }
-
-  /* FIXME: return boxed PM here! */
-  return NULL;
+  bpm = GNUNET_malloc (sizeof (struct PendingMessage) +
+                      sizeof (rbox) + 
+                      pm->bytes_msg);
+  bpm->target = pm->target;
+  bpm->frag_parent = pm;
+  GNUNET_CONTAINER_MDLL_insert (frag,
+                               pm->head_frag,
+                               pm->tail_frag,
+                               bpm);
+  bpm->timeout = pm->timeout;
+  bpm->pmt = PMT_RELIABILITY_BOX;
+  bpm->bytes_msg = pm->bytes_msg + sizeof (rbox);
+  set_pending_message_uuid (bpm);
+  rbox.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX);
+  rbox.header.size = htons (sizeof (rbox) + pm->bytes_msg);
+  rbox.ack_countdown = htonl (0); // FIXME: implement ACK countdown support
+  rbox.msg_uuid = pm->msg_uuid;
+  msg = (char *) &bpm[1];
+  memcpy (msg,
+         &rbox,
+         sizeof (rbox));
+  memcpy (&msg[sizeof (rbox)],
+         &pm[1],
+         pm->bytes_msg);
+  pm->bpm = bpm;
+  return bpm;
 }
 
 
@@ -2542,26 +2943,64 @@ transmit_on_queue (void *cls)
   else if (PMT_CORE != pm->pmt)
   {
     /* This was an acknowledgement of some type, always free */
-
-    struct Neighbour *neighbour = pm->target;
-    GNUNET_CONTAINER_MDLL_remove (neighbour,
-                                 neighbour->pending_msg_head,
-                                 neighbour->pending_msg_tail,
-                                 pm);
-    GNUNET_free (pm);
+    free_pending_message (pm);
   }
   else
   {
     /* message not finished, waiting for acknowledgement */
-    // FIXME: update time by which we might retransmit 's' based on
-    // queue characteristics (i.e. RTT)
-    
-    // FIXME: move 'pm' back in the transmission queue (simplistic: to
-    // the end, better: with position depending on type, timeout,
-    // etc.)
+    struct Neighbour *neighbour = pm->target;
+    /* Update time by which we might retransmit 's' based on queue
+       characteristics (i.e. RTT); it takes one RTT for the message to
+       arrive and the ACK to come back in the best case; but the other
+       side is allowed to delay ACKs by 2 RTTs, so we use 4 RTT before
+       retransmitting.  Note that in the future this heuristic should
+       likely be improved further (measure RTT stability, consider
+       message urgency and size when delaying ACKs, etc.) */
+    s->next_attempt = GNUNET_TIME_relative_to_absolute
+      (GNUNET_TIME_relative_multiply (queue->rtt,
+                                     4));
+    if (s == pm)
+    {
+      struct PendingMessage *pos;
+
+      /* re-insert sort in neighbour list */
+      GNUNET_CONTAINER_MDLL_remove (neighbour,
+                                   neighbour->pending_msg_head,
+                                   neighbour->pending_msg_tail,
+                                   pm);
+      pos = neighbour->pending_msg_tail;
+      while ( (NULL != pos) &&
+             (pm->next_attempt.abs_value_us > pos->next_attempt.abs_value_us) )
+       pos = pos->prev_neighbour;
+      GNUNET_CONTAINER_MDLL_insert_after (neighbour,
+                                         neighbour->pending_msg_head,
+                                         neighbour->pending_msg_tail,
+                                         pos,
+                                         pm);
+    }
+    else
+    {
+      /* re-insert sort in fragment list */
+      struct PendingMessage *fp = s->frag_parent;
+      struct PendingMessage *pos;
+
+      GNUNET_CONTAINER_MDLL_remove (frag,
+                                   fp->head_frag,
+                                   fp->tail_frag,
+                                   s);
+      pos = fp->tail_frag;
+      while ( (NULL != pos) &&
+             (s->next_attempt.abs_value_us > pos->next_attempt.abs_value_us) )
+       pos = pos->prev_frag;
+      GNUNET_CONTAINER_MDLL_insert_after (frag,
+                                         fp->head_frag,
+                                         fp->tail_frag,
+                                         pos,
+                                         s);
+    }
   }
   
-  /* finally, re-schedule self */
+  /* finally, re-schedule queue transmission task itself */
   schedule_transmit_on_queue (queue);
 }
 
diff --git a/src/util/mq.c b/src/util/mq.c
index 4dfcb72be..d2f5add19 100644
--- a/src/util/mq.c
+++ b/src/util/mq.c
@@ -214,6 +214,35 @@ struct GNUNET_MQ_Handle
 void
 GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq,
                           const struct GNUNET_MessageHeader *mh)
+{
+  int ret;
+
+  ret = GNUNET_MQ_handle_message (mq->handlers,
+                                 mh);
+  if (GNUNET_SYSERR == ret)
+  {
+    GNUNET_MQ_inject_error (mq,
+                           GNUNET_MQ_ERROR_MALFORMED);
+    return;
+  }
+}
+
+
+/**
+ * Call the message handler that was registered
+ * for the type of the given message in the given @a handlers list.
+ *
+ * This function is intended to be used for the implementation
+ * of message queues.
+ *
+ * @param handlers a set of handlers
+ * @param mh message to dispatch
+ * @return #GNUNET_OK on success, #GNUNET_NO if no handler matched,
+ *         #GNUNET_SYSERR if message was rejected by check function
+ */
+int
+GNUNET_MQ_handle_message (const struct GNUNET_MQ_MessageHandler *handlers,
+                          const struct GNUNET_MessageHeader *mh)
 {
   const struct GNUNET_MQ_MessageHandler *handler;
   int handled = GNUNET_NO;
@@ -224,9 +253,9 @@ GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq,
        "Received message of type %u and size %u\n",
        mtype, msize);
 
-  if (NULL == mq->handlers)
+  if (NULL == handlers)
     goto done;
-  for (handler = mq->handlers; NULL != handler->cb; handler++)
+  for (handler = handlers; NULL != handler->cb; handler++)
   {
     if (handler->type == mtype)
     {
@@ -240,9 +269,7 @@ GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq,
         LOG (GNUNET_ERROR_TYPE_ERROR,
              "Received malformed message of type %u\n",
              (unsigned int) handler->type);
-       GNUNET_MQ_inject_error (mq,
-                               GNUNET_MQ_ERROR_MALFORMED);
-       break;
+       return GNUNET_SYSERR;
       }
       if ( (NULL == handler->mv) ||
           (GNUNET_OK ==
@@ -257,17 +284,20 @@ GNUNET_MQ_inject_message (struct GNUNET_MQ_Handle *mq,
         LOG (GNUNET_ERROR_TYPE_ERROR,
              "Received malformed message of type %u\n",
              (unsigned int) handler->type);
-       GNUNET_MQ_inject_error (mq,
-                               GNUNET_MQ_ERROR_MALFORMED);
+       return GNUNET_SYSERR;
       }
       break;
     }
   }
  done:
   if (GNUNET_NO == handled)
+  {
     LOG (GNUNET_ERROR_TYPE_INFO,
          "No handler for message of type %u and size %u\n",
          mtype, msize);
+    return GNUNET_NO;
+  }
+  return GNUNET_OK;
 }
 
 

-- 
To stop receiving notification emails like this one, please contact
address@hidden



reply via email to

[Prev in Thread] Current Thread [Next in Thread]