gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r15911 - gnunet/src/fragmentation


From: gnunet
Subject: [GNUnet-SVN] r15911 - gnunet/src/fragmentation
Date: Sat, 9 Jul 2011 18:21:15 +0200

Author: grothoff
Date: 2011-07-09 18:21:14 +0200 (Sat, 09 Jul 2011)
New Revision: 15911

Modified:
   gnunet/src/fragmentation/fragmentation.h
   gnunet/src/fragmentation/fragmentation_new.c
Log:
fragging

Modified: gnunet/src/fragmentation/fragmentation.h
===================================================================
--- gnunet/src/fragmentation/fragmentation.h    2011-07-09 11:54:47 UTC (rev 15910)
+++ gnunet/src/fragmentation/fragmentation.h    2011-07-09 16:21:14 UTC (rev 15911)
@@ -46,6 +46,13 @@
 
   struct GNUNET_MessageHeader header;
 
+  /**
+   * Bits that are being acknowledged, in big-endian.
+   * (bits that are set correspond to fragments that
+   * have not yet been received).
+   */
+  uint64_t bits;
+
 };
 
 

Modified: gnunet/src/fragmentation/fragmentation_new.c
===================================================================
--- gnunet/src/fragmentation/fragmentation_new.c        2011-07-09 11:54:47 UTC (rev 15910)
+++ gnunet/src/fragmentation/fragmentation_new.c        2011-07-09 16:21:14 UTC (rev 15911)
@@ -48,6 +48,11 @@
   struct GNUNET_TIME_Relative delay;
 
   /**
+   * Time we transmitted the last message of the last round.
+   */
+  struct GNUNET_TIME_Absolute last_round;
+
+  /**
    * Message to fragment (allocated at the end of this struct).
    */
   const struct GNUNET_MessageHeader *msg;
@@ -73,6 +78,16 @@
   GNUNET_SCHEDULER_TaskIdentifier task;
 
   /**
+   * Round-robin selector for the next transmission.
+   */
+  unsigned int next_transmission;
+
+  /**
+   * GNUNET_YES if we are waiting for an ACK.
+   */
+  int wack;
+
+  /**
    * Target fragment size.
    */
   uint16_t mtu;
@@ -91,8 +106,85 @@
               const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct GNUNET_FRAGMENT_Context *fc = cls;
+  char msg[fc->mtu];
+  const char *mbuf;
+  struct FragmentHeader *fh;
+  struct GNUNET_TIME_Relative delay;
+  unsigned int bit;
+  size_t size;
+  size_t fsize;
+  int wrap;
 
   fc->task = GNUNET_SCHEDULER_NO_TASK;
+  if (0 == fc->acks)
+    return; /* all done */
+
+  /* calculate delay */
+  wrap = 0;
+  while (0 == (fc->acks & (1 << fc->next_transmission)))    
+    {
+      fc->next_transmission = (fc->next_transmission + 1) % 64;
+      wrap |= (fc->next_transmission == 0);
+    }
+  bit = fc->next_transmission;
+  size = ntohs (fc->msg->size);
+  if (bit == size / (fc->mtu - sizeof (struct FragmentHeader)))
+    fsize = size % (fc->mtu - sizeof (struct FragmentHeader)) + sizeof (struct 
FragmentHeader);
+  else
+    fsize = fc->mtu;
+  delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker,
+                                             fsize);
+  if (delay.rel_value > 0)
+    {
+      fc->task = GNUNET_SCHEDULER_add_delayed 
(GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker,
+                                                                               
   fc->mtu),
+                                              &transmit_next,
+                                              fc);
+      return;
+    }
+  fc->next_transmission = (fc->next_transmission + 1) % 64;
+  wrap |= (fc->next_transmission == 0);
+
+  /* assemble fragmentation message */
+  mbuf = (const char*) &fc[1];
+  fh = (struct FragmentHeader*) msg;
+  fh->header.size = htons (fsize);
+  fh->header.type = htons (GNUNET_MESSAGE_TYPE_FRAGMENT);
+  /* FIXME: add specific ID info... */
+  memcpy (&fc[1],
+         &mbuf[bit * (fc->mtu - sizeof (struct FragmentHeader))], 
+         fsize - sizeof (struct FragmentHeader));
+  fc->proc (fc->proc_cls, &fh->header);
+  GNUNET_BANDWIDTH_tracker_consume (fc->tracker, fsize);
+  GNUNET_STATISTICS_update (fc->stats,
+                           _("Fragments transmitted"),
+                           1, GNUNET_NO);
+  if (0 != fc->last_round.abs_value)
+    GNUNET_STATISTICS_update (fc->stats,
+                             _("Fragments retransmitted"),
+                             1, GNUNET_NO);
+
+  /* select next message to calculate delay */
+  bit = fc->next_transmission;
+  size = ntohs (fc->msg->size);
+  if (bit == size / (fc->mtu - sizeof (struct FragmentHeader)))
+    fsize = size % (fc->mtu - sizeof (struct FragmentHeader));
+  else
+    fsize = fc->mtu;
+  delay = GNUNET_BANDWIDTH_tracker_get_delay (fc->tracker,
+                                             fsize);
+  if (wrap)
+    {
+      /* full round transmitted wait 2x delay for ACK before going again */
+      delay = GNUNET_TIME_relative_max (GNUNET_TIME_relative_multiply (delay, 
2),
+                                       fc->delay);
+      fc->last_round = GNUNET_TIME_absolute_get ();
+      fc->wack = GNUNET_YES;
+    }
+  fc->task = GNUNET_SCHEDULER_add_delayed (GNUNET_BANDWIDTH_tracker_get_delay 
(fc->tracker,
+                                                                              
fc->mtu),
+                                          &transmit_next,
+                                          fc);
 }
 
 
@@ -127,8 +219,14 @@
   size_t size;
   uint64_t bits;
   
+  GNUNET_STATISTICS_update (stats,
+                           _("Messages fragmented"),
+                           1, GNUNET_NO);
   GNUNET_assert (mtu >= 1024 + sizeof (struct FragmentHeader));
   size = ntohs (msg->size);
+  GNUNET_STATISTICS_update (stats,
+                           _("Total size of fragmented messages"),
+                           size, GNUNET_NO);
   GNUNET_assert (size > mtu);
   fc = GNUNET_malloc (sizeof (struct GNUNET_FRAGMENT_Context) + size);
   fc->stats = stats;
@@ -145,9 +243,8 @@
     fc->acks = UINT64_MAX;      /* set all 64 bit */
   else
     fc->acks = (1 << bits) - 1; /* set lowest 'bits' bit */
-  fc->task = GNUNET_SCHEDULER_add_delayed (GNUNET_BANDWIDTH_tracker_get_delay 
(tracker, mtu),
-                                          &transmit_next,
-                                          fc);
+  fc->task = GNUNET_SCHEDULER_add_now (&transmit_next,
+                                      fc);
   return fc;
 }
 
@@ -167,7 +264,45 @@
 GNUNET_FRAGMENT_process_ack (struct GNUNET_FRAGMENT_Context *fc,
                             const struct GNUNET_MessageHeader *msg)
 {
-  return GNUNET_SYSERR;
+  const struct FragmentAcknowledgement *fa;
+  uint64_t abits;
+  struct GNUNET_TIME_Relative ndelay;
+
+  if (sizeof (struct FragmentAcknowledgement) !=
+      ntohs (msg->size))
+    {
+      GNUNET_break_op (0);
+      return GNUNET_SYSERR;
+    }
+  fa = (const struct FragmentAcknowledgement *) msg;
+  abits = GNUNET_ntohll (fa->bits);
+  /* FIXME: match FA to us... */
+
+  if (GNUNET_YES == fc->wack)
+    {
+      /* normal ACK, can update running average of delay... */
+      fc->wack = GNUNET_NO;
+      ndelay = GNUNET_TIME_absolute_get_duration (fc->last_round);
+      fc->delay.rel_value = (ndelay.rel_value + 3 * fc->delay.rel_value) / 4;
+    }
+    
+  fc->acks &= abits;
+  if (0 != fc->acks)
+    {
+      /* more to transmit, do so right now (if tracker permits...) */
+      GNUNET_SCHEDULER_cancel (fc->task);
+      fc->task = GNUNET_SCHEDULER_add_now (&transmit_next,
+                                          fc);
+      return GNUNET_NO;
+    }
+
+  /* all done */
+  if (fc->task != GNUNET_SCHEDULER_NO_TASK)
+    {
+      GNUNET_SCHEDULER_cancel (fc->task);
+      fc->task = GNUNET_SCHEDULER_NO_TASK;
+    }
+  return GNUNET_OK;
 }
 
 




reply via email to

[Prev in Thread] Current Thread [Next in Thread]