gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r15914 - in gnunet/src: fragmentation include


From: gnunet
Subject: [GNUnet-SVN] r15914 - in gnunet/src: fragmentation include
Date: Sat, 9 Jul 2011 19:25:44 +0200

Author: grothoff
Date: 2011-07-09 19:25:43 +0200 (Sat, 09 Jul 2011)
New Revision: 15914

Modified:
   gnunet/src/fragmentation/defragmentation_new.c
   gnunet/src/include/gnunet_fragmentation_lib.h
   gnunet/src/include/gnunet_protocols.h
Log:
defrag

Modified: gnunet/src/fragmentation/defragmentation_new.c
===================================================================
--- gnunet/src/fragmentation/defragmentation_new.c      2011-07-09 16:48:59 UTC (rev 15913)
+++ gnunet/src/fragmentation/defragmentation_new.c      2011-07-09 17:25:43 UTC (rev 15914)
@@ -35,7 +35,7 @@
   /**
    * The time the fragment was received.
    */
-  GNUNET_TIME_Absolute time;
+  struct GNUNET_TIME_Absolute time;
 
   /**
    * Number of the bit for the fragment (in [0,..,63]).
@@ -45,7 +45,10 @@
 
 
 /**
- * Information we keep for one message that is being assembled.
+ * Information we keep for one message that is being assembled.  Note
+ * that we keep the context around even after the assembly is done to
+ * handle 'stray' messages that are received 'late'.  A message
+ * context is ONLY discarded when the queue gets too big.
  */
 struct MessageContext
 {
@@ -60,6 +63,11 @@
   struct MessageContext *prev;
 
   /**
+   * Associated defragmentation context.
+   */
+  struct GNUNET_DEFRAGMENT_Context *dc;
+
+  /**
    * Pointer to the assembled message, allocated at the
    * end of this struct.
    */ 
@@ -76,7 +84,7 @@
    * Task scheduled for transmitting the next ACK to the
    * other peer.
    */
-  struct GNUNET_SCHEDULER_TaskIdentifier ack_task;
+  GNUNET_SCHEDULER_TaskIdentifier ack_task;
 
   /**
    * When did we receive which fragment? Used to calculate
@@ -127,11 +135,6 @@
   struct GNUNET_STATISTICS_Handle *stats;
 
   /**
-   * Closure for 'proc' and 'ackp'.
-   */
-  void *cls;
-
-  /**
    * Head of list of messages we're defragmenting.
    */
   struct MessageContext *head;
@@ -142,6 +145,11 @@
   struct MessageContext *tail;
 
   /**
+   * Closure for 'proc' and 'ackp'.
+   */
+  void *cls;
+
+  /**
    * Function to call with defragmented messages.
    */
   GNUNET_FRAGMENT_MessageProcessor proc;
@@ -169,7 +177,10 @@
    */
   unsigned int list_size;
 
-
+  /**
+   * Maximum message size for each fragment.
+   */ 
+  uint16_t mtu;
 };
 
 
@@ -177,6 +188,7 @@
  * Create a defragmentation context.
  *
  * @param stats statistics context
+ * @param mtu the maximum message size for each fragment 
  * @param num_msgs how many fragmented messages
  *                 to we defragment at most at the same time?
  * @param cls closure for proc and ackp
@@ -187,6 +199,7 @@
  */
 struct GNUNET_DEFRAGMENT_Context *
 GNUNET_DEFRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
+                                 uint16_t mtu,
                                  unsigned int num_msgs,
                                  void *cls,
                                  GNUNET_FRAGMENT_MessageProcessor proc,
@@ -200,6 +213,7 @@
   dc->proc = proc;
   dc->ackp = ackp;
   dc->num_msgs = num_msgs;
+  dc->mtu = mtu;
   dc->latency = GNUNET_TIME_UNIT_SECONDS; /* start with likely overestimate */
   return dc;
 }
@@ -213,11 +227,50 @@
 void 
 GNUNET_DEFRAGMENT_context_destroy (struct GNUNET_DEFRAGMENT_Context *dc)
 {
+  struct MessageContext *mc;
+
+  while (NULL != (mc = dc->head))
+    {
+      GNUNET_CONTAINER_DLL_remove (dc->head,
+                                  dc->tail,
+                                  mc);
+      dc->list_size--;
+      if (GNUNET_SCHEDULER_NO_TASK != mc->ack_task)
+       {
+         GNUNET_SCHEDULER_cancel (mc->ack_task);
+         mc->ack_task = GNUNET_SCHEDULER_NO_TASK;
+       }
+      GNUNET_free (mc);
+    }
+  GNUNET_assert (0 == dc->list_size);
   GNUNET_free (dc);
 }
 
 
 /**
+ * Send acknowledgement to the other peer now.
+ *
+ * @param cls the message context
+ * @param tc the scheduler context
+ */
+static void
+send_ack (void *cls,
+         const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct MessageContext *mc = cls;
+  struct GNUNET_DEFRAGMENT_Context *dc = mc->dc;
+  struct FragmentAcknowledgement fa;
+
+  mc->ack_task = GNUNET_SCHEDULER_NO_TASK;
+  fa.header.size = htons (sizeof (struct FragmentAcknowledgement));
+  fa.header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT_ACK);
+  fa.fragment_id = htonl (mc->fragment_id);
+  fa.bits = GNUNET_htonll (mc->bits);
+  dc->ackp (dc->cls, &fa.header);
+}
+
+
+/**
  * We have received a fragment.  Process it.
  *
  * @param dc the context
@@ -227,6 +280,127 @@
 GNUNET_DEFRAGMENT_process_fragment (struct GNUNET_DEFRAGMENT_Context *dc,
                                    const struct GNUNET_MessageHeader *msg)
 {
+  struct MessageContext *mc;
+  const struct FragmentHeader *fh;
+  uint16_t msize;
+  uint16_t foff;
+  uint32_t fid;
+  char *mbuf;
+  unsigned int bit;
+  struct GNUNET_TIME_Absolute now;
+  struct GNUNET_TIME_Relative delay;
+
+  if (ntohs(msg->size) < sizeof (struct FragmentHeader))
+    {
+      GNUNET_break_op (0);
+      return;
+    }
+  if (ntohs (msg->size) > dc->mtu)
+    {
+      GNUNET_break_op (0);
+      return;
+    }
+  fh = (const struct FragmentHeader*) msg;
+  msize = ntohs (fh->total_size);
+  fid = ntohl (fh->fragment_id);
+  foff = ntohl (fh->offset);
+  if (foff >= msize)
+    {
+      GNUNET_break_op (0);
+      return;
+    }
+  GNUNET_STATISTICS_update (dc->stats,
+                           _("Fragments received"),
+                           1,
+                           GNUNET_NO);
+  mc = dc->head;
+  while ( (NULL != mc) &&
+         (fid != mc->fragment_id) )
+    mc = mc->next;
+  bit = foff / dc->mtu;
+  if (bit * dc->mtu + ntohs (msg->size) 
+      - sizeof (struct FragmentHeader) > msize)
+    {
+      /* payload extends past total message size */
+      GNUNET_break_op (0);
+      return;
+    }
+  if ( (NULL != mc) && (msize != mc->total_size) )
+    {
+      /* inconsistent message size */
+      GNUNET_break_op (0);
+      return;
+    }
+  now = GNUNET_TIME_absolute_get ();
+  if (NULL == mc)
+    {
+      mc = GNUNET_malloc (sizeof (struct MessageContext) + msize);
+      mc->msg = (const struct GNUNET_MessageHeader*) &mc[1];
+      mc->dc = dc;
+      mc->total_size = msize;
+      mc->fragment_id = fid;      
+      mc->last_update = now;
+      mc->bits = (msize + dc->mtu - 1) / (dc->mtu - sizeof (struct FragmentHeader));
+      GNUNET_CONTAINER_DLL_insert (dc->head,
+                                  dc->tail,
+                                  mc);
+      dc->list_size++;
+      if (dc->list_size > dc->num_msgs)
+       {
+         /* FIXME: discard oldest entry... */
+       }
+    }
+
+  /* copy data to 'mc' */
+  if (0 != (mc->bits & (1 << bit)))
+    {
+      mc->bits -= 1 << bit;
+      mbuf = (char* )&mc[1];
+      memcpy (&mbuf[bit * dc->mtu],
+             &fh[1],
+             ntohs (msg->size) - sizeof (struct FragmentHeader));
+      mc->last_update = now;
+      mc->frag_times[mc->frag_times_write_offset].time = now;
+      mc->frag_times[mc->frag_times_write_offset].bit = bit;
+      mc->frag_times_write_offset++;
+      if (0 == mc->bits)       
+       {
+         /* message complete, notify! */
+         dc->proc (dc->cls,
+                   mc->msg);
+         GNUNET_STATISTICS_update (dc->stats,
+                                   _("Messages defragmented"),
+                                   1,
+                                   GNUNET_NO);
+       }
+    }
+  else
+    {
+      GNUNET_STATISTICS_update (dc->stats,
+                               _("Duplicate fragments received"),
+                               1,
+                               GNUNET_NO);
+    }
+
+  /* FIXME: update ACK timer (if 0==mc->bits, always ACK now!) */
+  delay = GNUNET_TIME_UNIT_SECONDS; /* FIXME: bad! */
+  if (mc->frag_times_write_offset == 1)
+    {
+      /* FIXME: use number-of-fragments * dc->delay */
+    }
+  else
+    {
+      /* FIXME: use best-fit regression */
+    }
+  /* FIXME: update dc->latency! */
+
+  if (0 == mc->bits)
+    delay = GNUNET_TIME_UNIT_ZERO;
+  if (GNUNET_SCHEDULER_NO_TASK != mc->ack_task)
+    GNUNET_SCHEDULER_cancel (mc->ack_task);
+  mc->ack_task = GNUNET_SCHEDULER_add_delayed (delay,
+                                              &send_ack,
+                                              mc);
 }
 
 /* end of defragmentation_new.c */

Modified: gnunet/src/include/gnunet_fragmentation_lib.h
===================================================================
--- gnunet/src/include/gnunet_fragmentation_lib.h       2011-07-09 16:48:59 UTC (rev 15913)
+++ gnunet/src/include/gnunet_fragmentation_lib.h       2011-07-09 17:25:43 UTC (rev 15914)
@@ -112,7 +112,7 @@
 
 
 /**
- * Defragmentation context.
+ * Defragmentation context (one per connection).
  */
 struct GNUNET_DEFRAGMENT_Context;
 
@@ -121,6 +121,9 @@
  * Create a defragmentation context.
  *
  * @param stats statistics context
+ * @param mtu the maximum message size for each fragment 
+ * @param num_msgs how many fragmented messages
+ *                 to we defragment at most at the same time?
  * @param cls closure for proc and ackp
  * @param proc function to call with defragmented messages
  * @param ackp function to call with acknowledgements (to send
@@ -129,6 +132,8 @@
  */
 struct GNUNET_DEFRAGMENT_Context *
 GNUNET_DEFRAGMENT_context_create (struct GNUNET_STATISTICS_Handle *stats,
+                                 uint16_t mtu,
+                                 unsigned int num_msgs,
                                  void *cls,
                                  GNUNET_FRAGMENT_MessageProcessor proc,
                                  GNUNET_FRAGMENT_MessageProcessor ackp);

Modified: gnunet/src/include/gnunet_protocols.h
===================================================================
--- gnunet/src/include/gnunet_protocols.h       2011-07-09 16:48:59 UTC (rev 15913)
+++ gnunet/src/include/gnunet_protocols.h       2011-07-09 17:25:43 UTC (rev 15914)
@@ -103,6 +103,12 @@
 #define GNUNET_MESSAGE_TYPE_FRAGMENT 18
 
 /**
+ * Acknowledgement of a FRAGMENT of a larger message.
+ * Managed by libgnunetfragment.
+ */
+#define GNUNET_MESSAGE_TYPE_FRAGMENT_ACK 19
+
+/**
  * Message from the core saying that the transport
  * server should start giving it messages.  This
  * should automatically trigger the transmission of




reply via email to

[Prev in Thread] Current Thread [Next in Thread]