gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] [gnunet] branch master updated: working on crazy fragmentation logic


From: gnunet
Subject: [GNUnet-SVN] [gnunet] branch master updated: working on crazy fragmentation logic
Date: Tue, 22 Jan 2019 22:55:10 +0100

This is an automated email from the git hooks/post-receive script.

grothoff pushed a commit to branch master
in repository gnunet.

The following commit(s) were added to refs/heads/master by this push:
     new ff10602f5 working on crazy fragmentation logic
ff10602f5 is described below

commit ff10602f5ab7df06dc850206159e76bd7a7891ea
Author: Christian Grothoff <address@hidden>
AuthorDate: Tue Jan 22 22:55:05 2019 +0100

    working on crazy fragmentation logic
---
 src/include/gnunet_protocols.h      |  31 +++
 src/transport/gnunet-service-tng.c  | 380 ++++++++++++++++++++++++++++++++----
 src/transport/transport.h           |   4 +-
 src/transport/transport_api2_core.c |   6 +-
 4 files changed, 376 insertions(+), 45 deletions(-)

diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h
index af4dbd52f..8593005d7 100644
--- a/src/include/gnunet_protocols.h
+++ b/src/include/gnunet_protocols.h
@@ -3092,6 +3092,37 @@ extern "C"
  */
 #define GNUNET_MESSAGE_TYPE_TRANSPORT_BACKCHANNEL_ENCAPSULATION 1213
 
+/**
+ * Type of a fragment of a CORE message created by transport to adjust
+ * message length to a queue's MTU.
+ */ 
+#define GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT 1214
+
+/** 
+ * Acknowledgement generated for a fragment.
+ */ 
+#define GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT_ACK 1215
+
+/** 
+ * Wrapper around non-fragmented CORE message used to measure RTT
+ * and ensure reliability.
+ */ 
+#define GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX 1216
+
+/** 
+ * Confirmation for a #GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_BOX.
+ */ 
+#define GNUNET_MESSAGE_TYPE_TRANSPORT_RELIABILITY_ACK 1217
+
+/** 
+ * Message sent for topology discovery at transport level.
+ */ 
+#define GNUNET_MESSAGE_TYPE_TRANSPORT_DV_LEARN 1218
+
+/** 
+ * Source-routed transport message based DV information gathered.
+ */ 
+#define GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX 1219
 
 /**
  * Message sent to indicate to the transport that a monitor
diff --git a/src/transport/gnunet-service-tng.c b/src/transport/gnunet-service-tng.c
index 3365ea5d5..76d5265a8 100644
--- a/src/transport/gnunet-service-tng.c
+++ b/src/transport/gnunet-service-tng.c
@@ -428,31 +428,50 @@ struct TransportDVLearn
 
 /**
  * Outer layer of an encapsulated message send over multiple hops.
+ * The path given only includes the identities of the subsequent
+ * peers, i.e. it will be empty if we are the receiver. Each
+ * forwarding peer should scan the list from the end, and if it can,
+ * forward to the respective peer. The list should then be shortened
+ * by all the entries up to and including that peer.  Each hop should
+ * also increment @e total_hops to allow the receiver to get a precise
+ * estimate on the number of hops the message travelled.  Senders must
+ * provide a learned path that thus should work, but if intermediaries
+ * know of a shortcut, they are allowed to send the message via that
+ * shortcut.
+ *
+ * If a peer finds itself still on the list, it must drop the message.
  */
 struct TransportDVBox
 {
   /**
-   * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_DV
+   * Type is #GNUNET_MESSAGE_TYPE_TRANSPORT_DV_BOX
    */
   struct GNUNET_MessageHeader header;
 
+  /**
+   * Number of total hops this message travelled. In NBO.
+   * @e origin sets this to zero, to be incremented at
+   * each hop.
+   */
+  uint16_t total_hops GNUNET_PACKED;
+
   /**
    * Number of hops this messages includes. In NBO.
    */
   uint16_t num_hops GNUNET_PACKED;
   
   /**
-   * Position of our peer in the sequence.
-   * To be incremented at each hop. In NBO.
+   * Identity of the peer that originated the message.
    */
-  uint16_t current_hop GNUNET_PACKED;
+  struct GNUNET_PeerIdentity origin;
 
   /* Followed by @e num_hops `struct GNUNET_PeerIdentity` values;
-     the first is the sender, the last the receiver; the current
-     peer may be one in the middle. */
+     excluding the @e origin and the current peer, the last must be
+     the ultimate target; if @e num_hops is zero, the receiver of this
+     message is the ultimate target. */
 
   /* Followed by the actual message, which itself may be
-     another box, but not a DV_LEARN message! */
+     another box, but not a DV_LEARN or DV_BOX message! */
 };
 
 
@@ -696,7 +715,60 @@ struct Neighbour
 
 
 /**
- * Transmission request from CORE that is awaiting delivery.
+ * Types of different pending messages.
+ */ 
+enum PendingMessageType
+{
+
+  /**
+   * Ordinary message received from the CORE service.
+   */
+  PMT_CORE = 0,
+
+  /**
+   * Fragment box.
+   */
+  PMT_FRAGMENT_BOX = 1,
+
+  /**
+   * Reliability box.
+   */
+  PMT_RELIABILITY_BOX = 2,
+
+  /**
+   * Any type of acknowledgement.
+   */
+  PMT_ACKNOWLEDGEMENT = 3
+
+  
+};
+
+
+/**
+ * Transmission request that is awaiting delivery.  The original
+ * transmission requests from CORE may be too big for some queues.
+ * In this case, a *tree* of fragments is created.  At each
+ * level of the tree, fragments are kept in a DLL ordered by which
+ * fragment should be sent next (at the head).  The tree is searched
+ * top-down, with the original message at the root.
+ *
+ * To select a node for transmission, first it is checked if the
+ * current node's message fits with the MTU.  If it does not, we
+ * either calculate the next fragment (based on @e frag_off) from the
+ * current node, or, if all fragments have already been created,
+ * descend to the @e head_frag.  Even though the node was already
+ * fragmented, the fragment may be too big if the fragment was 
+ * generated for a queue with a larger MTU. In this case, the node
+ * may be fragmented again, thus creating a tree.
+ *
+ * When acknowledgements for fragments are received, the tree
+ * must be pruned, removing those parts that were already 
+ * acknowledged.  When fragments are sent over a reliable 
+ * channel, they can be immediately removed.
+ *
+ * If a message is ever fragmented, then the original "full" message
+ * is never again transmitted (even if it fits below the MTU), and
+ * only (remaining) fragments are sent.
  */
 struct PendingMessage
 {
@@ -711,25 +783,50 @@ struct PendingMessage
   struct PendingMessage *prev_neighbour;
 
   /**
-   * Kept in a MDLL of messages from this @a client.
+   * Kept in a MDLL of messages from this @a client (if @e pmt is #PMT_CORE)
    */
   struct PendingMessage *next_client;
-
+  
   /**
-   * Kept in a MDLL of messages from this @a client.
+   * Kept in a MDLL of messages from this @a client  (if @e pmt is #PMT_CORE)
    */
   struct PendingMessage *prev_client;
 
+  /**
+   * Kept in a MDLL of messages from this @a cpm (if @e pmt is #PMT_FRAGMENT_BOX)
+   */
+  struct PendingMessage *next_frag;
+  
+  /**
+   * Kept in a MDLL of messages from this @a cpm  (if @e pmt is #PMT_FRAGMENT_BOX)
+   */
+  struct PendingMessage *prev_frag;
+    
   /**
    * Target of the request.
    */
   struct Neighbour *target;
-
+      
   /**
-   * Client that issued the transmission request.
+   * Client that issued the transmission request, if @e pmt is #PMT_CORE.
    */
   struct TransportClient *client;
+  
+  /**
+   * Head of a MDLL of fragments created for this core message.
+   */
+  struct PendingMessage *head_frag;
+  
+  /**
+   * Tail of a MDLL of fragments created for this core message.
+   */
+  struct PendingMessage *tail_frag;
 
+  /**
+   * Our parent in the fragmentation tree.
+   */
+  struct PendingMessage *frag_parent;
+    
   /**
    * At what time should we give up on the transmission (and no longer retry)?
    */
@@ -739,12 +836,38 @@ struct PendingMessage
    * What is the earliest time for us to retry transmission of this message?
    */
   struct GNUNET_TIME_Absolute next_attempt;
+
+  /**
+   * UUID to use for this message (used for reassembly of fragments, only
+   * initialized if @e msg_uuid_set is #GNUNET_YES).
+   */
+  struct GNUNET_ShortHashCode msg_uuid;
+
+  /**
+   * Counter incremented per generated fragment.
+   */ 
+  uint32_t frag_uuidgen;
   
+  /**
+   * Type of the pending message.
+   */
+  enum PendingMessageType pmt;
+
   /**
    * Size of the original message.
    */
-  uint32_t bytes_msg;
+  uint16_t bytes_msg;
 
+  /**
+   * Offset at which we should generate the next fragment.
+   */ 
+  uint16_t frag_off;
+
+  /**
+   * #GNUNET_YES once @e msg_uuid was initialized
+   */
+  int16_t msg_uuid_set;
+  
   /* Followed by @e bytes_msg to transmit */
 };
 
@@ -1482,6 +1605,28 @@ check_client_send (void *cls,
 }
 
 
+/**
+ * Free fragment tree below @e root, excluding @e root itself.
+ *
+ * @param root root of the tree to free
+ */  
+static void
+free_fragment_tree (struct PendingMessage *root)
+{
+  struct PendingMessage *frag;
+
+  while (NULL != (frag = root->head_frag))
+  {
+    free_fragment_tree (frag);
+    GNUNET_CONTAINER_MDLL_remove (frag,
+                                 root->head_frag,
+                                 root->tail_frag,
+                                 frag);
+    GNUNET_free (frag);
+  }
+}
+
+
 /**
  * Send a response to the @a pm that we have processed a
  * "send" request with status @a success. We
@@ -1509,7 +1654,7 @@ client_send_response (struct PendingMessage *pm,
     env = GNUNET_MQ_msg (som,
                         GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK);
     som->success = htonl ((uint32_t) success);
-    som->bytes_msg = htonl (pm->bytes_msg);
+    som->bytes_msg = htons (pm->bytes_msg);
     som->bytes_physical = htonl (bytes_physical);
     som->peer = target->pid;
     GNUNET_MQ_send (tc->mq,
@@ -1523,6 +1668,7 @@ client_send_response (struct PendingMessage *pm,
                                target->pending_msg_head,
                                target->pending_msg_tail,
                                pm);
+  free_fragment_tree (pm);
   GNUNET_free (pm);
 }
 
@@ -2024,18 +2170,98 @@ static struct PendingMessage *
 fragment_message (struct PendingMessage *pm,
                  uint16_t mtu)
 {
-  if (0)
+  struct PendingMessage *ff;
+
+  if (GNUNET_NO == pm->msg_uuid_set)
   {
-    /* mtu too small */
-    // FIMXE: bitch
-    client_send_response (pm,
-                         GNUNET_NO,
-                         0);
-    return NULL;
+    GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
+                               &pm->msg_uuid,
+                               sizeof (pm->msg_uuid));
+    pm->msg_uuid_set = GNUNET_YES;
+  }
+  
+  /* This invariant is established in #handle_add_queue_message() */
+  GNUNET_assert (mtu > sizeof (struct TransportFragmentBox));
+
+  /* select fragment for transmission, descending the tree if it has
+     been expanded until we are at a leaf or at a fragment that is small enough */
+  ff = pm;
+  while ( ( (ff->bytes_msg > mtu) ||
+           (pm == ff) ) &&
+         (ff->frag_off == ff->bytes_msg) &&
+         (NULL != ff->head_frag) )
+  {
+    ff = ff->head_frag; /* descent into fragmented fragments */
   }
 
-  /* FIXME: return first fragment here! */
-  return NULL;
+  if ( ( (ff->bytes_msg > mtu) ||
+        (pm == ff) ) &&
+       (pm->frag_off < pm->bytes_msg) )
+  {
+    /* Did not yet calculate all fragments, calculate next fragment */
+    struct PendingMessage *frag;
+    struct TransportFragmentBox tfb;
+    const char *orig;
+    char *msg;
+    uint16_t fragmax;
+    uint16_t fragsize;
+    uint16_t msize;
+    uint16_t xoff = 0;
+
+    orig = (const char *) &ff[1];
+    msize = ff->bytes_msg;
+    if (pm != ff)
+    {
+      const struct TransportFragmentBox *tfbo;
+
+      tfbo = (const struct TransportFragmentBox *) orig;
+      orig += sizeof (struct TransportFragmentBox);
+      msize -= sizeof (struct TransportFragmentBox);
+      xoff = ntohs (tfbo->frag_off);
+    }
+    fragmax = mtu - sizeof (struct TransportFragmentBox);
+    fragsize = GNUNET_MIN (msize - ff->frag_off,
+                          fragmax);
+    frag = GNUNET_malloc (sizeof (struct PendingMessage) +
+                         sizeof (struct TransportFragmentBox) +
+                         fragsize);
+    frag->target = pm->target;
+    frag->frag_parent = ff;
+    frag->timeout = pm->timeout;
+    frag->bytes_msg = sizeof (struct TransportFragmentBox) + fragsize;
+    frag->pmt = PMT_FRAGMENT_BOX;
+    msg = (char *) &frag[1];
+    tfb.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_FRAGMENT);
+    tfb.header.size = htons (sizeof (struct TransportFragmentBox) +
+                            fragsize);
+    tfb.frag_uuid = htonl (pm->frag_uuidgen++);
+    tfb.msg_uuid = pm->msg_uuid;
+    tfb.frag_off = htons (ff->frag_off + xoff);
+    tfb.msg_size = htons (pm->bytes_msg);
+    memcpy (msg,
+           &tfb,
+           sizeof (tfb));
+    memcpy (&msg[sizeof (tfb)],
+           &orig[ff->frag_off],
+           fragsize);
+    GNUNET_CONTAINER_MDLL_insert (frag,
+                                 ff->head_frag,
+                                 ff->tail_frag,
+                                 frag);
+    ff->frag_off += fragsize;
+    ff = frag;
+  }
+
+  /* Move head to the tail and return it */
+  GNUNET_CONTAINER_MDLL_remove (frag,
+                               ff->frag_parent->head_frag,
+                               ff->frag_parent->tail_frag,
+                               ff);
+  GNUNET_CONTAINER_MDLL_insert_tail (frag,
+                                    ff->frag_parent->head_frag,
+                                    ff->frag_parent->tail_frag,
+                                    ff);
+  return ff;
 }
 
 
@@ -2052,11 +2278,12 @@ fragment_message (struct PendingMessage *pm,
 static struct PendingMessage *
 reliability_box_message (struct PendingMessage *pm)
 {
-  if (0) // FIXME
+  if (PMT_CORE != pm->pmt) 
   {
-    /* already fragmented or reliability boxed, do nothing */
+    /* already fragmented or reliability boxed, or control message: do nothing */
     return pm;
   }
+  
   if (0) // FIXME
   {
     /* failed hard */
@@ -2086,6 +2313,7 @@ transmit_on_queue (void *cls)
   struct GNUNET_ATS_Session *queue = cls;
   struct Neighbour *n = queue->neighbour;
   struct PendingMessage *pm;
+  struct PendingMessage *s;
   uint32_t overhead;
 
   queue->transmit_task = NULL;
@@ -2100,38 +2328,102 @@ transmit_on_queue (void *cls)
   overhead = 0;
   if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
     overhead += sizeof (struct TransportReliabilityBox);
-  if ( (0 != queue->mtu) &&
-       (pm->bytes_msg + overhead > queue->mtu) )
-    pm = fragment_message (pm,
-                          queue->mtu);
-  if (NULL == pm)
+  s = pm;
+  if ( ( (0 != queue->mtu) &&
+        (pm->bytes_msg + overhead > queue->mtu) ) ||
+       (NULL != pm->head_frag /* fragments already exist, should
+                                respect that even if MTU is 0 for
+                                this queue */) )
+    s = fragment_message (s,
+                         (0 == queue->mtu)
+                         ? UINT16_MAX /* no real maximum */
+                         : queue->mtu);
+  if (NULL == s)
   {
     /* Fragmentation failed, try next message... */
     schedule_transmit_on_queue (queue);
     return;
   }
   if (GNUNET_TRANSPORT_CC_RELIABLE != queue->tc->details.communicator.cc)
-    pm = reliability_box_message (pm);
-  if (NULL == pm)
+    s = reliability_box_message (s);
+  if (NULL == s)
   {
     /* Reliability boxing failed, try next message... */
     schedule_transmit_on_queue (queue);
     return;
   }
   
-  // FIXME: actually do transmission
+  // FIXME: actually give 's' to communicator for transmission here!
 
-  // FIXME: unless 'pm' is an ACK or control, move 'pm' back in the
-  // transmission queue (simplistic: to the end, better: with position
-  // depending on type, timeout, etc.)
-  
-  // FIXME: do something similar in defragmentation / reliability ACK handling!
-  if (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc)
+  // FIXME: do something similar to the logic below
+  // in defragmentation / reliability ACK handling!
+
+  /* Check if this transmission somehow conclusively finished handling 'pm'
+     even without any explicit ACKs */
+  if ( (PMT_CORE == s->pmt) &&
+       (GNUNET_TRANSPORT_CC_RELIABLE == queue->tc->details.communicator.cc) )
   {
+    /* Full message sent, and over reliable channel */
     client_send_response (pm,
                          GNUNET_YES,
                          pm->bytes_msg);
   }
+  else if ( (GNUNET_TRANSPORT_CC_RELIABLE == 
queue->tc->details.communicator.cc) &&
+           (PMT_FRAGMENT_BOX == s->pmt) )
+  {
+    struct PendingMessage *pos;
+    
+    /* Fragment sent over reliable channel */
+    free_fragment_tree (s);
+    pos = s->frag_parent;
+    GNUNET_CONTAINER_MDLL_remove (frag,
+                                 pos->head_frag,
+                                 pos->tail_frag,
+                                 s);
+    GNUNET_free (s);
+    /* check if subtree is done */
+    while ( (NULL == pos->head_frag) &&
+           (pos->frag_off == pos->bytes_msg) &&
+           (pos != pm) )
+    {
+      s = pos;
+      pos = s->frag_parent;
+      GNUNET_CONTAINER_MDLL_remove (frag,
+                                   pos->head_frag,
+                                   pos->tail_frag,
+                                   s);
+      GNUNET_free (s);      
+    }
+    
+    /* Was this the last applicable fragment? */
+    if ( (NULL == pm->head_frag) &&
+        (pm->frag_off == pm->bytes_msg) )
+      client_send_response (pm,
+                           GNUNET_YES,
+                           pm->bytes_msg /* FIXME: calculate and add 
overheads! */);
+  }
+  else if (PMT_CORE != pm->pmt)
+  {
+    /* This was an acknowledgement of some type, always free */
+
+    struct Neighbour *neighbour = pm->target;
+    GNUNET_CONTAINER_MDLL_remove (neighbour,
+                                 neighbour->pending_msg_head,
+                                 neighbour->pending_msg_tail,
+                                 pm);
+    GNUNET_free (pm);
+  }
+  else
+  {
+    /* message not finished, waiting for acknowledgement */
+    // FIXME: update time by which we might retransmit 's' based on
+    // queue characteristics (i.e. RTT)
+    
+    // FIXME: move 'pm' back in the transmission queue (simplistic: to
+    // the end, better: with position depending on type, timeout,
+    // etc.)
+  }
+  
   /* finally, re-schedule self */
   schedule_transmit_on_queue (queue);
 }
@@ -2217,6 +2509,14 @@ handle_add_queue_message (void *cls,
   const char *addr;
   uint16_t addr_len;
 
+  if (ntohl (aqm->mtu) <= sizeof (struct TransportFragmentBox))
+  {
+    /* MTU so small as to be useless for transmissions,
+       required for #fragment_message()! */
+    GNUNET_break_op (0);
+    GNUNET_SERVICE_client_drop (tc->client);
+    return;
+  }
   neighbour = lookup_neighbour (&aqm->receiver);
   if (NULL == neighbour)
   {
diff --git a/src/transport/transport.h b/src/transport/transport.h
index 00d475e2b..6b1a2cac1 100644
--- a/src/transport/transport.h
+++ b/src/transport/transport.h
@@ -223,12 +223,12 @@ struct SendOkMessage
    * in either case, it is now OK for this client to
    * send us another message for the given peer.
    */
-  uint32_t success GNUNET_PACKED;
+  uint16_t success GNUNET_PACKED;
 
   /**
    * Size of message sent
    */
-  uint32_t bytes_msg GNUNET_PACKED;
+  uint16_t bytes_msg GNUNET_PACKED;
 
   /**
    * Size of message sent over wire
diff --git a/src/transport/transport_api2_core.c b/src/transport/transport_api2_core.c
index 607f26777..f00d00a44 100644
--- a/src/transport/transport_api2_core.c
+++ b/src/transport/transport_api2_core.c
@@ -600,15 +600,15 @@ handle_send_ok (void *cls,
 {
   struct GNUNET_TRANSPORT_CoreHandle *h = cls;
   struct Neighbour *n;
-  uint32_t bytes_msg;
+  uint16_t bytes_msg;
   uint32_t bytes_physical;
 
-  bytes_msg = ntohl (okm->bytes_msg);
+  bytes_msg = ntohs (okm->bytes_msg);
   bytes_physical = ntohl (okm->bytes_physical);
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Receiving SEND_OK message, transmission to %s %s.\n",
        GNUNET_i2s (&okm->peer),
-       (GNUNET_OK == ntohl (okm->success))
+       (GNUNET_OK == ntohs (okm->success))
        ? "succeeded"
        : "failed");
   n = neighbour_find (h,

-- 
To stop receiving notification emails like this one, please contact
address@hidden



reply via email to

[Prev in Thread] Current Thread [Next in Thread]