gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r400 - in GNUnet: . src/server src/transports


From: grothoff
Subject: [GNUnet-SVN] r400 - in GNUnet: . src/server src/transports
Date: Mon, 7 Mar 2005 00:14:12 -0800 (PST)

Author: grothoff
Date: 2005-03-07 00:14:10 -0800 (Mon, 07 Mar 2005)
New Revision: 400

Modified:
   GNUnet/src/server/connection.c
   GNUnet/src/transports/tcp.c
   GNUnet/todo
Log:
TCP MTU change - only partially tested, beware

Modified: GNUnet/src/server/connection.c
===================================================================
--- GNUnet/src/server/connection.c      2005-03-07 04:47:20 UTC (rev 399)
+++ GNUnet/src/server/connection.c      2005-03-07 08:14:10 UTC (rev 400)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     (C) 2001, 2002, 2003, 2004 Christian Grothoff (and other contributing 
authors)
+     (C) 2001, 2002, 2003, 2004, 2005 Christian Grothoff (and other 
contributing authors)
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -311,7 +311,7 @@
   PeerIdentity sender;
 
   /**
-   * The MTU for this session.
+   * The MTU for this session, 0 for streaming transports.
    */
   unsigned short mtu;
 
@@ -828,6 +828,7 @@
   int tailpos;
   int approxProb;
   int remainingBufferSize;
+  unsigned int totalMessageSize;
 
   ENTRY();
   /* fast ways out */
@@ -845,15 +846,23 @@
     return; /* nothing to send */    
   }
 
+
+
   /* recompute max send frequency */
   if (be->max_bpm <= 0)
     be->max_bpm = 1;
-  
-  be->MAX_SEND_FREQUENCY = /* ms per message */
-    be->session.mtu  /* byte per message */
-    / (be->max_bpm * cronMINUTES / cronMILLIS) /* bytes per ms */
-    / 2; /* some head-room */
-  
+
+  if (be->session.mtu == 0) {
+    be->MAX_SEND_FREQUENCY = /* ms per message */
+      EXPECTED_MTU 
+      / (be->max_bpm * cronMINUTES / cronMILLIS) /* bytes per ms */
+      / 2;
+  } else {
+    be->MAX_SEND_FREQUENCY = /* ms per message */
+      be->session.mtu  /* byte per message */
+      / (be->max_bpm * cronMINUTES / cronMILLIS) /* bytes per ms */
+      / 2; /* some head-room */
+  }
   /* Also: allow at least MINIMUM_SAMPLE_COUNT knapsack
      solutions for any MIN_SAMPLE_TIME! */
   if (be->MAX_SEND_FREQUENCY > MIN_SAMPLE_TIME / MINIMUM_SAMPLE_COUNT)
@@ -867,84 +876,130 @@
 #endif
     return; /* frequency too high, wait */
   }
-  /* solve knapsack problem, compute accumulated priority */
-  knapsackSolution = MALLOC(sizeof(int) * be->sendBufferSize);
 
-  approxProb = getCPULoad();
-  if (approxProb > 50) {
-    if (approxProb > 100)
-      approxProb = 100;
-    approxProb = 100 - approxProb; /* now value between 0 and 50 */
-    approxProb *= 2; /* now value between 0 [always approx] and 100 [never 
approx] */
-    /* control CPU load probabilistically! */
-    if (randomi(1+approxProb) == 0) { 
-      priority = approximateKnapsack(be,
-                                    be->session.mtu - sizeof(P2P_Message),
-                                    knapsackSolution);
+  /* test if receiver has enough bandwidth available!  */
+  updateCurBPS(be);
+#if DEBUG_CONNECTION
+  LOG(LOG_DEBUG,
+      "receiver window available: %lld bytes (MTU: %u)\n",
+      be->available_send_window,
+      be->session.mtu);
+#endif
+
+
+  if (be->session.mtu == 0) {
+    SendEntry ** entries;
+
+    entries = be->sendBuffer;
+    totalMessageSize = 0;
+    knapsackSolution = MALLOC(sizeof(int) * be->sendBufferSize);
+    priority = 0;
+    i = 0;
+    /* assumes entries are sorted by priority! */
+    while (i < be->sendBufferSize) {
+      if ( (totalMessageSize + entries[i]->len < 60000) &&
+          (entries[i]->pri >= EXTREME_PRIORITY) ) {
+       knapsackSolution[i] = YES;
+       priority += entries[i]->pri;
+       totalMessageSize += entries[i]->len;
+      } else {
+       break;
+      }
+      i++;
+    }    
+    while ( (i < be->sendBufferSize) &&
+           (be->available_send_window > totalMessageSize) ) {
+      if (entries[i]->len + totalMessageSize <=
+         be->available_send_window) {
+       knapsackSolution[i] = YES;
+       totalMessageSize += entries[i]->len;
+       priority += entries[i]->pri;
+      } else {
+       knapsackSolution[i] = NO;
+       if (totalMessageSize == 0) {
+         /* if the highest-priority message does not yet
+            fit, wait for send window to grow so that
+            we can get it out (otherwise we would starve
+            high-priority, large messages) */
+         FREE(knapsackSolution);
+         return;
+       }
+      }
+      i++;
+    }
+  } else { /* if (be->session.mtu == 0) */
+    /* solve knapsack problem, compute accumulated priority */
+    knapsackSolution = MALLOC(sizeof(int) * be->sendBufferSize);
+    
+    approxProb = getCPULoad();
+    if (approxProb > 50) {
+      if (approxProb > 100)
+       approxProb = 100;
+      approxProb = 100 - approxProb; /* now value between 0 and 50 */
+      approxProb *= 2; /* now value between 0 [always approx] and 100 [never 
approx] */
+      /* control CPU load probabilistically! */
+      if (randomi(1+approxProb) == 0) { 
+       priority = approximateKnapsack(be,
+                                      be->session.mtu - sizeof(P2P_Message),
+                                      knapsackSolution);
 #if DEBUG_COLLECT_PRIO == YES
-      fprintf(prioFile, "%llu 0 %d\n", cronTime(NULL), priority);
+       fprintf(prioFile, "%llu 0 %d\n", cronTime(NULL), priority);
 #endif
-    } else {
+      } else {
+       priority = solveKnapsack(be,
+                                be->session.mtu - sizeof(P2P_Message),
+                                knapsackSolution);
+#if DEBUG_COLLECT_PRIO == YES
+       fprintf(prioFile, "%llu 1 %d\n", cronTime(NULL), priority);
+#endif
+      }
+    } else { /* never approximate < 50% CPU load */
       priority = solveKnapsack(be,
                               be->session.mtu - sizeof(P2P_Message),
                               knapsackSolution);
 #if DEBUG_COLLECT_PRIO == YES
-      fprintf(prioFile, "%llu 1 %d\n", cronTime(NULL), priority);
+      fprintf(prioFile, "%llu 2 %d\n", cronTime(NULL), priority);
 #endif
     }
-  } else { /* never approximate < 50% CPU load */
-    priority = solveKnapsack(be,
-                            be->session.mtu - sizeof(P2P_Message),
-                            knapsackSolution);
-#if DEBUG_COLLECT_PRIO == YES
-      fprintf(prioFile, "%llu 2 %d\n", cronTime(NULL), priority);
-#endif
-  }
-  j = 0;
-  for (i=0;i<be->sendBufferSize;i++)
-    if (knapsackSolution[i] == YES)
-      j++;
-  if (j == 0) {
-    LOG(LOG_ERROR,
-       _("'%s' selected %d out of %d messages (MTU: %d).\n"),
-       "solveKnapsack",
-       j,
-       be->sendBufferSize,
-       be->session.mtu - sizeof(P2P_Message));
-    
-    for (j=0;j<be->sendBufferSize;j++)
+    j = 0;
+    for (i=0;i<be->sendBufferSize;i++)
+      if (knapsackSolution[i] == YES)
+       j++;
+    if (j == 0) {
       LOG(LOG_ERROR,
-         _("Message details: %u: length %d, priority: %d\n"),
-         j, 
-         be->sendBuffer[j]->len,
-         be->sendBuffer[j]->pri);
-    FREE(knapsackSolution);
-    return;
-  }
+         _("'%s' selected %d out of %d messages (MTU: %d).\n"),
+         "solveKnapsack",
+         j,
+         be->sendBufferSize,
+         be->session.mtu - sizeof(P2P_Message));
+      
+      for (j=0;j<be->sendBufferSize;j++)
+       LOG(LOG_ERROR,
+           _("Message details: %u: length %d, priority: %d\n"),
+           j, 
+           be->sendBuffer[j]->len,
+           be->sendBuffer[j]->pri);
+      FREE(knapsackSolution);
+      return;
+    }
     
-  /* test if receiver has enough bandwidth available!  */
-  updateCurBPS(be);
+    if (be->available_send_window < be->session.mtu) {
+      /* if we have a very high priority, we may
+        want to ignore bandwidth availability (e.g. for HANGUP,
+        which  has EXTREME_PRIORITY) */
+      if (priority < EXTREME_PRIORITY) {
+       FREE(knapsackSolution);
 #if DEBUG_CONNECTION
-  LOG(LOG_DEBUG,
-      "receiver window available: %lld bytes (MTU: %u)\n",
-      be->available_send_window,
-      be->session.mtu);
+       LOG(LOG_DEBUG,
+           "bandwidth limits prevent sending (send window %u too small).\n",
+           be->available_send_window);
 #endif
-  if (be->available_send_window < be->session.mtu) {
-    /* if we have a very high priority, we may
-       want to ignore bandwidth availability (e.g. for HANGUP,
-       which  has EXTREME_PRIORITY) */
-    if (priority < EXTREME_PRIORITY) {
-      FREE(knapsackSolution);
-#if DEBUG_CONNECTION
-      LOG(LOG_DEBUG,
-         "bandwidth limits prevent sending (send window %u too small).\n",
-         be->available_send_window);
-#endif
-      return; /* can not send, BPS available is too small */
+       return; /* can not send, BPS available is too small */
+      }
     }
-  }
-  
+    totalMessageSize = be->session.mtu;
+  } /* end MTU > 0 */
+    
   expired = cronTime(NULL) - SECONDS_PINGATTEMPT * cronSECONDS; 
   /* if it's more than one connection "lifetime" old, always kill it! */
 
@@ -991,8 +1046,8 @@
   }
   
   /* build message (start with sequence number) */
-  GNUNET_ASSERT(be->session.mtu > sizeof(P2P_Message));
-  plaintextMsg = MALLOC(be->session.mtu);
+  GNUNET_ASSERT(totalMessageSize > sizeof(P2P_Message));
+  plaintextMsg = MALLOC(totalMessageSize);
   p2pHdr = (P2P_Message*) plaintextMsg;
   p2pHdr->timeStamp 
     = htonl(TIME(NULL));
@@ -1057,12 +1112,12 @@
       int msgCap;
       int l = getCPULoad();
       if (l >= 50) {
-       msgCap = be->session.mtu / sizeof(HashCode512);
+       msgCap = EXPECTED_MTU / sizeof(HashCode512);
       } else {
        if (l <= 0)
          l = 1;
-       msgCap = be->session.mtu / sizeof(HashCode512)
-         + (MAX_SEND_BUFFER_SIZE - be->session.mtu / sizeof(HashCode512)) / l;
+       msgCap = EXPECTED_MTU / sizeof(HashCode512)
+         + (MAX_SEND_BUFFER_SIZE - EXPECTED_MTU / sizeof(HashCode512)) / l;
       }
       if (be->max_bpm > 2) {
        msgCap += 2 * (int) log((double)be->max_bpm);
@@ -1101,7 +1156,7 @@
   /* still room left? try callbacks! */
   pos = scl_nextHead;
   while (pos != NULL) {
-    if (pos->minimumPadding + p <= be->session.mtu) {
+    if (pos->minimumPadding + p <= totalMessageSize) {
       p += pos->callback(&be->session.sender,
                         &plaintextMsg[p],
                         be->session.mtu - p);
@@ -1110,10 +1165,10 @@
   }
   
   /* finally padd with noise */  
-  if ( (p + sizeof(p2p_HEADER) <= be->session.mtu) &&
+  if ( (p + sizeof(p2p_HEADER) <= totalMessageSize) &&
        (disable_random_padding == NO) ) {
     p2p_HEADER * part;
-    unsigned short noiseLen = be->session.mtu - p;
+    unsigned short noiseLen = totalMessageSize - p;
 
     part = (p2p_HEADER *) &plaintextMsg[p];
     part->size 
@@ -1121,10 +1176,10 @@
     part->type 
       = htons(p2p_PROTO_NOISE);
     for (i=p+sizeof(p2p_HEADER);
-        i < be->session.mtu;
+        i < totalMessageSize;
         i++)
       plaintextMsg[i] = (char) rand();
-    p = be->session.mtu;
+    p = totalMessageSize;
   }
 
   encryptedMsg = MALLOC(p); 
@@ -1155,8 +1210,8 @@
            (SYSERR == transport->sendReliable(be->session.tsession,
                                               encryptedMsg,
                                               p))) ) ) {
-    if (be->available_send_window > be->session.mtu)
-      be->available_send_window -= be->session.mtu;
+    if (be->available_send_window > totalMessageSize)
+      be->available_send_window -= totalMessageSize;
     else
       be->available_send_window = 0; /* if we overrode limits,
                                        reset to 0 at least... */
@@ -1205,7 +1260,8 @@
     BREAK();
     return;
   }
-  if (se->len > be->session.mtu - sizeof(P2P_Message)) {
+  if ( (be->session.mtu != 0) &&
+       (se->len > be->session.mtu - sizeof(P2P_Message)) ) {
     /* this message is so big that it must be fragmented! */
     fragmentation->fragment(&be->session.sender,
                            be->session.mtu,

Modified: GNUnet/src/transports/tcp.c
===================================================================
--- GNUnet/src/transports/tcp.c 2005-03-07 04:47:20 UTC (rev 399)
+++ GNUnet/src/transports/tcp.c 2005-03-07 08:14:10 UTC (rev 400)
@@ -140,9 +140,9 @@
   unsigned int pos;  
 
   /**
-   * Current size of the buffer.
+   * Current size of the read buffer.
    */
-  unsigned int size;
+  unsigned int rsize;
 
   /**
    * The read buffer.
@@ -159,6 +159,11 @@
    */
   char * wbuff;
 
+  /**
+   * Size of the write buffer
+   */
+  unsigned int wsize;
+
 } TCPSession;
 
 /* *********** globals ************* */
@@ -376,9 +381,15 @@
   if (SYSERR == tcpAssociate(tsession))
     return SYSERR;
   tcpSession = tsession->internal;
+  if (tcpSession->rsize == tcpSession->pos) {
+    /* read buffer too small, grow */
+    GROW(tcpSession->rbuff,
+        tcpSession->rsize,
+        tcpSession->rsize * 2);
+  }
   ret = READ(tcpSession->sock,
             &tcpSession->rbuff[tcpSession->pos],
-            tcpSession->size - tcpSession->pos);
+            tcpSession->rsize - tcpSession->pos);
   cronTime(&tcpSession->lastUse);
   if (ret == 0) {
     tcpDisconnect(tsession);
@@ -407,9 +418,9 @@
   incrementBytesReceived(ret);
   tcpSession->pos += ret;
   len = ntohs(((TCPMessagePack*)&tcpSession->rbuff[0])->size);
-  if (len > tcpSession->size) /* if MTU larger than expected, grow! */
+  if (len > tcpSession->rsize) /* if message larger than read buffer, grow! */
     GROW(tcpSession->rbuff,
-        tcpSession->size,
+        tcpSession->rsize,
         len);
 #if DEBUG_TCP
   LOG(LOG_DEBUG,
@@ -479,8 +490,8 @@
   mp->tsession = tsession;
 #if DEBUG_TCP
   LOG(LOG_DEBUG,
-      "tcp transport received %d bytes, forwarding to core\n",
-      mp->size);
+      "tcp transport received %u bytes, forwarding to core\n",
+      mp->rsize);
 #endif
   coreAPI->receive(mp);
 
@@ -493,7 +504,14 @@
   memmove(&tcpSession->rbuff[0],
          &tcpSession->rbuff[len],
          tcpSession->pos - len);
-  tcpSession->pos -= len;         
+  tcpSession->pos -= len;
+  if ( (tcpSession->pos * 4 < tcpSession->rsize) &&
+       (tcpSession->rsize > 4 * 1024) ) {
+    /* read buffer far too large, shrink! */
+    GROW(tcpSession->rbuff,
+        tcpSession->rsize,
+        tcpSession->pos + 1024);
+  }
   
   tcpDisconnect(tsession);
   return OK;
@@ -531,10 +549,11 @@
 
   tcpSession = MALLOC(sizeof(TCPSession));
   tcpSession->pos = 0;
-  tcpSession->size = tcpAPI.mtu + sizeof(TCPMessagePack);
-  tcpSession->rbuff = MALLOC(tcpSession->size);
+  tcpSession->rsize = 2 * 1024 + sizeof(TCPMessagePack);
+  tcpSession->rbuff = MALLOC(tcpSession->rsize);
   tcpSession->wpos = 0;
   tcpSession->wbuff = NULL;
+  tcpSession->wsize = 0;
   tcpSession->sock = sock;
   /* fill in placeholder identity to mark that we 
      are waiting for the welcome message */
@@ -692,9 +711,9 @@
 
 try_again_1:   
        success = SEND_NONBLOCKING(sock,
-                              tcpSession->wbuff,
-                              tcpSession->wpos,
-                              &ret);
+                                  tcpSession->wbuff,
+                                  tcpSession->wpos,
+                                  &ret);
        if (success == SYSERR) {
          LOG_STRERROR(LOG_WARNING, "send");
          destroySession(i);
@@ -718,7 +737,8 @@
        if ((unsigned int)ret == tcpSession->wpos) {
          FREENONNULL(tcpSession->wbuff);
          tcpSession->wbuff = NULL;
-         tcpSession->wpos = 0;
+         tcpSession->wpos  = 0;
+         tcpSession->wsize = 0;
        } else {
          memmove(tcpSession->wbuff,
                  &tcpSession->wbuff[ret],
@@ -779,10 +799,6 @@
     BREAK(); /* size 0 not allowed */
     return SYSERR;
   }
-  if (ssize > tcpAPI.mtu + sizeof(TCPMessagePack)) {
-    BREAK(); /* size > mtu */
-    return SYSERR;
-  }
   ok = SYSERR;
   MUTEX_LOCK(&tcplock);
   if (tcpSession->wpos > 0) {
@@ -793,20 +809,21 @@
                               mp,
                               ssize,
                               &ret);
-       if (success == SYSERR) {
-         LOG_STRERROR(LOG_INFO, "send");
-         MUTEX_UNLOCK(&tcplock);
-         return SYSERR;
-       } else if (success == NO)
-         ret = 0;
+    if (success == SYSERR) {
+      LOG_STRERROR(LOG_INFO, "send");
+      MUTEX_UNLOCK(&tcplock);
+      return SYSERR;
+    } else if (success == NO)
+      ret = 0;
   }
   if ((unsigned int)ret <= ssize) { /* some bytes send or blocked */
     if ((unsigned int)ret < ssize) {
       if (tcpSession->wbuff == NULL) {
-       tcpSession->wbuff = MALLOC(tcpAPI.mtu + sizeof(TCPMessagePack));
-       tcpSession->wpos = 0;
+       tcpSession->wsize = ssize + sizeof(TCPMessagePack);
+       tcpSession->wbuff = MALLOC(tcpSession->wsize);
+       tcpSession->wpos  = 0;
       }
-      if (ssize + tcpSession->wpos > tcpAPI.mtu + sizeof(TCPMessagePack) + 
ret) {
+      if (ssize + tcpSession->wpos - ret > tcpSession->wsize) {
        ssize = 0;
        ok = SYSERR; /* buffer full, drop */
       } else {
@@ -864,9 +881,8 @@
   MUTEX_LOCK(&tcplock);
   if (tcpSession->wpos > 0) {
     unsigned int old = tcpSession->wpos;
-    /* reliable: grow send-buffer above limit! */
     GROW(tcpSession->wbuff,
-        tcpSession->wpos,
+        tcpSession->wsize,
         tcpSession->wpos + ssize);
     memcpy(&tcpSession->wbuff[old],
           mp,
@@ -1013,8 +1029,9 @@
   tcpSession->sock = sock;
   tcpSession->wpos = 0;
   tcpSession->wbuff = NULL;
-  tcpSession->size = tcpAPI.mtu + sizeof(TCPMessagePack);
-  tcpSession->rbuff = MALLOC(tcpSession->size);
+  tcpSession->wsize = 0;
+  tcpSession->rsize = 2 * 1024 + sizeof(TCPMessagePack);
+  tcpSession->rbuff = MALLOC(tcpSession->rsize);
   tsession = MALLOC(sizeof(TSession));
   tsession->internal = tcpSession;
   tsession->ttype = tcpAPI.protocolNumber;
@@ -1065,7 +1082,7 @@
   
   if (tcp_shutdown == YES)
     return SYSERR;
-  if ( (size == 0) || (size > tcpAPI.mtu) ) {
+  if (size == 0) {
     BREAK();
     return SYSERR;
   }
@@ -1104,7 +1121,7 @@
   
   if (tcp_shutdown == YES)
     return SYSERR;
-  if ( (size == 0) || (size > tcpAPI.mtu) ) {
+  if (size == 0) {
     BREAK();
     return SYSERR;
   }
@@ -1276,8 +1293,6 @@
  * via a global and returns the udp transport API.
  */ 
 TransportAPI * inittransport_tcp(CoreAPIForTransport * core) {
-  int mtu;
-
   MUTEX_CREATE_RECURSIVE(&tcplock);
   reloadConfiguration();
   tsessionCount = 0;
@@ -1286,17 +1301,8 @@
        tsessionArrayLength,
        32);
   coreAPI = core;
-  mtu = getConfigurationInt("TCP",
-                           "MTU");
-  if (mtu == 0)
-    mtu = 1460;
-  if (mtu < 1200)
-    LOG(LOG_ERROR,
-       _("MTU for '%s' is probably too low (fragmentation not 
implemented!)\n"),
-       "TCP");
- 
   tcpAPI.protocolNumber       = TCP_PROTOCOL_NUMBER;
-  tcpAPI.mtu                  = mtu - sizeof(TCPMessagePack);
+  tcpAPI.mtu                  = 0;
   tcpAPI.cost                 = 20000; /* about equal to udp */
   tcpAPI.verifyHelo           = &verifyHelo;
   tcpAPI.createHELO           = &createHELO;

Modified: GNUnet/todo
===================================================================
--- GNUnet/todo 2005-03-07 04:47:20 UTC (rev 399)
+++ GNUnet/todo 2005-03-07 08:14:10 UTC (rev 400)
@@ -21,7 +21,8 @@
   * dht / gnunet-dht-join and gnunet-dht-query 
   * fs
 - transport refactoring:
-  * TCP MTU change [ hard ]
+  * TEST: TCP MTU change [ at this point, it may just work or equally just 
crash and burn ]
+  * UPDATE: TCP6/HTTP: MTU change!
   * port knocking support? [ tricky ]
 - UI features:
   * date feature (Mantis #789)





reply via email to

[Prev in Thread] Current Thread [Next in Thread]