gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r409 - in GNUnet: . src/applications/advertising src/applic


From: grothoff
Subject: [GNUnet-SVN] r409 - in GNUnet: . src/applications/advertising src/applications/fragmentation src/applications/identity src/applications/tbench src/applications/topology_default src/server src/transports
Date: Tue, 8 Mar 2005 03:57:24 -0800 (PST)

Author: grothoff
Date: 2005-03-08 03:57:22 -0800 (Tue, 08 Mar 2005)
New Revision: 409

Modified:
   GNUnet/src/applications/advertising/advertising.c
   GNUnet/src/applications/fragmentation/fragmentation.c
   GNUnet/src/applications/identity/identity.c
   GNUnet/src/applications/tbench/gnunet-tbench.c
   GNUnet/src/applications/tbench/peer1.conf
   GNUnet/src/applications/tbench/peer2.conf
   GNUnet/src/applications/tbench/tbench.c
   GNUnet/src/applications/tbench/tbench.h
   GNUnet/src/applications/tbench/tbenchtest.c
   GNUnet/src/applications/topology_default/topology.c
   GNUnet/src/server/connection.c
   GNUnet/src/server/core.c
   GNUnet/src/transports/tcp.c
   GNUnet/src/transports/tcp6.c
   GNUnet/todo
Log:
bugfixing galore

Modified: GNUnet/src/applications/advertising/advertising.c
===================================================================
--- GNUnet/src/applications/advertising/advertising.c   2005-03-08 04:07:02 UTC (rev 408)
+++ GNUnet/src/applications/advertising/advertising.c   2005-03-08 11:57:22 UTC (rev 409)
@@ -458,8 +458,10 @@
 #endif
   identity->addHost(sd.m);
   if (sd.n < 1) {
-    LOG(LOG_WARNING,
-       _("Announcing ourselves pointless: no other peers are known to us so far.\n"));
+    if (identity->forEachHost(0, NULL, NULL) == 0) 
+      LOG(LOG_WARNING,
+         _("Announcing ourselves pointless: "
+           "no other peers are known to us so far.\n"));
     FREE(sd.m);
     return; /* no point in trying... */
   }

Modified: GNUnet/src/applications/fragmentation/fragmentation.c
===================================================================
--- GNUnet/src/applications/fragmentation/fragmentation.c       2005-03-08 04:07:02 UTC (rev 408)
+++ GNUnet/src/applications/fragmentation/fragmentation.c       2005-03-08 11:57:22 UTC (rev 409)
@@ -508,7 +508,9 @@
                     &frag->header,
                     EXTREME_PRIORITY,
                     ctx->transmissionTime - cronTime(NULL)); 
+    pos += mlen - sizeof(FRAGMENT_Message);
   }
+  GNUNET_ASSERT(pos == ctx->len);
   FREE(frag);
   FREE(tmp);
   FREE(ctx);

Modified: GNUnet/src/applications/identity/identity.c
===================================================================
--- GNUnet/src/applications/identity/identity.c 2005-03-08 04:07:02 UTC (rev 408)
+++ GNUnet/src/applications/identity/identity.c 2005-03-08 11:57:22 UTC (rev 409)
@@ -637,7 +637,7 @@
       hash2enc(&identity->hashPubKey,
               &hn);
 #if DEBUG_IDENTITY
-      LOG(LOG_DEBUG, 
+      LOG(LOG_INFO, 
         "Blacklisting host '%s' (%d) for %llu seconds until %llu (strict=%d).\n",
          &hn, 
          i,

Modified: GNUnet/src/applications/tbench/gnunet-tbench.c
===================================================================
--- GNUnet/src/applications/tbench/gnunet-tbench.c      2005-03-08 04:07:02 UTC (rev 408)
+++ GNUnet/src/applications/tbench/gnunet-tbench.c      2005-03-08 11:57:22 UTC (rev 409)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     (C) 2001, 2002, 2004 Christian Grothoff (and other contributing authors)
+     (C) 2001, 2002, 2004, 2005 Christian Grothoff (and other contributing authors)
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -28,23 +28,23 @@
 #include "gnunet_protocols.h"
 #include "tbench.h"
 
-#define TBENCH_VERSION "0.1.0"
+#define TBENCH_VERSION "0.1.1"
 
 #define DEFAULT_MESSAGE_SIZE   10
-#define DEFAULT_TIMEOUT                2
+#define DEFAULT_TIMEOUT                (2 * cronSECONDS)
 #define DEFAULT_SPACING                0
 
 #define OF_HUMAN_READABLE 0
 #define OF_GNUPLOT_INPUT 1
 
-static unsigned int  messageSize = DEFAULT_MESSAGE_SIZE;
-static unsigned int  messageCnt  = 1;
+static unsigned int messageSize = DEFAULT_MESSAGE_SIZE;
+static unsigned int messageCnt  = 1;
 static char * messageReceiver;
-static unsigned int  messageIterations = 1;
-static unsigned int  messageTrainSize = 1;
-static unsigned int  messageTimeOut = DEFAULT_TIMEOUT;
-static unsigned int  messageSpacing = DEFAULT_SPACING;
-static unsigned int outputFormat = OF_HUMAN_READABLE;
+static unsigned int messageIterations = 1;
+static unsigned int messageTrainSize  = 1;
+static cron_t messageTimeOut          = DEFAULT_TIMEOUT;
+static cron_t messageSpacing          = DEFAULT_SPACING;
+static unsigned int outputFormat      = OF_HUMAN_READABLE;
 
 /**
  * Parse the options, set the timeout.
@@ -103,9 +103,9 @@
        { 's', "size", "SIZE",
          gettext_noop("message size") },
        { 'S', "space", "SPACE",
-         gettext_noop("inter-train message spacing") },
+         gettext_noop("inter-train message spacing (in number of messages)") },
        { 't', "timeout", "TIMEOUT",
-         gettext_noop("time to wait for the arrival of a response") },
+         gettext_noop("time to wait for the completion of an iteration (in ms)") },
        HELP_VERSION,
        { 'X', "xspace", "COUNT",
          gettext_noop("sleep for SPACE ms after COUNT messages") },
@@ -117,7 +117,9 @@
       return SYSERR;
     }
     case 'i': 
-      if(1 != sscanf(GNoptarg, "%ud", &messageIterations)){
+      if(1 != sscanf(GNoptarg,
+                    "%ud",
+                    &messageIterations)){
        LOG(LOG_FAILURE, 
            _("You must pass a number to the '%s' option.\n"),
            "-i");
@@ -125,7 +127,9 @@
       }
       break;
     case 'n': 
-      if(1 != sscanf(GNoptarg, "%ud", &messageCnt)){
+      if(1 != sscanf(GNoptarg,
+                    "%ud",
+                    &messageCnt)){
        LOG(LOG_FAILURE, 
            _("You must pass a number to the '%s' option.\n"),
            "-n");
@@ -136,7 +140,9 @@
       messageReceiver = STRDUP(GNoptarg);
       break;
     case 's': 
-      if(1 != sscanf(GNoptarg, "%ud", &messageSize)){
+      if(1 != sscanf(GNoptarg, 
+                    "%ud",
+                    &messageSize)){
        LOG(LOG_FAILURE, 
            _("You must pass a number to the '%s' option.\n"),
            "-s");
@@ -144,7 +150,9 @@
       }
       break;
     case 'S':
-      if(1 != sscanf(GNoptarg, "%ud", &messageSpacing)){
+      if(1 != sscanf(GNoptarg,
+                    "%ud",
+                    &messageTrainSize)){
        LOG(LOG_FAILURE, 
            _("You must pass a number to the '%s' option.\n"),
            "-S");
@@ -152,7 +160,9 @@
       }
       break;
     case 't':
-      if(1 != sscanf(GNoptarg, "%ud", &messageTimeOut)){
+      if(1 != sscanf(GNoptarg, 
+                    "%llud", 
+                    &messageTimeOut)){
        LOG(LOG_FAILURE, 
            _("You must pass a number to the '%s' option.\n"),
            "-t");
@@ -165,7 +175,9 @@
             TBENCH_VERSION);
       return SYSERR;
     case 'X':
-      if(1 != sscanf(GNoptarg, "%ud", &messageTrainSize)){
+      if(1 != sscanf(GNoptarg, 
+                    "%llud",
+                    &messageSpacing)){
        LOG(LOG_FAILURE, 
            _("You must pass a number to the '%s' option.\n"),
            "-X");
@@ -174,8 +186,7 @@
       break;
     default: 
       LOG(LOG_FAILURE,
-         _("Use --help to get a list of options.\n"),
-         c);
+         _("Use --help to get a list of options.\n"));
       return -1;
     } /* end of parsing commandline */
   } /* while (1) */
@@ -201,35 +212,34 @@
   if (sock == NULL)
     errexit(_("Could not connect to gnunetd.\n"));
 
-  memset(&msg,
-        0,
-        sizeof(TBENCH_CS_MESSAGE));
-  msg.msgSize     =htons(messageSize);
-  msg.msgCnt      =htons(messageCnt);
-  msg.iterations  =htons(messageIterations);
-  msg.intPktSpace =htons(messageSpacing);
-  msg.trainSize   =htons(messageTrainSize);
-  msg.timeOut     =htonl(messageTimeOut);
+  msg.header.size = htons(sizeof(TBENCH_CS_MESSAGE));
+  msg.header.type = htons(TBENCH_CS_PROTO_REQUEST);
+  msg.msgSize     = htonl(messageSize);
+  msg.msgCnt      = htonl(messageCnt);
+  msg.iterations  = htonl(messageIterations);
+  msg.intPktSpace = htonll(messageSpacing);
+  msg.trainSize   = htonl(messageTrainSize);
+  msg.timeOut     = htonll(messageTimeOut);
+  msg.priority    = htonl(5);
   if (messageReceiver == NULL)
     errexit(_("You must specify a receiver!\n"));
   if (OK != enc2hash(messageReceiver,
                     &msg.receiverId.hashPubKey))                    
-    errexit(_("Invalid receiver peer ID specified ('%s' is not valid enc name).\n"),
+    errexit(_("Invalid receiver peer ID specified ('%s' is not valid name).\n"),
            messageReceiver);
   FREE(messageReceiver);
 
-  msg.header.size = htons(sizeof(TBENCH_CS_MESSAGE));
-  msg.header.type = htons(TBENCH_CS_PROTO_REQUEST);
-
   if (SYSERR == writeToSocket(sock,
                              &msg.header))
     return -1;
   
-  buffer = MALLOC(MAX_BUFFER_SIZE);
-  LOG(LOG_DEBUG,
-      "Reading using readFromSocket...\n");
-  if (OK == readFromSocket(sock, (CS_HEADER**)&buffer)) {
-    if((float)buffer->mean_loss <= 0){
+  buffer = NULL;
+  if (OK == readFromSocket(sock, 
+                          (CS_HEADER**)&buffer)) {
+    GNUNET_ASSERT(ntohs(buffer->header.size) ==
+                 sizeof(TBENCH_CS_REPLY));
+    if ((float)buffer->mean_loss <= 0){
+      BREAK();
       messagesPercentLoss = 0.0;
     } else {
       messagesPercentLoss = (buffer->mean_loss/((float)htons(msg.msgCnt)));
@@ -237,23 +247,23 @@
     switch (outputFormat) {
     case OF_HUMAN_READABLE:
       printf(_("Time:\n"));
-      printf(_("\tmax      %d\n"),
-            htons(buffer->max_time));
-      printf(_("\tmin      %d\n"),
-            htons(buffer->min_time));
-      printf(_("\tmean     %f\n"),
+      printf(_("\tmax      %llums\n"),
+            ntohll(buffer->max_time));
+      printf(_("\tmin      %llums\n"),
+            ntohll(buffer->min_time));
+      printf(_("\tmean     %8.4fms\n"),
             buffer->mean_time);
-      printf(_("\tvariance %f\n"),
+      printf(_("\tvariance %8.4fms\n"),
             buffer->variance_time);
       
       printf(_("Loss:\n"));
-      printf(_("\tmax      %d\n"),
-            htons(buffer->max_loss));
-      printf(_("\tmin      %d\n"),
-            htons(buffer->min_loss));
-      printf(_("\tmean     %f\n"),
+      printf(_("\tmax      %u\n"),
+            ntohl(buffer->max_loss));
+      printf(_("\tmin      %u\n"),
+            ntohl(buffer->min_loss));
+      printf(_("\tmean     %8.4f\n"),
             buffer->mean_loss);
-      printf(_("\tvariance %f\n"),
+      printf(_("\tvariance %8.4f\n"),
             buffer->variance_loss); 
       break;
     case OF_GNUPLOT_INPUT:
@@ -264,9 +274,9 @@
     default:
       printf(_("Output format not known, this should not happen.\n"));
     }
+    FREE(buffer);
   } else 
     printf(_("\nDid not receive the message from gnunetd. Is gnunetd running?\n"));  
-  FREE(buffer);
 
   releaseClientSocket(sock);
   doneUtil();

Modified: GNUnet/src/applications/tbench/peer1.conf
===================================================================
--- GNUnet/src/applications/tbench/peer1.conf   2005-03-08 04:07:02 UTC (rev 408)
+++ GNUnet/src/applications/tbench/peer1.conf   2005-03-08 11:57:22 UTC (rev 409)
@@ -6,7 +6,7 @@
 [GNUNETD]
 # VALGRIND        = 300
 HELOEXPIRES     = 60
-LOGLEVEL        = INFO
+LOGLEVEL        = DEBUG
 # LOGFILE         = 
 KEEPLOG         = 0
 PIDFILE         = $GNUNETD_HOME/gnunetd.pid

Modified: GNUnet/src/applications/tbench/peer2.conf
===================================================================
--- GNUnet/src/applications/tbench/peer2.conf   2005-03-08 04:07:02 UTC (rev 408)
+++ GNUnet/src/applications/tbench/peer2.conf   2005-03-08 11:57:22 UTC (rev 409)
@@ -6,7 +6,7 @@
 [GNUNETD]
 # VALGRIND        = 300
 HELOEXPIRES     = 60
-LOGLEVEL        = INFO
+LOGLEVEL        = DEBUG
 # LOGFILE         = 
 KEEPLOG         = 0
 PIDFILE         = $GNUNETD_HOME/gnunetd.pid

Modified: GNUnet/src/applications/tbench/tbench.c
===================================================================
--- GNUnet/src/applications/tbench/tbench.c     2005-03-08 04:07:02 UTC (rev 408)
+++ GNUnet/src/applications/tbench/tbench.c     2005-03-08 11:57:22 UTC (rev 409)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     (C) 2001, 2002, 2004 Christian Grothoff (and other contributing authors)
+     (C) 2001, 2002, 2004, 2005 Christian Grothoff (and other contributing authors)
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -19,234 +19,408 @@
 */
 
 /**
- * TBench CORE. This is the code that is plugged
- * into the GNUnet core to enable transport profiling.
- *
- * FIXME: this code needs some serious workover (leaks!)
- *
+ * @file applications/tbench/tbench.c
  * @author Paul Ruth
- * @file applications/tbench/tbench.c
+ * @brief module to enable transport profiling.
  */
 
 #include "platform.h"
 #include "gnunet_protocols.h"
 #include "tbench.h"
 
-struct Result {
-  cron_t   time;
-  unsigned int packets;
-};
+#define DEBUG_TBENCH NO
 
-static CoreAPIForApplication * coreAPI = NULL;
+typedef struct {
+  cron_t totalTime;
+  unsigned char * packetsReceived;
+  unsigned int maxPacketNumber;
+  unsigned int lossCount;
+  unsigned int duplicateCount;
+} IterationData;
+
+/**
+ * Message exchanged between peers for profiling
+ * transport performance.
+ */
+typedef struct {
+  p2p_HEADER header; 
+  unsigned int iterationNum;
+  unsigned int packetNum;
+  unsigned int priority;
+  unsigned int nounce;
+  unsigned int crc;
+} TBENCH_p2p_MESSAGE;
+
+/**
+ * Lock for access to semaphores.
+ */
 static Mutex lock;
-static Mutex lockCnt;
-static PeerIdentity receiverIdent;
-static Semaphore * sem;
-static cron_t startTime = 0;
-static cron_t endTime = 0;
 
-static int msgCnt = 1;
-static int msgIter = 1;
-static int receiveCnt;
-static int currIteration;
+static Semaphore * presem;
 
-/* */
+static Semaphore * postsem;
+
+/**
+ * What was the packet number we received?
+ */
+static unsigned int lastPacketNumber;
+
+/**
+ * What is the current iteration counter? (Used to verify
+ * that replies match the current request series).
+ */
+static unsigned int currIteration;
+static unsigned int currNounce;
+
+/**
+ * Did the current iteration time-out? (YES/NO)
+ */
+static int timeoutOccured;
+
+
+static CoreAPIForApplication * coreAPI;
+
+/**
+ * Check if we have received a p2p reply,
+ * update result counters accordingly.
+ *
+ * @return 0 if we have received all results, >0 otherwise
+ */
+static int pollResults(IterationData * results,
+                      int blocking) {
+  if (results->lossCount == 0)
+    return 0;
+  if (blocking == YES) {
+    if (timeoutOccured == YES)
+      return results->lossCount;
+    SEMAPHORE_DOWN(postsem);
+  } else {
+    if (OK != SEMAPHORE_DOWN_NONBLOCKING(postsem))
+      return results->lossCount;
+  }
+  do {
+    if (timeoutOccured == YES) {
+      SEMAPHORE_UP(presem);
+      return results->lossCount;
+    }
+    if (lastPacketNumber > results->maxPacketNumber) {
+      SEMAPHORE_UP(presem);
+      return results->lossCount;
+    }
+    if (0 == results->packetsReceived[lastPacketNumber]++) {
+      results->lossCount--;    
+    } else {
+      results->duplicateCount++;
+#if DEBUG_TBENCH
+      LOG(LOG_DEBUG,
+         "Received duplicate message %u from iteration %u\n",
+         lastPacketNumber, 
+         currIteration);
+#endif     
+    } 
+    SEMAPHORE_UP(presem);
+  } while (OK == SEMAPHORE_DOWN_NONBLOCKING(postsem));
+  return results->lossCount;
+}
+
+/**
+ * Another peer send us a tbench request.  Just turn
+ * around and send it back.
+ */
 static int handleTBenchReq(const PeerIdentity * sender,
                           const p2p_HEADER * message) {
-  TBENCH_p2p_MESSAGE *pmsg = (TBENCH_p2p_MESSAGE*)message;
-  
-  LOG(LOG_DEBUG, 
-      "%s received iteration %d, message %d\n",
-      __FUNCTION__,
-      htons(pmsg->iterationNum), 
-      htons(pmsg->packetNum));
-  pmsg->header.type = htons(TBENCH_p2p_PROTO_REPLY);
-  coreAPI->unicast(sender, message, 5, 0);    
+  p2p_HEADER * reply;
+  TBENCH_p2p_MESSAGE * msg;
+
+  if ( ntohs(message->size) < sizeof(TBENCH_p2p_MESSAGE)) {
+    BREAK();
+    return SYSERR;
+  }
+  msg = (TBENCH_p2p_MESSAGE*) message;
+  if (crc32N(&msg[1],
+            ntohs(message->size) - sizeof(TBENCH_p2p_MESSAGE))
+      != ntohl(msg->crc)) {
+    BREAK();
+    return SYSERR;
+  }
+
+#if DEBUG_TBENCH
+  LOG(LOG_DEBUG,
+      "Received message %u from iteration %u/%u\n",
+      htonl(msg->packetNum), 
+      htonl(msg->iterationNum),
+      htonl(msg->nounce));
+#endif
+  reply = MALLOC(ntohs(message->size));
+  memcpy(reply,
+        message,
+        ntohs(message->size));
+  reply->type = htons(TBENCH_p2p_PROTO_REPLY);
+  coreAPI->unicast(sender, 
+                  reply,
+                  ntohl(msg->priority), /* medium importance */
+                  0); /* no delay */
+  FREE(reply);
   return OK;
 }
 
-/* */
+/**
+ * We received a tbench-reply.  Check and count stats.
+ */
 static int handleTBenchReply(const PeerIdentity * sender,
                             const p2p_HEADER * message) {
-  TBENCH_p2p_MESSAGE *pmsg = (TBENCH_p2p_MESSAGE*)message;
-  
-  LOG(LOG_DEBUG, 
-      "Entering %s.\n",
-      __FUNCTION__);
-  MUTEX_LOCK(&lockCnt); 
-  if(htons(pmsg->iterationNum) == currIteration) {
-    cronTime(&endTime);
-    receiveCnt++;
-    LOG(LOG_DEBUG,
-       "iteration %d, received reply, %d\n",
-       currIteration, 
-       receiveCnt);
-    if(receiveCnt >= msgCnt)
-      SEMAPHORE_UP(sem);
+  TBENCH_p2p_MESSAGE * pmsg;
+
+  if (ntohs(message->size) < sizeof(TBENCH_p2p_MESSAGE)) {
+    BREAK();
+    return SYSERR;
+  }
+  pmsg = (TBENCH_p2p_MESSAGE*) message;  
+  if (crc32N(&pmsg[1],
+            ntohs(message->size) - sizeof(TBENCH_p2p_MESSAGE))
+      != ntohl(pmsg->crc)) {
+    BREAK();
+    return SYSERR;
+  }
+#if DEBUG_TBENCH
+  LOG(LOG_DEBUG,
+      "Received message %u from iteration %u/%u\n",
+      htonl(pmsg->packetNum), 
+      htonl(pmsg->iterationNum),
+      htonl(pmsg->nounce));
+#endif
+  MUTEX_LOCK(&lock); 
+  if ( (timeoutOccured == NO) &&
+       (presem != NULL) &&
+       (postsem != NULL) &&
+       (htonl(pmsg->iterationNum) == currIteration) &&
+       (htonl(pmsg->nounce) == currNounce) ) {
+    SEMAPHORE_DOWN(presem);
+    lastPacketNumber = ntohl(pmsg->packetNum);
+    SEMAPHORE_UP(postsem);
   } else {
+#if DEBUG_TBENCH
     LOG(LOG_DEBUG,
-       "Old Reply: iteration %d, received reply, %d\n",
-       currIteration, receiveCnt);
+       "Received message %u from iteration %u too late (now at iteration %u)\n",
+       ntohl(pmsg->packetNum),
+       ntohl(pmsg->iterationNum),
+       currIteration);
+#endif
   }
-  MUTEX_UNLOCK(&lockCnt);
+  MUTEX_UNLOCK(&lock);
   return OK;
 }
 
+/**
+ * Cron-job helper function to signal timeout.
+ */
 static void semaUp(Semaphore * sem) {
+  timeoutOccured = YES;
   SEMAPHORE_UP(sem);
 }
 
-/* */
+/**
+ * Handle client request (main function)
+ */
 static int csHandleTBenchRequest(ClientHandle client,
                                 const CS_HEADER * message) {
-  int i,j;
-  int sum_loss,sum_time;
-  double sum_variance_time, sum_variance_loss;
-  TBENCH_p2p_MESSAGE *opmsg;
-  TBENCH_CS_MESSAGE *icmsg;
-  TBENCH_CS_REPLY *ocmsg;
-  struct Result *results;
+  TBENCH_CS_MESSAGE * msg;
+  TBENCH_CS_REPLY reply;
+  TBENCH_p2p_MESSAGE * p2p;
+  unsigned short size;
+  unsigned int iteration;
+  unsigned int packetNum;
+  cron_t startTime;
+  cron_t endTime;
+  cron_t now;
+  cron_t delay;
+  cron_t delayStart;
+  IterationData * results;
+  unsigned long long sum_loss;
+  unsigned int max_loss;
+  unsigned int min_loss;
+  cron_t sum_time;
+  cron_t min_time;
+  cron_t max_time;
+  cron_t earlyEnd;
+  double sum_variance_time;
+  double sum_variance_loss;
+  unsigned int msgCnt;
+  unsigned int iterations;
 
-  LOG(LOG_DEBUG, 
-      "Entering %s.\n",
-      __FUNCTION__);
-  icmsg   = (TBENCH_CS_MESSAGE*)message;
- 
-  opmsg = MALLOC(sizeof(TBENCH_p2p_MESSAGE)+ntohs(icmsg->msgSize)+1);
-  ocmsg = MALLOC(sizeof(TBENCH_CS_REPLY));
-  MUTEX_LOCK(&lock); /* only one benchmark run
-                       at a time */
+  if ( ntohs(message->size) != sizeof(TBENCH_CS_MESSAGE) )
+    return SYSERR;
   
-  msgCnt  = htons(icmsg->msgCnt);
-  msgIter = htons(icmsg->iterations);
-  results = MALLOC(msgIter * sizeof(struct Result));
-
-  LOG(LOG_DEBUG,
-      "TBENCH: msgCnt %d msgIter %d\n",
+  msg = (TBENCH_CS_MESSAGE*) message; 
+  size = sizeof(TBENCH_p2p_MESSAGE) + ntohl(msg->msgSize);
+  if (size < sizeof(TBENCH_p2p_MESSAGE))
+    return SYSERR;
+  delay = ntohll(msg->intPktSpace);
+  iterations = ntohl(msg->iterations);
+  msgCnt = ntohl(msg->msgCnt);
+#if DEBUG_TBENCH
+  LOG(LOG_MESSAGE,
+      "Tbench runs %u test messages of size %u in %u iterations.\n",
       msgCnt, 
-      msgIter);
-  sem = SEMAPHORE_NEW(0);
+      size,
+      iterations);
+#endif
+  results = MALLOC(sizeof(IterationData) * iterations);
 
-  receiveCnt = 0;
+  p2p = MALLOC(size);
+  memset(p2p,
+        0,
+        size);
+  p2p->header.size = htons(size);
+  p2p->header.type = htons(TBENCH_p2p_PROTO_REQUEST);
+  p2p->priority = msg->priority;
 
-  memcpy(&receiverIdent,
-        &icmsg->receiverId,
-        sizeof(PeerIdentity));
-  
-  /* set up opmsg */
-  memset(opmsg, 0, sizeof(TBENCH_p2p_MESSAGE));
-  opmsg->header.size = htons(sizeof(TBENCH_p2p_MESSAGE)+ntohs(icmsg->msgSize));
-  opmsg->header.type = htons(TBENCH_p2p_PROTO_REQUEST);
-  opmsg->iterationNum = opmsg->packetNum = htons(0);
- 
-  for(currIteration = 0; currIteration < msgIter; currIteration++){
-    opmsg->iterationNum = htons(currIteration);
-    receiveCnt = 0;
-    LOG(LOG_DEBUG,
-       "Timeout after %ums\n",
-       ntohl(icmsg->timeOut));
+  MUTEX_LOCK(&lock);  
+  for (iteration=0;iteration<iterations;iteration++) {
+    results[iteration].maxPacketNumber = msgCnt;
+    results[iteration].packetsReceived = MALLOC(msgCnt);
+    memset(results[iteration].packetsReceived,
+          0,
+          msgCnt);
+    results[iteration].lossCount = msgCnt;
+    results[iteration].duplicateCount = 0;
+
+    earlyEnd = 0;
+    presem = SEMAPHORE_NEW(1);
+    postsem = SEMAPHORE_NEW(0);
+    currNounce = randomi(0xFFFFFF);
+    p2p->nounce 
+      = htonl(currNounce);
+    currIteration = iteration;
+    p2p->iterationNum 
+      = htonl(currIteration);
+    memset(&p2p[1],
+          randomi(256),
+          size - sizeof(TBENCH_p2p_MESSAGE));
+    p2p->crc
+      = htonl(crc32N(&p2p[1],
+                    size - sizeof(TBENCH_p2p_MESSAGE)));
+    
+    MUTEX_UNLOCK(&lock); /* allow receiving */
+
+    cronTime(&startTime);
+    endTime = startTime + ntohll(msg->timeOut);
+
+    timeoutOccured = NO;
     addCronJob((CronJob)&semaUp,
-              ntohl(icmsg->timeOut) * cronMILLIS,
+              ntohll(msg->timeOut) * cronMILLIS,
               0,
-              sem);
-    cronTime(&startTime);
-    endTime = startTime;
-    for(j = 0; j < msgCnt; j++){
-      if (cronTime(NULL) > startTime + ntohl(icmsg->timeOut)*cronMILLIS)
-       break;
-      opmsg->packetNum = htons(j);
-      coreAPI->unicast(&receiverIdent, &opmsg->header, 5, 0); 
-      if (htons(icmsg->intPktSpace)!=0 && (j % htons(icmsg->trainSize)) == 0) {
-       struct timespec del;
-       struct timespec rem;
-       del.tv_sec = htons(icmsg->intPktSpace) / cronSECONDS;
-       del.tv_nsec = (htons(icmsg->intPktSpace) - (del.tv_sec * cronSECONDS)) * 1000 * 1000;
-#ifndef WINDOWS
-       nanosleep(&del, &rem);
-#else
-    SleepEx(del.tv_sec * 1000 + del.tv_nsec / 1000000, TRUE);
+              postsem);
+    for (packetNum=0;packetNum<msgCnt;packetNum++){      
+      cronTime(&now);
+      if (now > endTime)
+       break; /* timeout */
+      
+      p2p->packetNum = htonl(packetNum);
+#if DEBUG_TBENCH
+      LOG(LOG_DEBUG,
+         "Sending message %u in iteration %u\n",
+         packetNum, iteration);
 #endif
-      }        
+      coreAPI->unicast(&msg->receiverId, 
+                      &p2p->header, 
+                      ntohl(msg->priority),
+                      0); /* no delay */
+      pollResults(&results[iteration], NO);
+      if ( (delay != 0) &&
+          (htonl(msg->trainSize) != 0) &&
+          (packetNum % htonl(msg->trainSize)) == 0) {
+       delayStart = now;
+       while ( (cronTime(&now) < (delayStart+delay)) &&
+               (timeoutOccured == NO) ) {
+         if (delayStart + delay - now  > 5 * cronMILLIS) {
+           pollResults(&results[iteration], NO);
+           gnunet_util_sleep(5 * cronMILLIS);
+         } else
+           gnunet_util_sleep(delayStart + delay - now);
+       }
+      }            
+      if ( (0 == pollResults(&results[iteration], NO)) &&
+          (earlyEnd == 0) )
+       earlyEnd = cronTime(NULL);
     }
-    SEMAPHORE_DOWN(sem);    
+    while ( (timeoutOccured == NO) &&
+           (cronTime(&now) < endTime) ) {
+      if ( (0 == pollResults(&results[iteration], YES) ) &&
+          (earlyEnd == 0) )
+       earlyEnd = now;
+      gnunet_util_sleep(5 * cronMILLIS);
+    }
+
+    /* make sure to unblock waiting jobs */
+    timeoutOccured = YES;
+    SEMAPHORE_UP(presem);
+
+    MUTEX_LOCK(&lock);
+    if (earlyEnd == 0)
+      earlyEnd = now;
+    results[iteration].totalTime 
+      = earlyEnd - startTime;
+    FREE(results[iteration].packetsReceived);
     suspendCron();
     delCronJob((CronJob)&semaUp,
               0,
-              sem);
+              postsem);
     resumeCron();
-    results[currIteration].time = endTime-startTime;
-    results[currIteration].packets = receiveCnt;
+    SEMAPHORE_FREE(presem);
+    SEMAPHORE_FREE(postsem);
+    presem = NULL;
+    postsem = NULL;
+
   }
-  SEMAPHORE_FREE(sem);
   MUTEX_UNLOCK(&lock);
 
-  /* Lets see what the raw results are */
-  for(i = 0; i <  msgIter; i++){
-    LOG(LOG_EVERYTHING, 
-       "iter[%d], packets %d/%d, time %dms\n",
-       i,
-       results[i].packets,
-       msgCnt,
-       results[i].time); 
-  }
-
-  sum_loss = msgCnt - results[0].packets;
-  ocmsg->max_loss = htons(msgCnt - results[0].packets);
-  ocmsg->min_loss = htons(msgCnt - results[0].packets);
-  sum_time = results[0].time;
-  ocmsg->max_time = htons(results[0].time);
-  ocmsg->min_time = htons(results[0].time);
-  for(i = 1; i < msgIter; i++) {
-    LOG(LOG_EVERYTHING, 
-       "iteration=%d\n", 
-       i);
-    sum_loss += msgCnt - results[i].packets;
-    if(msgCnt-results[i].packets > htons(ocmsg->max_loss))
-      ocmsg->max_loss = htons(msgCnt - results[i].packets);
-
-    if(msgCnt-results[i].packets < htons(ocmsg->min_loss))
-      ocmsg->min_loss = htons(msgCnt - results[i].packets);
-
-    sum_time += results[i].time;
-    if(results[i].time > htons(ocmsg->max_time))
-      ocmsg->max_time = htons(results[i].time);
-
-    if(results[i].time < htons(ocmsg->min_time))
-      ocmsg->min_time = htons(results[i].time); 
+  sum_loss = 0;
+  sum_time = 0;
+  max_loss = 0;
+  min_loss = msgCnt;
+  min_time = 1 * cronYEARS;
+  max_time = 0;
+  /* data post-processing */
+  for (iteration=0;iteration<iterations;iteration++) {
+    sum_loss += results[iteration].lossCount;
+    sum_time += results[iteration].totalTime;
+    
+    if (results[iteration].lossCount > max_loss)
+      max_loss = results[iteration].lossCount;
+    if (results[iteration].lossCount < min_loss)
+      min_loss = results[iteration].lossCount;
+    if (results[iteration].totalTime > max_time)
+      max_time = results[iteration].totalTime;
+    if (results[iteration].totalTime < min_time)
+      min_time = results[iteration].totalTime;
   } 
-  ocmsg->mean_loss = ((float)sum_loss/(float)msgIter);
-  ocmsg->mean_time = ((float)sum_time/(float)msgIter);
-  
+ 
   sum_variance_time = 0.0;
   sum_variance_loss = 0.0;
-  for(i = 0; i < msgIter; i++){
-    LOG(LOG_DEBUG,
-       "TBENCH: iteration=%d msgIter=%d\n", 
-       i,
-       msgIter);
-    sum_variance_time += (results[i].time - ocmsg->mean_time)*
-      (results[i].time - ocmsg->mean_time); 
-
-    sum_variance_loss += ((msgCnt - results[i].packets) - ocmsg->mean_loss)*
-      ((msgCnt - results[i].packets) - ocmsg->mean_loss); 
+  for(iteration = 0; iteration <iterations; iteration++){
+    sum_variance_time += 
+      (results[iteration].totalTime - sum_time/iterations) *
+      (results[iteration].totalTime - sum_time/iterations);
+    sum_variance_loss += 
+      (results[iteration].lossCount - sum_loss/iterations) *
+      (results[iteration].lossCount - sum_loss/iterations);
   }
-  ocmsg->variance_time = sum_variance_time/(msgIter-1);
-  ocmsg->variance_loss = sum_variance_loss/(msgIter-1);
-
-  ocmsg->header.size = htons(sizeof(TBENCH_CS_MESSAGE));
-  ocmsg->header.type = htons(TBENCH_CS_PROTO_REPLY);
-
-  LOG(LOG_DEBUG, 
-      "calling writeToSocket\n");
-  if (SYSERR == coreAPI->sendToClient(client,
-                                     &ocmsg->header))
-    return SYSERR;
-  FREE(opmsg);
-  FREE(ocmsg);
+  
+  /* send collected stats back to client */
+  reply.header.size = htons(sizeof(TBENCH_CS_REPLY));
+  reply.header.type = htons(TBENCH_CS_PROTO_REPLY);
+  reply.max_loss = htonl(max_loss);
+  reply.min_loss = htonl(min_loss);
+  reply.mean_loss = ((float)sum_loss/(float)iterations);
+  reply.mean_time = ((float)sum_time/(float)iterations);
+  reply.max_time = htonll(max_time);
+  reply.min_time = htonll(min_time);
+  reply.variance_time = sum_variance_time/(iterations-1);
+  reply.variance_loss = sum_variance_loss/(iterations-1);
   FREE(results);
-  LOG(LOG_DEBUG,
-      "finishing benchmark\n");
-  return OK;
+  return coreAPI->sendToClient(client,
+                              &reply.header);
 }
 
 /**
@@ -258,7 +432,6 @@
   int ok = OK;
 
   MUTEX_CREATE(&lock);
-  MUTEX_CREATE(&lockCnt);
   coreAPI = capi;
   if (SYSERR == capi->registerHandler(TBENCH_p2p_PROTO_REPLY,
                                      &handleTBenchReply))
@@ -267,7 +440,7 @@
                                      &handleTBenchReq))
     ok = SYSERR;
   if (SYSERR == capi->registerClientHandler(TBENCH_CS_PROTO_REQUEST,
-                                           (CSHandler)&csHandleTBenchRequest))
+                                           &csHandleTBenchRequest))
     ok = SYSERR;
   return ok;
 }
@@ -278,9 +451,8 @@
   coreAPI->unregisterHandler(TBENCH_p2p_PROTO_REPLY,
                             &handleTBenchReply);
   coreAPI->unregisterClientHandler(TBENCH_CS_PROTO_REQUEST,
-                                  (CSHandler)&csHandleTBenchRequest);
+                                  &csHandleTBenchRequest);
   MUTEX_DESTROY(&lock);
-  MUTEX_DESTROY(&lockCnt);
   coreAPI = NULL;
 }
 

Modified: GNUnet/src/applications/tbench/tbench.h
===================================================================
--- GNUnet/src/applications/tbench/tbench.h     2005-03-08 04:07:02 UTC (rev 408)
+++ GNUnet/src/applications/tbench/tbench.h     2005-03-08 11:57:22 UTC (rev 409)
@@ -19,47 +19,70 @@
 */
 
 /**
+ * @file applications/tbench/tbench.h
  * @author Christian Grothoff
- * @file applications/tbench/tbench.h
- **/
+ */
 #ifndef TBENCH_TBENCH_H
 #define TBENCH_TBENCH_H
 
 #include "gnunet_core.h"
 
-#define TBENCH_MSG_LENGTH 1024
-
+/**
+ * Client requests peer to perform some profiling.
+ */
 typedef struct {
-  p2p_HEADER header; 
-  unsigned int iterationNum;
-  unsigned int packetNum;
-} TBENCH_p2p_MESSAGE;
-
-typedef struct {
-  TBENCH_p2p_MESSAGE p2p_message;
-  char message[1];
-} TBENCH_p2p_MESSAGE_GENERIC;
-
-typedef struct {
   CS_HEADER header;
+  /**
+   * How big is each message (plus headers).
+   * Note that GNUnet is limited to 64k messages.
+   */
   unsigned int msgSize;
+  /**
+   * How many messages should be transmitted in 
+   * each iteration?
+   */
   unsigned int msgCnt;
+  /**
+   * How many iterations should be performed?
+   */
   unsigned int iterations;
+  /**
+   * Which peer should receive the messages?
+   */
   PeerIdentity receiverId;
-  unsigned int intPktSpace;    /* Inter packet space in milliseconds */
+  /**
+   * Inter packet space in milliseconds (delay
+   * introduced when sending messages).
+   */
+  cron_t intPktSpace;
+  /**
+   * Time to wait for the arrival of all replies
+   * in one iteration.
+   */
+  cron_t timeOut;              
+  /**
+   * intPktSpace delay is only introduced every
+   * trainSize messages.
+   */
   unsigned int trainSize;
-  unsigned int timeOut;                /* Time to wait for the arrival of a reply in secs */
+  /**
+   * Which priority should be used?
+   */
+  unsigned int priority;
 } TBENCH_CS_MESSAGE;
 
+/**
+ * Response from server with statistics.
+ */
 typedef struct {
   CS_HEADER header;
-  int max_loss;
-  int min_loss;
+  unsigned int max_loss;
+  unsigned int min_loss;
   float mean_loss;
   float variance_loss;
   
-  int max_time;
-  int min_time;
+  cron_t max_time;
+  cron_t min_time;
   float mean_time;
   float variance_time;  
 } TBENCH_CS_REPLY;

Modified: GNUnet/src/applications/tbench/tbenchtest.c
===================================================================
--- GNUnet/src/applications/tbench/tbenchtest.c 2005-03-08 04:07:02 UTC (rev 408)
+++ GNUnet/src/applications/tbench/tbenchtest.c 2005-03-08 11:57:22 UTC (rev 409)
@@ -30,6 +30,11 @@
 #include "tbench.h"
 #include <sys/wait.h>
 
+/**
+ * Set this to NO when debugging gnunetd processes separately.
+ */
+#define DO_FORK NO
+
 static int parseOptions(int argc,
                        char ** argv) {
   FREENONNULL(setConfigurationString("GNUNETD",
@@ -44,28 +49,30 @@
 static PeerIdentity peer2;
 
 static int test(GNUNET_TCP_SOCKET * sock,
-               unsigned short messageSize,
-               unsigned short messageCnt,
-               unsigned short messageIterations,
-               unsigned short messageSpacing,
-               unsigned short messageTrainSize,
-               unsigned int messageTimeOut /* in milli-seconds */) {
+               unsigned int messageSize,
+               unsigned int messageCnt,
+               unsigned int messageIterations,
+               cron_t messageSpacing,
+               unsigned int messageTrainSize,
+               cron_t messageTimeOut /* in milli-seconds */) {
   int ret;
   TBENCH_CS_MESSAGE msg;
   TBENCH_CS_REPLY * buffer;
   float messagesPercentLoss;
 
-  memset(&msg,
-        0,
-        sizeof(TBENCH_CS_MESSAGE));
+  printf(_("Using %u messages of size %u for %u times.\n"),
+        messageCnt, 
+        messageSize, 
+        messageIterations);
   msg.header.size = htons(sizeof(TBENCH_CS_MESSAGE));
   msg.header.type = htons(TBENCH_CS_PROTO_REQUEST);
-  msg.msgSize     = htons(messageSize);
-  msg.msgCnt      = htons(messageCnt);
-  msg.iterations  = htons(messageIterations);
-  msg.intPktSpace = htons(messageSpacing);
-  msg.trainSize   = htons(messageTrainSize);
-  msg.timeOut     = htonl(messageTimeOut);
+  msg.msgSize     = htonl(messageSize);
+  msg.msgCnt      = htonl(messageCnt);
+  msg.iterations  = htonl(messageIterations);
+  msg.intPktSpace = htonll(messageSpacing);
+  msg.trainSize   = htonl(messageTrainSize);
+  msg.timeOut     = htonll(messageTimeOut);
+  msg.priority    = htonl(5);
   msg.receiverId  = peer2;
   
   if (SYSERR == writeToSocket(sock,
@@ -80,14 +87,14 @@
     } else {
       messagesPercentLoss = (buffer->mean_loss/((float)htons(msg.msgCnt)));
     }
-    printf(_("Times: max %8d  min %8d  mean %8.4f  variance %8.4f\n"),
-          htons(buffer->max_time),
-          htons(buffer->min_time),
+    printf(_("Times: max %16llu  min %16llu  mean %12.3f  variance %12.3f\n"),
+          ntohll(buffer->max_time),
+          ntohll(buffer->min_time),
           buffer->mean_time,
           buffer->variance_time);
-    printf(_("Loss:  max %8d  min %8d  mean %8.4f  variance %8.4f\n"),
-          htons(buffer->max_loss),
-          htons(buffer->min_loss),
+    printf(_("Loss:  max %16u  min %16u  mean %12.3f  variance %12.3f\n"),
+          ntohl(buffer->max_loss),
+          ntohl(buffer->min_loss),
           buffer->mean_loss,
           buffer->variance_loss); 
   } else {
@@ -109,6 +116,27 @@
   return OK;
 }
 
+static int checkConnected(GNUNET_TCP_SOCKET * sock) {
+  int left;
+  int ret;
+
+  ret = 0;
+  left = 30; /* how many iterations should we wait? */
+  while (OK == requestStatistics(sock,
+                                &waitForConnect,
+                                NULL)) {
+    printf(_("Waiting for peers to connect (%u iterations left)...\n"), 
+          left);
+    sleep(5);
+    left--;
+    if (left == 0) {
+      ret = 1;
+      break;
+    }
+  }
+  return ret;
+}
+
 /**
  * Testcase to test p2p communications.
  *
@@ -117,19 +145,22 @@
  * @return 0: ok, -1: error
  */   
 int main(int argc, char ** argv) {
+#if DO_FORK
   pid_t daemon1;
   pid_t daemon2;
+  int status;
+#endif
   int ret;
   int left;
-  int status;
   GNUNET_TCP_SOCKET * sock;
+  int i;
 
   GNUNET_ASSERT(OK ==
                enc2hash("BV3AS3KMIIBVIFCGEG907N6NTDTH26B7T6FODUSLSGK"
                         "5B2Q58IEU1VF5FTR838449CSHVBOAHLDVQAOA33O77F"
                         "OPDA8F1VIKESLSNBO",
                         &peer2.hashPubKey));
-
+#if DO_FORK
   daemon1 = fork();
   if (daemon1 == 0) {
     if (0 != execlp("gnunetd", /* what binary to execute, must be in $PATH! */
@@ -138,7 +169,7 @@
                    "-c",
                    "peer1.conf", /* configuration file */
                    NULL)) {
-      fprintf(stderr,
+     fprintf(stderr,
              _("'%s' failed: %s\n"),
              "execlp",
              STRERROR(errno));
@@ -209,6 +240,7 @@
     }
   }
   sleep(5);
+#endif
   
   ret = 0;
   left = 5;
@@ -228,18 +260,23 @@
     }
   } while (sock == NULL);
 
+  ret = checkConnected(sock);
+  printf(_("Running benchmark...\n"));
+  /* 'slow' pass: wait for bandwidth negotiation! */
   if (ret == 0)
-    ret = test(sock, 4, 1, 1, 1, 1, 5000);
-  if (ret == 0)
-    ret = test(sock, 50, 64, 40, 50, 10, 10000);
-  if (ret == 0)
-    ret = test(sock, 1024, 64, 4, 0, 1, 10000);
-  if (ret == 0)
-    ret = test(sock, 32*1024, 8, 4, 0, 1, 30000);
-  
+    ret = test(sock, 64, 100, 4, 50 * cronMILLIS, 1, 30 * cronSECONDS);
+  checkConnected(sock);  
+  /* 'blast' pass: hit bandwidth limits! */
+  for (i=8;i<60000;i*=2) {
+    if (ret == 0)
+      ret = test(sock, i, 1+1024/i, 4, 10 * cronMILLIS, 2, 2 * cronSECONDS);
+    checkConnected(sock);
+  }
+  ret = test(sock, i, 10, 10, 500 * cronMILLIS, 1, 10 * cronSECONDS);
   releaseClientSocket(sock);
   doneUtil();
 
+#if DO_FORK
   if (daemon1 != -1) {
     if (0 != kill(daemon1, SIGTERM))
       DIE_STRERROR("kill");
@@ -252,6 +289,7 @@
     if (daemon2 != waitpid(daemon2, &status, 0)) 
       DIE_STRERROR("waitpid");
   }
+#endif
   return ret;
 }
 

Modified: GNUnet/src/applications/topology_default/topology.c
===================================================================
--- GNUnet/src/applications/topology_default/topology.c 2005-03-08 04:07:02 UTC (rev 408)
+++ GNUnet/src/applications/topology_default/topology.c 2005-03-08 11:57:22 UTC (rev 409)
@@ -264,6 +264,7 @@
 provide_module_topology_default(CoreAPIForApplication * capi) {
   static Topology_ServiceAPI api;
   char * data;
+  unsigned int len;
 
   coreAPI = capi;
   identity = capi->requestService("identity");
@@ -293,14 +294,15 @@
             5 * cronSECONDS,
             NULL);
 
-  if (-1 == stateReadContent(TOPOLOGY_TAG_FILE,
-                            (void**) &data)) {
+  if (-1 == (len = stateReadContent(TOPOLOGY_TAG_FILE,
+                                   (void**) &data))) {
     stateWriteContent(TOPOLOGY_TAG_FILE,
                      strlen(PACKAGE_VERSION),
                      PACKAGE_VERSION);    
   } else {
-    if (0 != strcmp(PACKAGE_VERSION,
-                   data)) {
+    if (0 != strncmp(PACKAGE_VERSION,
+                    data,
+                    len)) {
       LOG(LOG_FAILURE,
          _("Version mismatch ('%s' vs. '%s'), run gnunet-update!\n"),
          PACKAGE_VERSION,

Modified: GNUnet/src/server/connection.c
===================================================================
--- GNUnet/src/server/connection.c      2005-03-08 04:07:02 UTC (rev 408)
+++ GNUnet/src/server/connection.c      2005-03-08 11:57:22 UTC (rev 409)
@@ -838,16 +838,9 @@
   }  
   if (be->status != STAT_UP)
     return; /* status is not up, cannot send! */
-  if (be->sendBufferSize == 0) {
-#if DEBUG_CONNECTION
-    LOG(LOG_DEBUG,
-       "Message queue empty.  Nothing transmitted.\n");
-#endif
-    return; /* nothing to send */    
-  }
+  if (be->sendBufferSize == 0) 
+    return; /* nothing to send */
 
-
-
   /* recompute max send frequency */
   if (be->max_bpm <= 0)
     be->max_bpm = 1;
@@ -870,7 +863,7 @@
 
   if ( (be->lastSendAttempt + be->MAX_SEND_FREQUENCY > cronTime(NULL)) &&
        (be->sendBufferSize < MAX_SEND_BUFFER_SIZE/4) ) {
-#if DEBUG_CONNECTION 
+#if DEBUG_CONNECTION
     LOG(LOG_DEBUG,
        "Send frequency too high (CPU load), send deferred.\n");
 #endif
@@ -879,7 +872,7 @@
 
   /* test if receiver has enough bandwidth available!  */
   updateCurBPS(be);
-#if DEBUG_CONNECTION
+#if DEBUG_CONNECTION 
   LOG(LOG_DEBUG,
       "receiver window available: %lld bytes (MTU: %u)\n",
       be->available_send_window,
@@ -897,7 +890,7 @@
     i = 0;
     /* assumes entries are sorted by priority! */
     while (i < be->sendBufferSize) {
-      if ( (totalMessageSize + entries[i]->len < 60000) &&
+      if ( (totalMessageSize + entries[i]->len < MAX_BUFFER_SIZE) &&
           (entries[i]->pri >= EXTREME_PRIORITY) ) {
        knapsackSolution[i] = YES;
        priority += entries[i]->pri;
@@ -906,17 +899,18 @@
        break;
       }
       i++;
-    }    
+    }        
     while ( (i < be->sendBufferSize) &&
            (be->available_send_window > totalMessageSize) ) {
-      if (entries[i]->len + totalMessageSize <=
-         be->available_send_window) {
+      if ( (entries[i]->len + totalMessageSize <=
+           be->available_send_window) && 
+          (totalMessageSize + entries[i]->len < MAX_BUFFER_SIZE) ) {
        knapsackSolution[i] = YES;
        totalMessageSize += entries[i]->len;
        priority += entries[i]->pri;
       } else {
        knapsackSolution[i] = NO;
-       if (totalMessageSize == 0) {
+       if (totalMessageSize == sizeof(P2P_Message)) {    
          /* if the highest-priority message does not yet
             fit, wait for send window to grow so that
             we can get it out (otherwise we would starve
@@ -927,6 +921,15 @@
       }
       i++;
     }
+    if ( (totalMessageSize == sizeof(P2P_Message)) ||
+        ( (priority < EXTREME_PRIORITY) &&
+          ((totalMessageSize / sizeof(P2P_Message)) < 4) &&
+          (randomi(16) != 0) ) ) {
+      /* randomization necessary to ensure we eventually send
+        the message if there is nothing else to do! */
+      FREE(knapsackSolution);
+      return;
+    }      
   } else { /* if (be->session.mtu == 0) */
     /* solve knapsack problem, compute accumulated priority */
     knapsackSolution = MALLOC(sizeof(int) * be->sendBufferSize);
@@ -989,7 +992,7 @@
         which  has EXTREME_PRIORITY) */
       if (priority < EXTREME_PRIORITY) {
        FREE(knapsackSolution);
-#if DEBUG_CONNECTION
+#if DEBUG_CONNECTION 
        LOG(LOG_DEBUG,
            "bandwidth limits prevent sending (send window %u too small).\n",
            be->available_send_window);
@@ -1008,7 +1011,7 @@
     int msgCap;
     FREE(knapsackSolution);
     cronTime(&be->lastSendAttempt);
-#if DEBUG_CONNECTION
+#if DEBUG_CONNECTION 
     LOG(LOG_DEBUG,
        "policy prevents sending message (priority too low: %d)\n",
        priority);
@@ -1027,7 +1030,7 @@
       if (be->sendBufferSize <= msgCap)
        break;
       if ( entry->transmissionTime < expired) {
-#if DEBUG_CONNECTION 
+#if DEBUG_CONNECTION
        LOG(LOG_DEBUG,
            "expiring message, expired %ds ago, queue size is %u (bandwidth stressed)\n",
            (int) ((cronTime(NULL) - entry->transmissionTime) / cronSECONDS),
@@ -1046,7 +1049,7 @@
   }
   
   /* build message (start with sequence number) */
-  GNUNET_ASSERT(totalMessageSize >= sizeof(P2P_Message));
+  GNUNET_ASSERT(totalMessageSize > sizeof(P2P_Message));
   plaintextMsg = MALLOC(totalMessageSize);
   p2pHdr = (P2P_Message*) plaintextMsg;
   p2pHdr->timeStamp 
@@ -1186,23 +1189,11 @@
   hash(&p2pHdr->sequenceNumber,
        p - sizeof(HashCode512),
        (HashCode512*) encryptedMsg);
-#if DEBUG_CONNECTION
-  LOG(LOG_DEBUG,
-      "Encrypting with key %u and IV %u\n",
-      *(int*) &be->skey_local,
-      *(int*) &encryptedMsg /* IV */);
-#endif
   encryptBlock(&p2pHdr->sequenceNumber,
               p - sizeof(HashCode512),
               &be->skey_local,
               (const INITVECTOR*) encryptedMsg, /* IV */
               &((P2P_Message*)encryptedMsg)->sequenceNumber);
-#if DEBUG_CONNECTION
-  LOG(LOG_DEBUG,
-      "calling transport layer to send %d bytes with crc %x\n",
-      p,
-      crc);
-#endif
   if (! ( (SYSERR == transport->send(be->session.tsession,
                                     encryptedMsg,
                                     p)) &&
@@ -1264,7 +1255,7 @@
        (se->len > be->session.mtu - sizeof(P2P_Message)) ) {
     /* this message is so big that it must be fragmented! */
     fragmentation->fragment(&be->session.sender,
-                           be->session.mtu,
+                           be->session.mtu - sizeof(P2P_Message),
                            se->pri,
                            se->transmissionTime,
                            se->len,
@@ -1368,14 +1359,14 @@
                             int establishSession) {
   BufferEntry * root;
   BufferEntry * prev;
-#if DEBUG_CONNECTION
+#if DEBUG_CONNECTION 
   EncName enc;
 
-  IFLOG(LOG_INFO,
+  IFLOG(LOG_EVERYTHING,
        hash2enc(&hostId->hashPubKey, 
                 &enc));
-  LOG(LOG_INFO, 
-      "Adding host %s to the connection table.\n",
+  LOG(LOG_EVERYTHING, 
+      "Adding host '%s' to the connection table.\n",
       &enc);
 #endif
 
@@ -1656,8 +1647,7 @@
      and then merge the values; but for now, let's just go
      hardcore and adjust all values rapidly */
   for (u=0;u<activePeerCount;u++) {
-    entries[u]->idealized_limit = 0;
-    adjustedRR[u] = entries[u]->recently_received * cronMINUTES / timeDifference;
+    adjustedRR[u] = entries[u]->recently_received * cronMINUTES / timeDifference / 2;
 
 #if DEBUG_CONNECTION
     if (adjustedRR[u] > entries[u]->idealized_limit) {
@@ -1666,7 +1656,7 @@
            hash2enc(&entries[u]->session.sender.hashPubKey,
                     &enc));
       LOG(LOG_INFO,
-         "peer %s transmitted above limit: %llu bpm > %u bpm\n",
+         "peer '%s' transmitted above limit: %llu bpm > %u bpm\n",
          &enc,
          adjustedRR[u],
          entries[u]->idealized_limit);
@@ -1678,15 +1668,14 @@
      */
     if (adjustedRR[u] > 2 * MAX_BUF_FACT * 
        entries[u]->max_transmitted_limit) {
-#if DEBUG_CONNECTION || 1
+#if DEBUG_CONNECTION
       EncName enc;
       IFLOG(LOG_INFO,
            hash2enc(&entries[u]->session.sender.hashPubKey,
                     &enc));
       LOG(LOG_INFO,
-         "blacklisting %s, it sent >%dx+MTU above mLimit: %llu bpm > %u bpm (cLimit %u bpm)\n",
+         "blacklisting '%s': sent %llu bpm (limit %u bpm, target %u bpm)\n",
          &enc,
-         2 * MAX_BUF_FACT,
          adjustedRR[u],
          entries[u]->max_transmitted_limit,
          entries[u]->idealized_limit);
@@ -1696,9 +1685,9 @@
                              1, /* FIXME: 1? */
                              YES);
       activePeerCount--;
-      entries[u]=entries[activePeerCount];
-      shares[u]=shares[activePeerCount];
-      adjustedRR[u]=adjustedRR[activePeerCount];
+      entries[u]    = entries[activePeerCount];
+      shares[u]     = shares[activePeerCount];
+      adjustedRR[u] = adjustedRR[activePeerCount];
       u--;
     }
     
@@ -1707,11 +1696,6 @@
                                             at least MIN_BPM_PER_PEER */
   }
 
-#if DEBUG_CONNECTION
-  LOG(LOG_DEBUG,
-      "freely schedulable bandwidth is %d bpm\n",
-      schedulableBandwidth);
-#endif
   /* now distribute the schedulableBandwidth according
      to the shares.  Note that since we cap peers at twice
      of what they transmitted last, we may not be done with
@@ -1800,7 +1784,7 @@
        entries[u]->idealized_limit);
 #endif
     entries[u]->current_connection_value /= 2.0;
-    entries[u]->recently_received = 0;
+    entries[u]->recently_received /= 2;
   }
 
   /* free memory */
@@ -1952,12 +1936,6 @@
     return SYSERR; /* could not decrypt */
   }
   tmp = MALLOC(size - sizeof(HashCode512));
-#if DEBUG_CONNECTION
-  LOG(LOG_DEBUG,
-      "Decrypting with key %u and IV %u\n",
-      *(int*) &be->skey_remote,
-      *(int*) &msg->hash);
-#endif
   res = decryptBlock(&be->skey_remote, 
                     &msg->sequenceNumber,
                     size - sizeof(HashCode512),
@@ -2092,10 +2070,6 @@
   BufferEntry * be;
 
   MUTEX_LOCK(&lock);
-  LOG(LOG_DEBUG,
-      "Assigning session key %u %s\n",
-      *(int*)key,
-      forSending == YES ? "for sending" : "for receiving");
   be = lookForHost(peer);
   if (be == NULL)
     be = addHost(peer, NO);

Modified: GNUnet/src/server/core.c
===================================================================
--- GNUnet/src/server/core.c    2005-03-08 04:07:02 UTC (rev 408)
+++ GNUnet/src/server/core.c    2005-03-08 11:57:22 UTC (rev 409)
@@ -31,7 +31,7 @@
 #include "tcpserver.h"
 #include "core.h"
 
-#define DEBUG_CORE YES
+#define DEBUG_CORE NO
 
 /**
  * Linked list of loaded protocols (for clean shutdown).

Modified: GNUnet/src/transports/tcp.c
===================================================================
--- GNUnet/src/transports/tcp.c 2005-03-08 04:07:02 UTC (rev 408)
+++ GNUnet/src/transports/tcp.c 2005-03-08 11:57:22 UTC (rev 409)
@@ -63,7 +63,10 @@
 typedef struct {
   /**
    * size of the message, in bytes, including this header; 
-   * max 65536-header (network byte order) 
+   * max 65535; we do NOT want to make this field an int
+   * because then a malicious peer could cause us to allocate
+   * lots of memory -- this bounds it by 64k/peer. 
+   * Field is in network byte order.
    */
   unsigned short size;
 
@@ -417,102 +420,98 @@
   }
   incrementBytesReceived(ret);
   tcpSession->pos += ret;
-  len = ntohs(((TCPMessagePack*)&tcpSession->rbuff[0])->size);
-  if (len > tcpSession->rsize) /* if message larger than read buffer, grow! */
-    GROW(tcpSession->rbuff,
-        tcpSession->rsize,
-        len);
+
+  while (tcpSession->pos > 2) {
+    len = ntohs(((TCPMessagePack*)&tcpSession->rbuff[0])->size) + sizeof(TCPMessagePack);
+    if (len > tcpSession->rsize) /* if message larger than read buffer, grow! */
+      GROW(tcpSession->rbuff,
+          tcpSession->rsize,
+          len);
 #if DEBUG_TCP
-  LOG(LOG_DEBUG,
-      "Read %d bytes on socket %d, expecting %d for full message\n",
-      tcpSession->pos,
-      tcpSession->sock, 
-      len);
+    LOG(LOG_DEBUG,
+       "Read %d bytes on socket %d, expecting %d for full message\n",
+       tcpSession->pos,
+       tcpSession->sock, 
+       len);
 #endif
-  if ( (tcpSession->pos < 2) ||
-       (tcpSession->pos < len) ) {
-    tcpDisconnect(tsession);
-    return OK;
-  }
- 
-  /* complete message received, let's check what it is */
-  if (YES == tcpSession->expectingWelcome) {
-    TCPWelcome * welcome;
-#if DEBUG_TCP
-    EncName enc;
+    if (tcpSession->pos < len) {
+      tcpDisconnect(tsession);
+      return OK;
+    }
+    
+    /* complete message received, let's check what it is */
+    if (YES == tcpSession->expectingWelcome) {
+      TCPWelcome * welcome;
+#if DEBUG_TCP 
+      EncName enc;
 #endif
+      
+      welcome = (TCPWelcome*) &tcpSession->rbuff[0];
+      if ( (ntohs(welcome->version) != 0) ||
+          (ntohs(welcome->size) != sizeof(TCPWelcome)) ) {
+       LOG(LOG_WARNING,
+           _("Expected welcome message on tcp connection, got garbage. Closing.\n"));
+       tcpDisconnect(tsession);
+       return SYSERR;
+      }
+      tcpSession->expectingWelcome = NO;
+      tcpSession->sender = welcome->clientIdentity;
+#if DEBUG_TCP 
+      IFLOG(LOG_DEBUG,
+           hash2enc(&tcpSession->sender.hashPubKey,
+                    &enc));
+      LOG(LOG_DEBUG,
+         "tcp welcome message from %s received\n",
+         &enc);
+#endif
+      memmove(&tcpSession->rbuff[0],
+             &tcpSession->rbuff[sizeof(TCPWelcome)],
+             tcpSession->pos - sizeof(TCPWelcome));
+      tcpSession->pos -= sizeof(TCPWelcome); 
+      len = ntohs(((TCPMessagePack*)&tcpSession->rbuff[0])->size) + sizeof(TCPMessagePack);
+    } 
+    if ( (tcpSession->pos < 2) ||
+        (tcpSession->pos < len) ) {
+      tcpDisconnect(tsession);
+      return OK;
+    }
     
-    welcome = (TCPWelcome*) &tcpSession->rbuff[0];
-    if ( (ntohs(welcome->version) != 0) ||
-        (ntohs(welcome->size) != sizeof(TCPWelcome)) ) {
+    pack = (TCPMessagePack*)&tcpSession->rbuff[0];
+    /* send msg to core! */
+    if (len <= sizeof(TCPMessagePack)) {
       LOG(LOG_WARNING,
-         _("Expected welcome message on tcp connection, got garbage. Closing.\n"));
+         _("Received malformed message (size %u) from tcp-peer connection. Closing.\n"),
+         len);
       tcpDisconnect(tsession);
       return SYSERR;
     }
-    tcpSession->expectingWelcome = NO;
-    tcpSession->sender = welcome->clientIdentity;
+    mp      = MALLOC(sizeof(MessagePack));
+    mp->msg = MALLOC(len - sizeof(TCPMessagePack));
+    memcpy(mp->msg,
+          &pack[1],
+          len - sizeof(TCPMessagePack));
+    mp->sender   = tcpSession->sender;
+    mp->size     = len - sizeof(TCPMessagePack);
+    mp->tsession = tsession;
 #if DEBUG_TCP
-    IFLOG(LOG_DEBUG,
-         hash2enc(&tcpSession->sender.hashPubKey,
-                  &enc));
     LOG(LOG_DEBUG,
-       "tcp welcome message from %s received\n",
-       &enc);
+       "tcp transport received %u bytes, forwarding to core\n",
+       mp->rsize);
 #endif
+    coreAPI->receive(mp);
+    /* finally, shrink buffer adequately */
     memmove(&tcpSession->rbuff[0],
-           &tcpSession->rbuff[sizeof(TCPWelcome)],
-           tcpSession->pos - sizeof(TCPWelcome));
-    tcpSession->pos -= sizeof(TCPWelcome); 
-    len = ntohs(((TCPMessagePack*)&tcpSession->rbuff[0])->size);
-  } 
-  if ( (tcpSession->pos < 2) ||
-       (tcpSession->pos < len) ) {
-    tcpDisconnect(tsession);
-    return OK;
+           &tcpSession->rbuff[len],
+           tcpSession->pos - len);
+    tcpSession->pos -= len;
+    if ( (tcpSession->pos * 4 < tcpSession->rsize) &&
+        (tcpSession->rsize > 4 * 1024) ) {
+      /* read buffer far too large, shrink! */
+      GROW(tcpSession->rbuff,
+          tcpSession->rsize,
+          tcpSession->pos + 1024);
+    }
   }
-     
-  pack = (TCPMessagePack*)&tcpSession->rbuff[0];
-  /* send msg to core! */
-  if (len <= sizeof(TCPMessagePack)) {
-    LOG(LOG_WARNING,
-       _("Received malformed message from tcp-peer connection. Closing.\n"));
-    tcpDisconnect(tsession);
-    return SYSERR;
-  }
-  mp      = MALLOC(sizeof(MessagePack));
-  mp->msg = MALLOC(len);
-  memcpy(mp->msg,
-        &pack[1],
-        len - sizeof(TCPMessagePack));
-  mp->sender   = tcpSession->sender;
-  mp->size     = len - sizeof(TCPMessagePack);
-  mp->tsession = tsession;
-#if DEBUG_TCP
-  LOG(LOG_DEBUG,
-      "tcp transport received %u bytes, forwarding to core\n",
-      mp->rsize);
-#endif
-  coreAPI->receive(mp);
-
-  if (tcpSession->pos < len) { 
-    BREAK();
-    tcpDisconnect(tsession);
-    return SYSERR;
-  }
-  /* finally, shrink buffer adequately */
-  memmove(&tcpSession->rbuff[0],
-         &tcpSession->rbuff[len],
-         tcpSession->pos - len);
-  tcpSession->pos -= len;
-  if ( (tcpSession->pos * 4 < tcpSession->rsize) &&
-       (tcpSession->rsize > 4 * 1024) ) {
-    /* read buffer far too large, shrink! */
-    GROW(tcpSession->rbuff,
-        tcpSession->rsize,
-        tcpSession->pos + 1024);
-  }
-  
   tcpDisconnect(tsession);
   return OK;
 }
@@ -789,8 +788,7 @@
 static int tcpDirectSend(TCPSession * tcpSession,
                         void * mp,
                         unsigned int ssize) {
-  int ok;
-  int ret;
+  size_t ret;
   int success;
 
   if (tcp_shutdown == YES)
@@ -806,54 +804,42 @@
     BREAK(); /* size 0 not allowed */
     return SYSERR;
   }
-  ok = SYSERR;
   MUTEX_LOCK(&tcplock);
   if (tcpSession->wpos > 0) {
     /* select already pending... */
-    ret = 0;
-  } else {
-    success = SEND_NONBLOCKING(tcpSession->sock,
-                              mp,
-                              ssize,
-                              &ret);
-    if (success == SYSERR) {
-      LOG_STRERROR(LOG_INFO, "send");
-      MUTEX_UNLOCK(&tcplock);
-      return SYSERR;
-    } else if (success == NO)
-      ret = 0;
+    MUTEX_UNLOCK(&tcplock);
+    return SYSERR;
   }
-  if ((unsigned int)ret <= ssize) { /* some bytes send or blocked */
-    if ((unsigned int)ret < ssize) {
-      if (tcpSession->wbuff == NULL) {
-       tcpSession->wsize = ssize + sizeof(TCPMessagePack);
-       tcpSession->wbuff = MALLOC(tcpSession->wsize);
-       tcpSession->wpos  = 0;
-      }
-      if (ssize + tcpSession->wpos - ret > 
-         tcpSession->wsize) {
-       ssize = 0;
-       ok = SYSERR; /* buffer full, drop */
-      } else {
-       memcpy(&tcpSession->wbuff[tcpSession->wpos],
-              mp,
-              ssize - ret);
-       tcpSession->wpos += ssize - ret;
-       if (tcpSession->wpos == ssize - ret)
-         signalSelect(); /* select set changed! */
-       ok = OK; /* all buffered */
-      }      
-    } else 
-      ok = OK; /* all written */
-  } else {
-    LOG_STRERROR(LOG_WARNING, "send");
-    ssize = 0;
-    ok = SYSERR; /* write failed for real */
+  success = SEND_NONBLOCKING(tcpSession->sock,
+                            mp,
+                            ssize,
+                            &ret);
+  if (success == SYSERR) {
+#if DEBUG_TCP
+    LOG_STRERROR(LOG_INFO, "send");
+#endif
+    MUTEX_UNLOCK(&tcplock);
+    return SYSERR;
   }
+  if (success == NO)
+    ret = 0;
+ 
+  if (ret < ssize) {/* partial send */
+    if (tcpSession->wsize < ssize - ret) {
+      GROW(tcpSession->wbuff,
+          tcpSession->wsize,
+          ssize - ret);
+    }
+    memcpy(tcpSession->wbuff,
+          mp,
+          ssize - ret);
+    tcpSession->wpos = ssize - ret;
+    signalSelect(); /* select set changed! */
+  }
   MUTEX_UNLOCK(&tcplock);
   cronTime(&tcpSession->lastUse);
   incrementBytesSent(ssize);
-  return ok;
+  return OK;
 }
 
 /**
@@ -918,8 +904,9 @@
                           const unsigned int size) {
   TCPMessagePack * mp;
   int ok;
-  int ssize;
   
+  if (size >= MAX_BUFFER_SIZE)
+    return SYSERR;
   if (tcp_shutdown == YES)
     return SYSERR;
   if (size == 0) {
@@ -932,12 +919,11 @@
   memcpy(&mp[1],
         msg,
         size);
-  ssize = size + sizeof(TCPMessagePack);
-  mp->size = htons(ssize);
+  mp->size = htons(size);
   mp->reserved = 0;  
   ok = tcpDirectSendReliable(tsession->internal,
                             mp,
-                            ssize);
+                            size + sizeof(TCPMessagePack));
   FREE(mp);
   return ok;
 }
@@ -1128,8 +1114,10 @@
                   const unsigned int size) {
   TCPMessagePack * mp;
   int ok;
-  int ssize;
-  
+ 
+  if (size >= MAX_BUFFER_SIZE)
+    return SYSERR;
+ 
   if (tcp_shutdown == YES)
     return SYSERR;
   if (size == 0) {
@@ -1142,12 +1130,11 @@
   memcpy(&mp[1],
         msg,
         size);
-  ssize = size + sizeof(TCPMessagePack);
-  mp->size = htons(ssize);
+  mp->size = htons(size);
   mp->reserved = 0;
   ok = tcpDirectSend(tsession->internal,
                     mp,
-                    ssize);
+                    size + sizeof(TCPMessagePack));
   FREE(mp);
   return ok;
 }

Modified: GNUnet/src/transports/tcp6.c
===================================================================
--- GNUnet/src/transports/tcp6.c        2005-03-08 04:07:02 UTC (rev 408)
+++ GNUnet/src/transports/tcp6.c        2005-03-08 11:57:22 UTC (rev 409)
@@ -410,101 +410,103 @@
   }
   incrementBytesReceived(ret);
   tcp6Session->pos += ret;
-  len = ntohs(((TCP6MessagePack*)&tcp6Session->rbuff[0])->size);
-  if (len > tcp6Session->rsize) /* if MTU larger than expected, grow! */
-    GROW(tcp6Session->rbuff,
-        tcp6Session->rsize,
-        len);
+  
+  while (tcp6Session->pos > 2) {
+    len = ntohs(((TCP6MessagePack*)&tcp6Session->rbuff[0])->size) + sizeof(TCP6MessagePack);
+    if (len > tcp6Session->rsize) /* if MTU larger than expected, grow! */
+      GROW(tcp6Session->rbuff,
+          tcp6Session->rsize,
+          len);
 #if DEBUG_TCP6
-  LOG(LOG_DEBUG,
-      "Read %d bytes on socket %d, expecting %d for full message\n",
-      tcp6Session->pos,
-      tcp6Session->sock, 
-      len);
+    LOG(LOG_DEBUG,
+       "Read %d bytes on socket %d, expecting %d for full message\n",
+       tcp6Session->pos,
+       tcp6Session->sock, 
+       len);
 #endif
-  if ( (tcp6Session->pos < 2) ||
-       (tcp6Session->pos < len) ) {
-    tcp6Disconnect(tsession);
-    return OK;
-  }
- 
-  /* complete message received, let's check what it is */
-  if (YES == tcp6Session->expectingWelcome) {
-    TCP6Welcome * welcome;
+    if (tcp6Session->pos < len) {
+      tcp6Disconnect(tsession);
+      return OK;
+    }
+    
+    /* complete message received, let's check what it is */
+    if (YES == tcp6Session->expectingWelcome) {
+      TCP6Welcome * welcome;
 #if DEBUG_TCP6
-    EncName hex;
+      EncName hex;
 #endif
+      
+      welcome = (TCP6Welcome*) &tcp6Session->rbuff[0];
+      if ( (ntohs(welcome->version) != 0) ||
+          (ntohs(welcome->size) != sizeof(TCP6Welcome)) ) {
+       LOG(LOG_WARNING,
+           _("Expected welcome message on tcp connection, got garbage. Closing.\n"));
+       tcp6Disconnect(tsession);
+       return SYSERR;
+      }
+      tcp6Session->expectingWelcome = NO;
+      tcp6Session->sender = welcome->clientIdentity;
+#if DEBUG_TCP6
+      IFLOG(LOG_DEBUG,
+           hash2enc(&tcp6Session->sender.hashPubKey,
+                    &enc));
+      LOG(LOG_DEBUG,
+         "tcp6 welcome message from %s received\n",
+         &enc);
+#endif
+      memmove(&tcp6Session->rbuff[0],
+             &tcp6Session->rbuff[sizeof(TCP6Welcome)],
+             tcp6Session->pos - sizeof(TCP6Welcome));
+      tcp6Session->pos -= sizeof(TCP6Welcome); 
+      len = ntohs(((TCP6MessagePack*)&tcp6Session->rbuff[0])->size) + sizeof(TCP6MessagePack);
+    } 
+    if ( (tcp6Session->pos < 2) ||
+        (tcp6Session->pos < len) ) {
+      tcp6Disconnect(tsession);
+      return OK;
+    }
     
-    welcome = (TCP6Welcome*) &tcp6Session->rbuff[0];
-    if ( (ntohs(welcome->version) != 0) ||
-        (ntohs(welcome->size) != sizeof(TCP6Welcome)) ) {
+    pack = (TCP6MessagePack*)&tcp6Session->rbuff[0];
+    /* send msg to core! */
+    if (len <= sizeof(TCP6MessagePack)) {
       LOG(LOG_WARNING,
-         _("Expected welcome message on tcp connection, got garbage. Closing.\n"));
+         _("Received malformed message from tcp6-peer connection. Closing connection.\n"));
       tcp6Disconnect(tsession);
       return SYSERR;
     }
-    tcp6Session->expectingWelcome = NO;
-    tcp6Session->sender = welcome->clientIdentity;
+    mp      = MALLOC(sizeof(MessagePack));
+    mp->msg = MALLOC(len - sizeof(TCP6MessagePack));
+    memcpy(mp->msg,
+          &pack[1],
+          len - sizeof(TCP6MessagePack));
+    mp->sender   = tcp6Session->sender;
+    mp->size     = len - sizeof(TCP6MessagePack);
+    mp->tsession = tsession;
 #if DEBUG_TCP6
-    IFLOG(LOG_DEBUG,
-         hash2enc(&tcp6Session->sender.hashPubKey,
-                  &enc));
     LOG(LOG_DEBUG,
-       "tcp6 welcome message from %s received\n",
-       &enc);
+       "tcp6 transport received %d bytes, forwarding to core\n",
+       mp->size);
 #endif
+    coreAPI->receive(mp);
+    
+    if (tcp6Session->pos < len) { 
+      BREAK();
+      tcp6Disconnect(tsession);
+      return SYSERR;
+    }
+    /* finally, shrink buffer adequately */
     memmove(&tcp6Session->rbuff[0],
-           &tcp6Session->rbuff[sizeof(TCP6Welcome)],
-           tcp6Session->pos - sizeof(TCP6Welcome));
-    tcp6Session->pos -= sizeof(TCP6Welcome); 
-    len = ntohs(((TCP6MessagePack*)&tcp6Session->rbuff[0])->size);
-  } 
-  if ( (tcp6Session->pos < 2) ||
-       (tcp6Session->pos < len) ) {
-    tcp6Disconnect(tsession);
-    return OK;
+           &tcp6Session->rbuff[len],
+           tcp6Session->pos - len);
+    tcp6Session->pos -= len;      
+    if ( (tcp6Session->pos * 4 < tcp6Session->rsize) &&
+        (tcp6Session->rsize > 4 * 1024) ) {
+      /* read buffer far too large, shrink! */
+      GROW(tcp6Session->rbuff,
+          tcp6Session->rsize,
+          tcp6Session->pos + 1024);
+    }  
   }
-     
-  pack = (TCP6MessagePack*)&tcp6Session->rbuff[0];
-  /* send msg to core! */
-  if (len <= sizeof(TCP6MessagePack)) {
-    LOG(LOG_WARNING,
-       _("Received malformed message from tcp6-peer connection. Closing connection.\n"));
-    tcp6Disconnect(tsession);
-    return SYSERR;
-  }
-  mp      = MALLOC(sizeof(MessagePack));
-  mp->msg = MALLOC(len);
-  memcpy(mp->msg,
-        &pack[1],
-        len - sizeof(TCP6MessagePack));
-  mp->sender   = tcp6Session->sender;
-  mp->size     = len - sizeof(TCP6MessagePack);
-  mp->tsession = tsession;
-#if DEBUG_TCP6
-  LOG(LOG_DEBUG,
-      "tcp6 transport received %d bytes, forwarding to core\n",
-      mp->size);
-#endif
-  coreAPI->receive(mp);
-
-  if (tcp6Session->pos < len) { 
-    BREAK();
-    tcp6Disconnect(tsession);
-    return SYSERR;
-  }
-  /* finally, shrink buffer adequately */
-  memmove(&tcp6Session->rbuff[0],
-         &tcp6Session->rbuff[len],
-         tcp6Session->pos - len);
-  tcp6Session->pos -= len;        
-  if ( (tcp6Session->pos * 4 < tcp6Session->rsize) &&
-       (tcp6Session->rsize > 4 * 1024) ) {
-    /* read buffer far too large, shrink! */
-    GROW(tcp6Session->rbuff,
-        tcp6Session->rsize,
-        tcp6Session->pos + 1024);
-  }  
   tcp6Disconnect(tsession);
   return OK;
 }
@@ -772,8 +774,7 @@
 static int tcp6DirectSend(TCP6Session * tcp6Session,
                          void * mp,
                          unsigned int ssize) {
-  int ok;
-  int ret;
+  size_t ret;
   int success;
 
   if (tcp6_shutdown == YES)
@@ -789,57 +790,39 @@
     BREAK();
     return SYSERR;
   }
-  if (ssize > tcp6API.mtu + sizeof(TCP6MessagePack)) {
-    BREAK();
-    return SYSERR;
-  }
-  ok = SYSERR;
   MUTEX_LOCK(&tcp6lock);
   if (tcp6Session->wpos > 0) {
-    ret = 0;
-  } else {
-    success = SEND_NONBLOCKING(tcp6Session->sock,
-                              mp,
-                              ssize,
-                              &ret);
-    if (success == SYSERR) {
-      LOG_STRERROR(LOG_INFO, "send");
-      MUTEX_UNLOCK(&tcp6lock);
-      return SYSERR;
-    } else if (success == NO)
-      ret = 0;
+    MUTEX_UNLOCK(&tcp6lock);
+    return SYSERR;
   }
-  if ((unsigned int) ret <= ssize) { /* some bytes send or blocked */
-    if ((unsigned int)ret < ssize) {
-      if (tcp6Session->wbuff == NULL) {
-       tcp6Session->wsize = ssize + sizeof(TCP6MessagePack);
-       tcp6Session->wbuff = MALLOC(tcp6Session->wsize);
-       tcp6Session->wpos  = 0;
-      }
-      if (tcp6Session->wpos + ssize - ret > 
-         tcp6Session->wsize) {
-       ssize = 0;
-       ok = SYSERR; /* buffer full, drop */
-      } else {
-       memcpy(&tcp6Session->wbuff[tcp6Session->wpos],
-              mp,
-              ssize - ret);
-       tcp6Session->wpos += ssize - ret;
-       if (tcp6Session->wpos == ssize - ret)
-         signalSelect(); /* select set changed! */
-       ok = OK; /* all buffered */
-      }      
-    } else 
-      ok = OK; /* all written */
-  } else {
-    LOG_STRERROR(LOG_WARNING, "send");
-    ssize = 0;
-    ok = SYSERR; /* write failed for real */
+  success = SEND_NONBLOCKING(tcp6Session->sock,
+                            mp,
+                            ssize,
+                            &ret);
+  if (success == SYSERR) {
+    LOG_STRERROR(LOG_INFO, "send");
+    MUTEX_UNLOCK(&tcp6lock);
+    return SYSERR;
   }
+  if (success == NO)
+    ret = 0;
+  
+  if (ret < ssize) { /* partial send */    
+    if (tcp6Session->wsize < ssize - ret) {
+      GROW(tcp6Session->wbuff,
+          tcp6Session->wsize,
+          ssize - ret);
+    }
+    memcpy(tcp6Session->wbuff,
+          mp,
+          ssize - ret);
+    tcp6Session->wpos = ssize - ret;
+    signalSelect(); /* select set changed! */    
+  }
   MUTEX_UNLOCK(&tcp6lock);
   cronTime(&tcp6Session->lastUse);
   incrementBytesSent(ssize);
-  return ok;
+  return OK;
 }
 
 
@@ -902,8 +885,9 @@
                           const unsigned int size) {
   TCP6MessagePack * mp;
   int ok;
-  int ssize;
   
+  if (size >= MAX_BUFFER_SIZE)
+    return SYSERR;
   if (tcp6_shutdown == YES)
     return SYSERR;
   if (size == 0) {
@@ -916,12 +900,11 @@
   memcpy(&mp[1],
         msg,
         size);
-  ssize = size + sizeof(TCP6MessagePack);
-  mp->size = htons(ssize);
+  mp->size = htons(size);
   mp->reserved = 0;
   ok = tcp6DirectSendReliable(tsession->internal,
                              mp,
-                             ssize);
+                             size + sizeof(TCP6MessagePack));
   FREE(mp);
   return ok;
 }
@@ -1139,8 +1122,9 @@
                    const unsigned int size) {
   TCP6MessagePack * mp;
   int ok;
-  int ssize;
   
+  if (size >= MAX_BUFFER_SIZE)
+    return SYSERR;
   if (tcp6_shutdown == YES)
     return SYSERR;
   if (size == 0) {
@@ -1153,12 +1137,11 @@
   memcpy(&mp[1],
         msg,
         size);
-  ssize = size + sizeof(TCP6MessagePack);
-  mp->size = htons(ssize);
+  mp->size = htons(size);
   mp->reserved = 0;
   ok = tcp6DirectSend(tsession->internal,
                      mp,
-                     ssize);
+                     size + sizeof(TCP6MessagePack));
   FREE(mp);
   return ok;
 }

Modified: GNUnet/todo
===================================================================
--- GNUnet/todo 2005-03-08 04:07:02 UTC (rev 408)
+++ GNUnet/todo 2005-03-08 11:57:22 UTC (rev 409)
@@ -7,10 +7,11 @@
 
 0.7.0pre1 [4'05] (aka "preview"):
 - testing:
+  * tbench:
+    + CAN TEST: http transport (after MTU change)
   * sqlite-tests: test concurrency with iterators
   * gnunet-pseudonym
   * gnunet-search: multiple search results don't work (yet); test on FSLIB and ECRS levels!
-  * tbench is awful code (FIX) and also somehow _breaks_ bandwidth limitations! (Mantis #766 anyone?)
 - FSUI:
   * download: various details wrt generated events
   * namespace updates
@@ -21,7 +22,6 @@
   * dht / gnunet-dht-join and gnunet-dht-query 
   * fs
 - transport refactoring:
-  * TEST: TCP MTU change [ at this point, it may just work or equally just crash and burn ]
   * UPDATE: HTTP: MTU change!
   * port knocking support? [ tricky ]
 - UI features:





reply via email to

[Prev in Thread] Current Thread [Next in Thread]