[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r409 - in GNUnet: . src/applications/advertising src/applic
From: grothoff
Subject: [GNUnet-SVN] r409 - in GNUnet: . src/applications/advertising src/applications/fragmentation src/applications/identity src/applications/tbench src/applications/topology_default src/server src/transports
Date: Tue, 8 Mar 2005 03:57:24 -0800 (PST)
Author: grothoff
Date: 2005-03-08 03:57:22 -0800 (Tue, 08 Mar 2005)
New Revision: 409
Modified:
GNUnet/src/applications/advertising/advertising.c
GNUnet/src/applications/fragmentation/fragmentation.c
GNUnet/src/applications/identity/identity.c
GNUnet/src/applications/tbench/gnunet-tbench.c
GNUnet/src/applications/tbench/peer1.conf
GNUnet/src/applications/tbench/peer2.conf
GNUnet/src/applications/tbench/tbench.c
GNUnet/src/applications/tbench/tbench.h
GNUnet/src/applications/tbench/tbenchtest.c
GNUnet/src/applications/topology_default/topology.c
GNUnet/src/server/connection.c
GNUnet/src/server/core.c
GNUnet/src/transports/tcp.c
GNUnet/src/transports/tcp6.c
GNUnet/todo
Log:
bugfixing galore
Modified: GNUnet/src/applications/advertising/advertising.c
===================================================================
--- GNUnet/src/applications/advertising/advertising.c 2005-03-08 04:07:02 UTC (rev 408)
+++ GNUnet/src/applications/advertising/advertising.c 2005-03-08 11:57:22 UTC (rev 409)
@@ -458,8 +458,10 @@
#endif
identity->addHost(sd.m);
if (sd.n < 1) {
- LOG(LOG_WARNING,
- _("Announcing ourselves pointless: no other peers are known to us so far.\n"));
+ if (identity->forEachHost(0, NULL, NULL) == 0)
+ LOG(LOG_WARNING,
+ _("Announcing ourselves pointless: "
+ "no other peers are known to us so far.\n"));
FREE(sd.m);
return; /* no point in trying... */
}
Modified: GNUnet/src/applications/fragmentation/fragmentation.c
===================================================================
--- GNUnet/src/applications/fragmentation/fragmentation.c 2005-03-08 04:07:02 UTC (rev 408)
+++ GNUnet/src/applications/fragmentation/fragmentation.c 2005-03-08 11:57:22 UTC (rev 409)
@@ -508,7 +508,9 @@
&frag->header,
EXTREME_PRIORITY,
ctx->transmissionTime - cronTime(NULL));
+ pos += mlen - sizeof(FRAGMENT_Message);
}
+ GNUNET_ASSERT(pos == ctx->len);
FREE(frag);
FREE(tmp);
FREE(ctx);
Modified: GNUnet/src/applications/identity/identity.c
===================================================================
--- GNUnet/src/applications/identity/identity.c 2005-03-08 04:07:02 UTC (rev 408)
+++ GNUnet/src/applications/identity/identity.c 2005-03-08 11:57:22 UTC (rev 409)
@@ -637,7 +637,7 @@
hash2enc(&identity->hashPubKey,
&hn);
#if DEBUG_IDENTITY
- LOG(LOG_DEBUG,
+ LOG(LOG_INFO,
"Blacklisting host '%s' (%d) for %llu seconds until %llu (strict=%d).\n",
&hn,
i,
Modified: GNUnet/src/applications/tbench/gnunet-tbench.c
===================================================================
--- GNUnet/src/applications/tbench/gnunet-tbench.c 2005-03-08 04:07:02 UTC (rev 408)
+++ GNUnet/src/applications/tbench/gnunet-tbench.c 2005-03-08 11:57:22 UTC (rev 409)
@@ -1,6 +1,6 @@
/*
This file is part of GNUnet.
- (C) 2001, 2002, 2004 Christian Grothoff (and other contributing authors)
+ (C) 2001, 2002, 2004, 2005 Christian Grothoff (and other contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
@@ -28,23 +28,23 @@
#include "gnunet_protocols.h"
#include "tbench.h"
-#define TBENCH_VERSION "0.1.0"
+#define TBENCH_VERSION "0.1.1"
#define DEFAULT_MESSAGE_SIZE 10
-#define DEFAULT_TIMEOUT 2
+#define DEFAULT_TIMEOUT (2 * cronSECONDS)
#define DEFAULT_SPACING 0
#define OF_HUMAN_READABLE 0
#define OF_GNUPLOT_INPUT 1
-static unsigned int messageSize = DEFAULT_MESSAGE_SIZE;
-static unsigned int messageCnt = 1;
+static unsigned int messageSize = DEFAULT_MESSAGE_SIZE;
+static unsigned int messageCnt = 1;
static char * messageReceiver;
-static unsigned int messageIterations = 1;
-static unsigned int messageTrainSize = 1;
-static unsigned int messageTimeOut = DEFAULT_TIMEOUT;
-static unsigned int messageSpacing = DEFAULT_SPACING;
-static unsigned int outputFormat = OF_HUMAN_READABLE;
+static unsigned int messageIterations = 1;
+static unsigned int messageTrainSize = 1;
+static cron_t messageTimeOut = DEFAULT_TIMEOUT;
+static cron_t messageSpacing = DEFAULT_SPACING;
+static unsigned int outputFormat = OF_HUMAN_READABLE;
/**
* Parse the options, set the timeout.
@@ -103,9 +103,9 @@
{ 's', "size", "SIZE",
gettext_noop("message size") },
{ 'S', "space", "SPACE",
- gettext_noop("inter-train message spacing") },
+ gettext_noop("inter-train message spacing (in number of messages)") },
{ 't', "timeout", "TIMEOUT",
- gettext_noop("time to wait for the arrival of a response") },
+ gettext_noop("time to wait for the completion of an iteration (in ms)") },
HELP_VERSION,
{ 'X', "xspace", "COUNT",
gettext_noop("sleep for SPACE ms after COUNT messages") },
@@ -117,7 +117,9 @@
return SYSERR;
}
case 'i':
- if(1 != sscanf(GNoptarg, "%ud", &messageIterations)){
+ if(1 != sscanf(GNoptarg,
+ "%ud",
+ &messageIterations)){
LOG(LOG_FAILURE,
_("You must pass a number to the '%s' option.\n"),
"-i");
@@ -125,7 +127,9 @@
}
break;
case 'n':
- if(1 != sscanf(GNoptarg, "%ud", &messageCnt)){
+ if(1 != sscanf(GNoptarg,
+ "%ud",
+ &messageCnt)){
LOG(LOG_FAILURE,
_("You must pass a number to the '%s' option.\n"),
"-n");
@@ -136,7 +140,9 @@
messageReceiver = STRDUP(GNoptarg);
break;
case 's':
- if(1 != sscanf(GNoptarg, "%ud", &messageSize)){
+ if(1 != sscanf(GNoptarg,
+ "%ud",
+ &messageSize)){
LOG(LOG_FAILURE,
_("You must pass a number to the '%s' option.\n"),
"-s");
@@ -144,7 +150,9 @@
}
break;
case 'S':
- if(1 != sscanf(GNoptarg, "%ud", &messageSpacing)){
+ if(1 != sscanf(GNoptarg,
+ "%ud",
+ &messageTrainSize)){
LOG(LOG_FAILURE,
_("You must pass a number to the '%s' option.\n"),
"-S");
@@ -152,7 +160,9 @@
}
break;
case 't':
- if(1 != sscanf(GNoptarg, "%ud", &messageTimeOut)){
+ if(1 != sscanf(GNoptarg,
+ "%llud",
+ &messageTimeOut)){
LOG(LOG_FAILURE,
_("You must pass a number to the '%s' option.\n"),
"-t");
@@ -165,7 +175,9 @@
TBENCH_VERSION);
return SYSERR;
case 'X':
- if(1 != sscanf(GNoptarg, "%ud", &messageTrainSize)){
+ if(1 != sscanf(GNoptarg,
+ "%llud",
+ &messageSpacing)){
LOG(LOG_FAILURE,
_("You must pass a number to the '%s' option.\n"),
"-X");
@@ -174,8 +186,7 @@
break;
default:
LOG(LOG_FAILURE,
- _("Use --help to get a list of options.\n"),
- c);
+ _("Use --help to get a list of options.\n"));
return -1;
} /* end of parsing commandline */
} /* while (1) */
@@ -201,35 +212,34 @@
if (sock == NULL)
errexit(_("Could not connect to gnunetd.\n"));
- memset(&msg,
- 0,
- sizeof(TBENCH_CS_MESSAGE));
- msg.msgSize =htons(messageSize);
- msg.msgCnt =htons(messageCnt);
- msg.iterations =htons(messageIterations);
- msg.intPktSpace =htons(messageSpacing);
- msg.trainSize =htons(messageTrainSize);
- msg.timeOut =htonl(messageTimeOut);
+ msg.header.size = htons(sizeof(TBENCH_CS_MESSAGE));
+ msg.header.type = htons(TBENCH_CS_PROTO_REQUEST);
+ msg.msgSize = htonl(messageSize);
+ msg.msgCnt = htonl(messageCnt);
+ msg.iterations = htonl(messageIterations);
+ msg.intPktSpace = htonll(messageSpacing);
+ msg.trainSize = htonl(messageTrainSize);
+ msg.timeOut = htonll(messageTimeOut);
+ msg.priority = htonl(5);
if (messageReceiver == NULL)
errexit(_("You must specify a receiver!\n"));
if (OK != enc2hash(messageReceiver,
&msg.receiverId.hashPubKey))
- errexit(_("Invalid receiver peer ID specified ('%s' is not valid enc name).\n"),
+ errexit(_("Invalid receiver peer ID specified ('%s' is not valid name).\n"),
messageReceiver);
FREE(messageReceiver);
- msg.header.size = htons(sizeof(TBENCH_CS_MESSAGE));
- msg.header.type = htons(TBENCH_CS_PROTO_REQUEST);
-
if (SYSERR == writeToSocket(sock,
&msg.header))
return -1;
- buffer = MALLOC(MAX_BUFFER_SIZE);
- LOG(LOG_DEBUG,
- "Reading using readFromSocket...\n");
- if (OK == readFromSocket(sock, (CS_HEADER**)&buffer)) {
- if((float)buffer->mean_loss <= 0){
+ buffer = NULL;
+ if (OK == readFromSocket(sock,
+ (CS_HEADER**)&buffer)) {
+ GNUNET_ASSERT(ntohs(buffer->header.size) ==
+ sizeof(TBENCH_CS_REPLY));
+ if ((float)buffer->mean_loss <= 0){
+ BREAK();
messagesPercentLoss = 0.0;
} else {
messagesPercentLoss = (buffer->mean_loss/((float)htons(msg.msgCnt)));
@@ -237,23 +247,23 @@
switch (outputFormat) {
case OF_HUMAN_READABLE:
printf(_("Time:\n"));
- printf(_("\tmax %d\n"),
- htons(buffer->max_time));
- printf(_("\tmin %d\n"),
- htons(buffer->min_time));
- printf(_("\tmean %f\n"),
+ printf(_("\tmax %llums\n"),
+ ntohll(buffer->max_time));
+ printf(_("\tmin %llums\n"),
+ ntohll(buffer->min_time));
+ printf(_("\tmean %8.4fms\n"),
buffer->mean_time);
- printf(_("\tvariance %f\n"),
+ printf(_("\tvariance %8.4fms\n"),
buffer->variance_time);
printf(_("Loss:\n"));
- printf(_("\tmax %d\n"),
- htons(buffer->max_loss));
- printf(_("\tmin %d\n"),
- htons(buffer->min_loss));
- printf(_("\tmean %f\n"),
+ printf(_("\tmax %u\n"),
+ ntohl(buffer->max_loss));
+ printf(_("\tmin %u\n"),
+ ntohl(buffer->min_loss));
+ printf(_("\tmean %8.4f\n"),
buffer->mean_loss);
- printf(_("\tvariance %f\n"),
+ printf(_("\tvariance %8.4f\n"),
buffer->variance_loss);
break;
case OF_GNUPLOT_INPUT:
@@ -264,9 +274,9 @@
default:
printf(_("Output format not known, this should not happen.\n"));
}
+ FREE(buffer);
} else
printf(_("\nDid not receive the message from gnunetd. Is gnunetd running?\n"));
- FREE(buffer);
releaseClientSocket(sock);
doneUtil();
Modified: GNUnet/src/applications/tbench/peer1.conf
===================================================================
--- GNUnet/src/applications/tbench/peer1.conf 2005-03-08 04:07:02 UTC (rev 408)
+++ GNUnet/src/applications/tbench/peer1.conf 2005-03-08 11:57:22 UTC (rev 409)
@@ -6,7 +6,7 @@
[GNUNETD]
# VALGRIND = 300
HELOEXPIRES = 60
-LOGLEVEL = INFO
+LOGLEVEL = DEBUG
# LOGFILE =
KEEPLOG = 0
PIDFILE = $GNUNETD_HOME/gnunetd.pid
Modified: GNUnet/src/applications/tbench/peer2.conf
===================================================================
--- GNUnet/src/applications/tbench/peer2.conf 2005-03-08 04:07:02 UTC (rev 408)
+++ GNUnet/src/applications/tbench/peer2.conf 2005-03-08 11:57:22 UTC (rev 409)
@@ -6,7 +6,7 @@
[GNUNETD]
# VALGRIND = 300
HELOEXPIRES = 60
-LOGLEVEL = INFO
+LOGLEVEL = DEBUG
# LOGFILE =
KEEPLOG = 0
PIDFILE = $GNUNETD_HOME/gnunetd.pid
Modified: GNUnet/src/applications/tbench/tbench.c
===================================================================
--- GNUnet/src/applications/tbench/tbench.c 2005-03-08 04:07:02 UTC (rev 408)
+++ GNUnet/src/applications/tbench/tbench.c 2005-03-08 11:57:22 UTC (rev 409)
@@ -1,6 +1,6 @@
/*
This file is part of GNUnet.
- (C) 2001, 2002, 2004 Christian Grothoff (and other contributing authors)
+ (C) 2001, 2002, 2004, 2005 Christian Grothoff (and other contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
@@ -19,234 +19,408 @@
*/
/**
- * TBench CORE. This is the code that is plugged
- * into the GNUnet core to enable transport profiling.
- *
- * FIXME: this code needs some serious workover (leaks!)
- *
+ * @file applications/tbench/tbench.c
* @author Paul Ruth
- * @file applications/tbench/tbench.c
+ * @brief module to enable transport profiling.
*/
#include "platform.h"
#include "gnunet_protocols.h"
#include "tbench.h"
-struct Result {
- cron_t time;
- unsigned int packets;
-};
+#define DEBUG_TBENCH NO
-static CoreAPIForApplication * coreAPI = NULL;
+typedef struct {
+ cron_t totalTime;
+ unsigned char * packetsReceived;
+ unsigned int maxPacketNumber;
+ unsigned int lossCount;
+ unsigned int duplicateCount;
+} IterationData;
+
+/**
+ * Message exchanged between peers for profiling
+ * transport performance.
+ */
+typedef struct {
+ p2p_HEADER header;
+ unsigned int iterationNum;
+ unsigned int packetNum;
+ unsigned int priority;
+ unsigned int nounce;
+ unsigned int crc;
+} TBENCH_p2p_MESSAGE;
+
+/**
+ * Lock for access to semaphores.
+ */
static Mutex lock;
-static Mutex lockCnt;
-static PeerIdentity receiverIdent;
-static Semaphore * sem;
-static cron_t startTime = 0;
-static cron_t endTime = 0;
-static int msgCnt = 1;
-static int msgIter = 1;
-static int receiveCnt;
-static int currIteration;
+static Semaphore * presem;
-/* */
+static Semaphore * postsem;
+
+/**
+ * What was the packet number we received?
+ */
+static unsigned int lastPacketNumber;
+
+/**
+ * What is the current iteration counter? (Used to verify
+ * that replies match the current request series).
+ */
+static unsigned int currIteration;
+static unsigned int currNounce;
+
+/**
+ * Did the current iteration time-out? (YES/NO)
+ */
+static int timeoutOccured;
+
+
+static CoreAPIForApplication * coreAPI;
+
+/**
+ * Check if we have received a p2p reply,
+ * update result counters accordingly.
+ *
+ * @return 0 if we have received all results, >0 otherwise
+ */
+static int pollResults(IterationData * results,
+ int blocking) {
+ if (results->lossCount == 0)
+ return 0;
+ if (blocking == YES) {
+ if (timeoutOccured == YES)
+ return results->lossCount;
+ SEMAPHORE_DOWN(postsem);
+ } else {
+ if (OK != SEMAPHORE_DOWN_NONBLOCKING(postsem))
+ return results->lossCount;
+ }
+ do {
+ if (timeoutOccured == YES) {
+ SEMAPHORE_UP(presem);
+ return results->lossCount;
+ }
+ if (lastPacketNumber > results->maxPacketNumber) {
+ SEMAPHORE_UP(presem);
+ return results->lossCount;
+ }
+ if (0 == results->packetsReceived[lastPacketNumber]++) {
+ results->lossCount--;
+ } else {
+ results->duplicateCount++;
+#if DEBUG_TBENCH
+ LOG(LOG_DEBUG,
+ "Received duplicate message %u from iteration %u\n",
+ lastPacketNumber,
+ currIteration);
+#endif
+ }
+ SEMAPHORE_UP(presem);
+ } while (OK == SEMAPHORE_DOWN_NONBLOCKING(postsem));
+ return results->lossCount;
+}
+
+/**
+ * Another peer send us a tbench request. Just turn
+ * around and send it back.
+ */
static int handleTBenchReq(const PeerIdentity * sender,
const p2p_HEADER * message) {
- TBENCH_p2p_MESSAGE *pmsg = (TBENCH_p2p_MESSAGE*)message;
-
- LOG(LOG_DEBUG,
- "%s received iteration %d, message %d\n",
- __FUNCTION__,
- htons(pmsg->iterationNum),
- htons(pmsg->packetNum));
- pmsg->header.type = htons(TBENCH_p2p_PROTO_REPLY);
- coreAPI->unicast(sender, message, 5, 0);
+ p2p_HEADER * reply;
+ TBENCH_p2p_MESSAGE * msg;
+
+ if ( ntohs(message->size) < sizeof(TBENCH_p2p_MESSAGE)) {
+ BREAK();
+ return SYSERR;
+ }
+ msg = (TBENCH_p2p_MESSAGE*) message;
+ if (crc32N(&msg[1],
+ ntohs(message->size) - sizeof(TBENCH_p2p_MESSAGE))
+ != ntohl(msg->crc)) {
+ BREAK();
+ return SYSERR;
+ }
+
+#if DEBUG_TBENCH
+ LOG(LOG_DEBUG,
+ "Received message %u from iteration %u/%u\n",
+ htonl(msg->packetNum),
+ htonl(msg->iterationNum),
+ htonl(msg->nounce));
+#endif
+ reply = MALLOC(ntohs(message->size));
+ memcpy(reply,
+ message,
+ ntohs(message->size));
+ reply->type = htons(TBENCH_p2p_PROTO_REPLY);
+ coreAPI->unicast(sender,
+ reply,
+ ntohl(msg->priority), /* medium importance */
+ 0); /* no delay */
+ FREE(reply);
return OK;
}
-/* */
+/**
+ * We received a tbench-reply. Check and count stats.
+ */
static int handleTBenchReply(const PeerIdentity * sender,
const p2p_HEADER * message) {
- TBENCH_p2p_MESSAGE *pmsg = (TBENCH_p2p_MESSAGE*)message;
-
- LOG(LOG_DEBUG,
- "Entering %s.\n",
- __FUNCTION__);
- MUTEX_LOCK(&lockCnt);
- if(htons(pmsg->iterationNum) == currIteration) {
- cronTime(&endTime);
- receiveCnt++;
- LOG(LOG_DEBUG,
- "iteration %d, received reply, %d\n",
- currIteration,
- receiveCnt);
- if(receiveCnt >= msgCnt)
- SEMAPHORE_UP(sem);
+ TBENCH_p2p_MESSAGE * pmsg;
+
+ if (ntohs(message->size) < sizeof(TBENCH_p2p_MESSAGE)) {
+ BREAK();
+ return SYSERR;
+ }
+ pmsg = (TBENCH_p2p_MESSAGE*) message;
+ if (crc32N(&pmsg[1],
+ ntohs(message->size) - sizeof(TBENCH_p2p_MESSAGE))
+ != ntohl(pmsg->crc)) {
+ BREAK();
+ return SYSERR;
+ }
+#if DEBUG_TBENCH
+ LOG(LOG_DEBUG,
+ "Received message %u from iteration %u/%u\n",
+ htonl(pmsg->packetNum),
+ htonl(pmsg->iterationNum),
+ htonl(pmsg->nounce));
+#endif
+ MUTEX_LOCK(&lock);
+ if ( (timeoutOccured == NO) &&
+ (presem != NULL) &&
+ (postsem != NULL) &&
+ (htonl(pmsg->iterationNum) == currIteration) &&
+ (htonl(pmsg->nounce) == currNounce) ) {
+ SEMAPHORE_DOWN(presem);
+ lastPacketNumber = ntohl(pmsg->packetNum);
+ SEMAPHORE_UP(postsem);
} else {
+#if DEBUG_TBENCH
LOG(LOG_DEBUG,
- "Old Reply: iteration %d, received reply, %d\n",
- currIteration, receiveCnt);
+ "Received message %u from iteration %u too late (now at iteration %u)\n",
+ ntohl(pmsg->packetNum),
+ ntohl(pmsg->iterationNum),
+ currIteration);
+#endif
}
- MUTEX_UNLOCK(&lockCnt);
+ MUTEX_UNLOCK(&lock);
return OK;
}
+/**
+ * Cron-job helper function to signal timeout.
+ */
static void semaUp(Semaphore * sem) {
+ timeoutOccured = YES;
SEMAPHORE_UP(sem);
}
-/* */
+/**
+ * Handle client request (main function)
+ */
static int csHandleTBenchRequest(ClientHandle client,
const CS_HEADER * message) {
- int i,j;
- int sum_loss,sum_time;
- double sum_variance_time, sum_variance_loss;
- TBENCH_p2p_MESSAGE *opmsg;
- TBENCH_CS_MESSAGE *icmsg;
- TBENCH_CS_REPLY *ocmsg;
- struct Result *results;
+ TBENCH_CS_MESSAGE * msg;
+ TBENCH_CS_REPLY reply;
+ TBENCH_p2p_MESSAGE * p2p;
+ unsigned short size;
+ unsigned int iteration;
+ unsigned int packetNum;
+ cron_t startTime;
+ cron_t endTime;
+ cron_t now;
+ cron_t delay;
+ cron_t delayStart;
+ IterationData * results;
+ unsigned long long sum_loss;
+ unsigned int max_loss;
+ unsigned int min_loss;
+ cron_t sum_time;
+ cron_t min_time;
+ cron_t max_time;
+ cron_t earlyEnd;
+ double sum_variance_time;
+ double sum_variance_loss;
+ unsigned int msgCnt;
+ unsigned int iterations;
- LOG(LOG_DEBUG,
- "Entering %s.\n",
- __FUNCTION__);
- icmsg = (TBENCH_CS_MESSAGE*)message;
-
- opmsg = MALLOC(sizeof(TBENCH_p2p_MESSAGE)+ntohs(icmsg->msgSize)+1);
- ocmsg = MALLOC(sizeof(TBENCH_CS_REPLY));
- MUTEX_LOCK(&lock); /* only one benchmark run
- at a time */
+ if ( ntohs(message->size) != sizeof(TBENCH_CS_MESSAGE) )
+ return SYSERR;
- msgCnt = htons(icmsg->msgCnt);
- msgIter = htons(icmsg->iterations);
- results = MALLOC(msgIter * sizeof(struct Result));
-
- LOG(LOG_DEBUG,
- "TBENCH: msgCnt %d msgIter %d\n",
+ msg = (TBENCH_CS_MESSAGE*) message;
+ size = sizeof(TBENCH_p2p_MESSAGE) + ntohl(msg->msgSize);
+ if (size < sizeof(TBENCH_p2p_MESSAGE))
+ return SYSERR;
+ delay = ntohll(msg->intPktSpace);
+ iterations = ntohl(msg->iterations);
+ msgCnt = ntohl(msg->msgCnt);
+#if DEBUG_TBENCH
+ LOG(LOG_MESSAGE,
+ "Tbench runs %u test messages of size %u in %u iterations.\n",
msgCnt,
- msgIter);
- sem = SEMAPHORE_NEW(0);
+ size,
+ iterations);
+#endif
+ results = MALLOC(sizeof(IterationData) * iterations);
- receiveCnt = 0;
+ p2p = MALLOC(size);
+ memset(p2p,
+ 0,
+ size);
+ p2p->header.size = htons(size);
+ p2p->header.type = htons(TBENCH_p2p_PROTO_REQUEST);
+ p2p->priority = msg->priority;
- memcpy(&receiverIdent,
- &icmsg->receiverId,
- sizeof(PeerIdentity));
-
- /* set up opmsg */
- memset(opmsg, 0, sizeof(TBENCH_p2p_MESSAGE));
- opmsg->header.size = htons(sizeof(TBENCH_p2p_MESSAGE)+ntohs(icmsg->msgSize));
- opmsg->header.type = htons(TBENCH_p2p_PROTO_REQUEST);
- opmsg->iterationNum = opmsg->packetNum = htons(0);
-
- for(currIteration = 0; currIteration < msgIter; currIteration++){
- opmsg->iterationNum = htons(currIteration);
- receiveCnt = 0;
- LOG(LOG_DEBUG,
- "Timeout after %ums\n",
- ntohl(icmsg->timeOut));
+ MUTEX_LOCK(&lock);
+ for (iteration=0;iteration<iterations;iteration++) {
+ results[iteration].maxPacketNumber = msgCnt;
+ results[iteration].packetsReceived = MALLOC(msgCnt);
+ memset(results[iteration].packetsReceived,
+ 0,
+ msgCnt);
+ results[iteration].lossCount = msgCnt;
+ results[iteration].duplicateCount = 0;
+
+ earlyEnd = 0;
+ presem = SEMAPHORE_NEW(1);
+ postsem = SEMAPHORE_NEW(0);
+ currNounce = randomi(0xFFFFFF);
+ p2p->nounce
+ = htonl(currNounce);
+ currIteration = iteration;
+ p2p->iterationNum
+ = htonl(currIteration);
+ memset(&p2p[1],
+ randomi(256),
+ size - sizeof(TBENCH_p2p_MESSAGE));
+ p2p->crc
+ = htonl(crc32N(&p2p[1],
+ size - sizeof(TBENCH_p2p_MESSAGE)));
+
+ MUTEX_UNLOCK(&lock); /* allow receiving */
+
+ cronTime(&startTime);
+ endTime = startTime + ntohll(msg->timeOut);
+
+ timeoutOccured = NO;
addCronJob((CronJob)&semaUp,
- ntohl(icmsg->timeOut) * cronMILLIS,
+ ntohll(msg->timeOut) * cronMILLIS,
0,
- sem);
- cronTime(&startTime);
- endTime = startTime;
- for(j = 0; j < msgCnt; j++){
- if (cronTime(NULL) > startTime + ntohl(icmsg->timeOut)*cronMILLIS)
- break;
- opmsg->packetNum = htons(j);
- coreAPI->unicast(&receiverIdent, &opmsg->header, 5, 0);
- if (htons(icmsg->intPktSpace)!=0 && (j % htons(icmsg->trainSize)) == 0) {
- struct timespec del;
- struct timespec rem;
- del.tv_sec = htons(icmsg->intPktSpace) / cronSECONDS;
- del.tv_nsec = (htons(icmsg->intPktSpace) - (del.tv_sec * cronSECONDS)) * 1000 * 1000;
-#ifndef WINDOWS
- nanosleep(&del, &rem);
-#else
- SleepEx(del.tv_sec * 1000 + del.tv_nsec / 1000000, TRUE);
+ postsem);
+ for (packetNum=0;packetNum<msgCnt;packetNum++){
+ cronTime(&now);
+ if (now > endTime)
+ break; /* timeout */
+
+ p2p->packetNum = htonl(packetNum);
+#if DEBUG_TBENCH
+ LOG(LOG_DEBUG,
+ "Sending message %u in iteration %u\n",
+ packetNum, iteration);
#endif
- }
+ coreAPI->unicast(&msg->receiverId,
+ &p2p->header,
+ ntohl(msg->priority),
+ 0); /* no delay */
+ pollResults(&results[iteration], NO);
+ if ( (delay != 0) &&
+ (htonl(msg->trainSize) != 0) &&
+ (packetNum % htonl(msg->trainSize)) == 0) {
+ delayStart = now;
+ while ( (cronTime(&now) < (delayStart+delay)) &&
+ (timeoutOccured == NO) ) {
+ if (delayStart + delay - now > 5 * cronMILLIS) {
+ pollResults(&results[iteration], NO);
+ gnunet_util_sleep(5 * cronMILLIS);
+ } else
+ gnunet_util_sleep(delayStart + delay - now);
+ }
+ }
+ if ( (0 == pollResults(&results[iteration], NO)) &&
+ (earlyEnd == 0) )
+ earlyEnd = cronTime(NULL);
}
- SEMAPHORE_DOWN(sem);
+ while ( (timeoutOccured == NO) &&
+ (cronTime(&now) < endTime) ) {
+ if ( (0 == pollResults(&results[iteration], YES) ) &&
+ (earlyEnd == 0) )
+ earlyEnd = now;
+ gnunet_util_sleep(5 * cronMILLIS);
+ }
+
+ /* make sure to unblock waiting jobs */
+ timeoutOccured = YES;
+ SEMAPHORE_UP(presem);
+
+ MUTEX_LOCK(&lock);
+ if (earlyEnd == 0)
+ earlyEnd = now;
+ results[iteration].totalTime
+ = earlyEnd - startTime;
+ FREE(results[iteration].packetsReceived);
suspendCron();
delCronJob((CronJob)&semaUp,
0,
- sem);
+ postsem);
resumeCron();
- results[currIteration].time = endTime-startTime;
- results[currIteration].packets = receiveCnt;
+ SEMAPHORE_FREE(presem);
+ SEMAPHORE_FREE(postsem);
+ presem = NULL;
+ postsem = NULL;
+
}
- SEMAPHORE_FREE(sem);
MUTEX_UNLOCK(&lock);
- /* Lets see what the raw results are */
- for(i = 0; i < msgIter; i++){
- LOG(LOG_EVERYTHING,
- "iter[%d], packets %d/%d, time %dms\n",
- i,
- results[i].packets,
- msgCnt,
- results[i].time);
- }
-
- sum_loss = msgCnt - results[0].packets;
- ocmsg->max_loss = htons(msgCnt - results[0].packets);
- ocmsg->min_loss = htons(msgCnt - results[0].packets);
- sum_time = results[0].time;
- ocmsg->max_time = htons(results[0].time);
- ocmsg->min_time = htons(results[0].time);
- for(i = 1; i < msgIter; i++) {
- LOG(LOG_EVERYTHING,
- "iteration=%d\n",
- i);
- sum_loss += msgCnt - results[i].packets;
- if(msgCnt-results[i].packets > htons(ocmsg->max_loss))
- ocmsg->max_loss = htons(msgCnt - results[i].packets);
-
- if(msgCnt-results[i].packets < htons(ocmsg->min_loss))
- ocmsg->min_loss = htons(msgCnt - results[i].packets);
-
- sum_time += results[i].time;
- if(results[i].time > htons(ocmsg->max_time))
- ocmsg->max_time = htons(results[i].time);
-
- if(results[i].time < htons(ocmsg->min_time))
- ocmsg->min_time = htons(results[i].time);
+ sum_loss = 0;
+ sum_time = 0;
+ max_loss = 0;
+ min_loss = msgCnt;
+ min_time = 1 * cronYEARS;
+ max_time = 0;
+ /* data post-processing */
+ for (iteration=0;iteration<iterations;iteration++) {
+ sum_loss += results[iteration].lossCount;
+ sum_time += results[iteration].totalTime;
+
+ if (results[iteration].lossCount > max_loss)
+ max_loss = results[iteration].lossCount;
+ if (results[iteration].lossCount < min_loss)
+ min_loss = results[iteration].lossCount;
+ if (results[iteration].totalTime > max_time)
+ max_time = results[iteration].totalTime;
+ if (results[iteration].totalTime < min_time)
+ min_time = results[iteration].totalTime;
}
- ocmsg->mean_loss = ((float)sum_loss/(float)msgIter);
- ocmsg->mean_time = ((float)sum_time/(float)msgIter);
-
+
sum_variance_time = 0.0;
sum_variance_loss = 0.0;
- for(i = 0; i < msgIter; i++){
- LOG(LOG_DEBUG,
- "TBENCH: iteration=%d msgIter=%d\n",
- i,
- msgIter);
- sum_variance_time += (results[i].time - ocmsg->mean_time)*
- (results[i].time - ocmsg->mean_time);
-
- sum_variance_loss += ((msgCnt - results[i].packets) - ocmsg->mean_loss)*
- ((msgCnt - results[i].packets) - ocmsg->mean_loss);
+ for(iteration = 0; iteration <iterations; iteration++){
+ sum_variance_time +=
+ (results[iteration].totalTime - sum_time/iterations) *
+ (results[iteration].totalTime - sum_time/iterations);
+ sum_variance_loss +=
+ (results[iteration].lossCount - sum_loss/iterations) *
+ (results[iteration].lossCount - sum_loss/iterations);
}
- ocmsg->variance_time = sum_variance_time/(msgIter-1);
- ocmsg->variance_loss = sum_variance_loss/(msgIter-1);
-
- ocmsg->header.size = htons(sizeof(TBENCH_CS_MESSAGE));
- ocmsg->header.type = htons(TBENCH_CS_PROTO_REPLY);
-
- LOG(LOG_DEBUG,
- "calling writeToSocket\n");
- if (SYSERR == coreAPI->sendToClient(client,
- &ocmsg->header))
- return SYSERR;
- FREE(opmsg);
- FREE(ocmsg);
+
+ /* send collected stats back to client */
+ reply.header.size = htons(sizeof(TBENCH_CS_REPLY));
+ reply.header.type = htons(TBENCH_CS_PROTO_REPLY);
+ reply.max_loss = htonl(max_loss);
+ reply.min_loss = htonl(min_loss);
+ reply.mean_loss = ((float)sum_loss/(float)iterations);
+ reply.mean_time = ((float)sum_time/(float)iterations);
+ reply.max_time = htonll(max_time);
+ reply.min_time = htonll(min_time);
+ reply.variance_time = sum_variance_time/(iterations-1);
+ reply.variance_loss = sum_variance_loss/(iterations-1);
FREE(results);
- LOG(LOG_DEBUG,
- "finishing benchmark\n");
- return OK;
+ return coreAPI->sendToClient(client,
+ &reply.header);
}
/**
@@ -258,7 +432,6 @@
int ok = OK;
MUTEX_CREATE(&lock);
- MUTEX_CREATE(&lockCnt);
coreAPI = capi;
if (SYSERR == capi->registerHandler(TBENCH_p2p_PROTO_REPLY,
&handleTBenchReply))
@@ -267,7 +440,7 @@
&handleTBenchReq))
ok = SYSERR;
if (SYSERR == capi->registerClientHandler(TBENCH_CS_PROTO_REQUEST,
- (CSHandler)&csHandleTBenchRequest))
+ &csHandleTBenchRequest))
ok = SYSERR;
return ok;
}
@@ -278,9 +451,8 @@
coreAPI->unregisterHandler(TBENCH_p2p_PROTO_REPLY,
&handleTBenchReply);
coreAPI->unregisterClientHandler(TBENCH_CS_PROTO_REQUEST,
- (CSHandler)&csHandleTBenchRequest);
+ &csHandleTBenchRequest);
MUTEX_DESTROY(&lock);
- MUTEX_DESTROY(&lockCnt);
coreAPI = NULL;
}
Modified: GNUnet/src/applications/tbench/tbench.h
===================================================================
--- GNUnet/src/applications/tbench/tbench.h 2005-03-08 04:07:02 UTC (rev 408)
+++ GNUnet/src/applications/tbench/tbench.h 2005-03-08 11:57:22 UTC (rev 409)
@@ -19,47 +19,70 @@
*/
/**
+ * @file applications/tbench/tbench.h
* @author Christian Grothoff
- * @file applications/tbench/tbench.h
- **/
+ */
#ifndef TBENCH_TBENCH_H
#define TBENCH_TBENCH_H
#include "gnunet_core.h"
-#define TBENCH_MSG_LENGTH 1024
-
+/**
+ * Client requests peer to perform some profiling.
+ */
typedef struct {
- p2p_HEADER header;
- unsigned int iterationNum;
- unsigned int packetNum;
-} TBENCH_p2p_MESSAGE;
-
-typedef struct {
- TBENCH_p2p_MESSAGE p2p_message;
- char message[1];
-} TBENCH_p2p_MESSAGE_GENERIC;
-
-typedef struct {
CS_HEADER header;
+ /**
+ * How big is each message (plus headers).
+ * Note that GNUnet is limited to 64k messages.
+ */
unsigned int msgSize;
+ /**
+ * How many messages should be transmitted in
+ * each iteration?
+ */
unsigned int msgCnt;
+ /**
+ * How many iterations should be performed?
+ */
unsigned int iterations;
+ /**
+ * Which peer should receive the messages?
+ */
PeerIdentity receiverId;
- unsigned int intPktSpace; /* Inter packet space in milliseconds */
+ /**
+ * Inter packet space in milliseconds (delay
+ * introduced when sending messages).
+ */
+ cron_t intPktSpace;
+ /**
+ * Time to wait for the arrival of all repies
+ * in one iteration.
+ */
+ cron_t timeOut;
+ /**
+ * intPktSpace delay is only introduced every
+ * trainSize messages.
+ */
unsigned int trainSize;
- unsigned int timeOut; /* Time to wait for the arrival of a reply in secs */
+ /**
+ * Which priority should be used?
+ */
+ unsigned int priority;
} TBENCH_CS_MESSAGE;
+/**
+ * Response from server with statistics.
+ */
typedef struct {
CS_HEADER header;
- int max_loss;
- int min_loss;
+ unsigned int max_loss;
+ unsigned int min_loss;
float mean_loss;
float variance_loss;
- int max_time;
- int min_time;
+ cron_t max_time;
+ cron_t min_time;
float mean_time;
float variance_time;
} TBENCH_CS_REPLY;
Modified: GNUnet/src/applications/tbench/tbenchtest.c
===================================================================
--- GNUnet/src/applications/tbench/tbenchtest.c 2005-03-08 04:07:02 UTC (rev 408)
+++ GNUnet/src/applications/tbench/tbenchtest.c 2005-03-08 11:57:22 UTC (rev 409)
@@ -30,6 +30,11 @@
#include "tbench.h"
#include <sys/wait.h>
+/**
+ * Set this to NO when debugging gnunetd processes separately.
+ */
+#define DO_FORK NO
+
static int parseOptions(int argc,
char ** argv) {
FREENONNULL(setConfigurationString("GNUNETD",
@@ -44,28 +49,30 @@
static PeerIdentity peer2;
static int test(GNUNET_TCP_SOCKET * sock,
- unsigned short messageSize,
- unsigned short messageCnt,
- unsigned short messageIterations,
- unsigned short messageSpacing,
- unsigned short messageTrainSize,
- unsigned int messageTimeOut /* in milli-seconds */) {
+ unsigned int messageSize,
+ unsigned int messageCnt,
+ unsigned int messageIterations,
+ cron_t messageSpacing,
+ unsigned int messageTrainSize,
+ cron_t messageTimeOut /* in milli-seconds */) {
int ret;
TBENCH_CS_MESSAGE msg;
TBENCH_CS_REPLY * buffer;
float messagesPercentLoss;
- memset(&msg,
- 0,
- sizeof(TBENCH_CS_MESSAGE));
+ printf(_("Using %u messages of size %u for %u times.\n"),
+ messageCnt,
+ messageSize,
+ messageIterations);
msg.header.size = htons(sizeof(TBENCH_CS_MESSAGE));
msg.header.type = htons(TBENCH_CS_PROTO_REQUEST);
- msg.msgSize = htons(messageSize);
- msg.msgCnt = htons(messageCnt);
- msg.iterations = htons(messageIterations);
- msg.intPktSpace = htons(messageSpacing);
- msg.trainSize = htons(messageTrainSize);
- msg.timeOut = htonl(messageTimeOut);
+ msg.msgSize = htonl(messageSize);
+ msg.msgCnt = htonl(messageCnt);
+ msg.iterations = htonl(messageIterations);
+ msg.intPktSpace = htonll(messageSpacing);
+ msg.trainSize = htonl(messageTrainSize);
+ msg.timeOut = htonll(messageTimeOut);
+ msg.priority = htonl(5);
msg.receiverId = peer2;
if (SYSERR == writeToSocket(sock,
@@ -80,14 +87,14 @@
} else {
messagesPercentLoss = (buffer->mean_loss/((float)htons(msg.msgCnt)));
}
- printf(_("Times: max %8d min %8d mean %8.4f variance %8.4f\n"),
- htons(buffer->max_time),
- htons(buffer->min_time),
+ printf(_("Times: max %16llu min %16llu mean %12.3f variance %12.3f\n"),
+ ntohll(buffer->max_time),
+ ntohll(buffer->min_time),
buffer->mean_time,
buffer->variance_time);
- printf(_("Loss: max %8d min %8d mean %8.4f variance %8.4f\n"),
- htons(buffer->max_loss),
- htons(buffer->min_loss),
+ printf(_("Loss: max %16u min %16u mean %12.3f variance %12.3f\n"),
+ ntohl(buffer->max_loss),
+ ntohl(buffer->min_loss),
buffer->mean_loss,
buffer->variance_loss);
} else {
@@ -109,6 +116,27 @@
return OK;
}
+static int checkConnected(GNUNET_TCP_SOCKET * sock) {
+ int left;
+ int ret;
+
+ ret = 0;
+ left = 30; /* how many iterations should we wait? */
+ while (OK == requestStatistics(sock,
+ &waitForConnect,
+ NULL)) {
+ printf(_("Waiting for peers to connect (%u iterations left)...\n"),
+ left);
+ sleep(5);
+ left--;
+ if (left == 0) {
+ ret = 1;
+ break;
+ }
+ }
+ return ret;
+}
+
/**
* Testcase to test p2p communications.
*
@@ -117,19 +145,22 @@
* @return 0: ok, -1: error
*/
int main(int argc, char ** argv) {
+#if DO_FORK
pid_t daemon1;
pid_t daemon2;
+ int status;
+#endif
int ret;
int left;
- int status;
GNUNET_TCP_SOCKET * sock;
+ int i;
GNUNET_ASSERT(OK ==
enc2hash("BV3AS3KMIIBVIFCGEG907N6NTDTH26B7T6FODUSLSGK"
"5B2Q58IEU1VF5FTR838449CSHVBOAHLDVQAOA33O77F"
"OPDA8F1VIKESLSNBO",
&peer2.hashPubKey));
-
+#if DO_FORK
daemon1 = fork();
if (daemon1 == 0) {
if (0 != execlp("gnunetd", /* what binary to execute, must be in $PATH! */
@@ -138,7 +169,7 @@
"-c",
"peer1.conf", /* configuration file */
NULL)) {
- fprintf(stderr,
+ fprintf(stderr,
_("'%s' failed: %s\n"),
"execlp",
STRERROR(errno));
@@ -209,6 +240,7 @@
}
}
sleep(5);
+#endif
ret = 0;
left = 5;
@@ -228,18 +260,23 @@
}
} while (sock == NULL);
+ ret = checkConnected(sock);
+ printf(_("Running benchmark...\n"));
+ /* 'slow' pass: wait for bandwidth negotiation! */
if (ret == 0)
- ret = test(sock, 4, 1, 1, 1, 1, 5000);
- if (ret == 0)
- ret = test(sock, 50, 64, 40, 50, 10, 10000);
- if (ret == 0)
- ret = test(sock, 1024, 64, 4, 0, 1, 10000);
- if (ret == 0)
- ret = test(sock, 32*1024, 8, 4, 0, 1, 30000);
-
+ ret = test(sock, 64, 100, 4, 50 * cronMILLIS, 1, 30 * cronSECONDS);
+ checkConnected(sock);
+ /* 'blast' pass: hit bandwidth limits! */
+ for (i=8;i<60000;i*=2) {
+ if (ret == 0)
+ ret = test(sock, i, 1+1024/i, 4, 10 * cronMILLIS, 2, 2 * cronSECONDS);
+ checkConnected(sock);
+ }
+ ret = test(sock, i, 10, 10, 500 * cronMILLIS, 1, 10 * cronSECONDS);
releaseClientSocket(sock);
doneUtil();
+#if DO_FORK
if (daemon1 != -1) {
if (0 != kill(daemon1, SIGTERM))
DIE_STRERROR("kill");
@@ -252,6 +289,7 @@
if (daemon2 != waitpid(daemon2, &status, 0))
DIE_STRERROR("waitpid");
}
+#endif
return ret;
}
Modified: GNUnet/src/applications/topology_default/topology.c
===================================================================
--- GNUnet/src/applications/topology_default/topology.c 2005-03-08 04:07:02 UTC (rev 408)
+++ GNUnet/src/applications/topology_default/topology.c 2005-03-08 11:57:22 UTC (rev 409)
@@ -264,6 +264,7 @@
provide_module_topology_default(CoreAPIForApplication * capi) {
static Topology_ServiceAPI api;
char * data;
+ unsigned int len;
coreAPI = capi;
identity = capi->requestService("identity");
@@ -293,14 +294,15 @@
5 * cronSECONDS,
NULL);
- if (-1 == stateReadContent(TOPOLOGY_TAG_FILE,
- (void**) &data)) {
+ if (-1 == (len = stateReadContent(TOPOLOGY_TAG_FILE,
+ (void**) &data))) {
stateWriteContent(TOPOLOGY_TAG_FILE,
strlen(PACKAGE_VERSION),
PACKAGE_VERSION);
} else {
- if (0 != strcmp(PACKAGE_VERSION,
- data)) {
+ if (0 != strncmp(PACKAGE_VERSION,
+ data,
+ len)) {
LOG(LOG_FAILURE,
_("Version mismatch ('%s' vs. '%s'), run gnunet-update!\n"),
PACKAGE_VERSION,
Modified: GNUnet/src/server/connection.c
===================================================================
--- GNUnet/src/server/connection.c 2005-03-08 04:07:02 UTC (rev 408)
+++ GNUnet/src/server/connection.c 2005-03-08 11:57:22 UTC (rev 409)
@@ -838,16 +838,9 @@
}
if (be->status != STAT_UP)
return; /* status is not up, cannot send! */
- if (be->sendBufferSize == 0) {
-#if DEBUG_CONNECTION
- LOG(LOG_DEBUG,
- "Message queue empty. Nothing transmitted.\n");
-#endif
- return; /* nothing to send */
- }
+ if (be->sendBufferSize == 0)
+ return; /* nothing to send */
-
-
/* recompute max send frequency */
if (be->max_bpm <= 0)
be->max_bpm = 1;
@@ -870,7 +863,7 @@
if ( (be->lastSendAttempt + be->MAX_SEND_FREQUENCY > cronTime(NULL)) &&
(be->sendBufferSize < MAX_SEND_BUFFER_SIZE/4) ) {
-#if DEBUG_CONNECTION
+#if DEBUG_CONNECTION
LOG(LOG_DEBUG,
"Send frequency too high (CPU load), send deferred.\n");
#endif
@@ -879,7 +872,7 @@
/* test if receiver has enough bandwidth available! */
updateCurBPS(be);
-#if DEBUG_CONNECTION
+#if DEBUG_CONNECTION
LOG(LOG_DEBUG,
"receiver window available: %lld bytes (MTU: %u)\n",
be->available_send_window,
@@ -897,7 +890,7 @@
i = 0;
/* assumes entries are sorted by priority! */
while (i < be->sendBufferSize) {
- if ( (totalMessageSize + entries[i]->len < 60000) &&
+ if ( (totalMessageSize + entries[i]->len < MAX_BUFFER_SIZE) &&
(entries[i]->pri >= EXTREME_PRIORITY) ) {
knapsackSolution[i] = YES;
priority += entries[i]->pri;
@@ -906,17 +899,18 @@
break;
}
i++;
- }
+ }
while ( (i < be->sendBufferSize) &&
(be->available_send_window > totalMessageSize) ) {
- if (entries[i]->len + totalMessageSize <=
- be->available_send_window) {
+ if ( (entries[i]->len + totalMessageSize <=
+ be->available_send_window) &&
+ (totalMessageSize + entries[i]->len < MAX_BUFFER_SIZE) ) {
knapsackSolution[i] = YES;
totalMessageSize += entries[i]->len;
priority += entries[i]->pri;
} else {
knapsackSolution[i] = NO;
- if (totalMessageSize == 0) {
+ if (totalMessageSize == sizeof(P2P_Message)) {
/* if the highest-priority message does not yet
fit, wait for send window to grow so that
we can get it out (otherwise we would starve
@@ -927,6 +921,15 @@
}
i++;
}
+ if ( (totalMessageSize == sizeof(P2P_Message)) ||
+ ( (priority < EXTREME_PRIORITY) &&
+ ((totalMessageSize / sizeof(P2P_Message)) < 4) &&
+ (randomi(16) != 0) ) ) {
+ /* randomization necessary to ensure we eventually send
+ the message if there is nothing else to do! */
+ FREE(knapsackSolution);
+ return;
+ }
} else { /* if (be->session.mtu == 0) */
/* solve knapsack problem, compute accumulated priority */
knapsackSolution = MALLOC(sizeof(int) * be->sendBufferSize);
@@ -989,7 +992,7 @@
which has EXTREME_PRIORITY) */
if (priority < EXTREME_PRIORITY) {
FREE(knapsackSolution);
-#if DEBUG_CONNECTION
+#if DEBUG_CONNECTION
LOG(LOG_DEBUG,
"bandwidth limits prevent sending (send window %u too small).\n",
be->available_send_window);
@@ -1008,7 +1011,7 @@
int msgCap;
FREE(knapsackSolution);
cronTime(&be->lastSendAttempt);
-#if DEBUG_CONNECTION
+#if DEBUG_CONNECTION
LOG(LOG_DEBUG,
"policy prevents sending message (priority too low: %d)\n",
priority);
@@ -1027,7 +1030,7 @@
if (be->sendBufferSize <= msgCap)
break;
if ( entry->transmissionTime < expired) {
-#if DEBUG_CONNECTION
+#if DEBUG_CONNECTION
LOG(LOG_DEBUG,
"expiring message, expired %ds ago, queue size is %u (bandwidth stressed)\n",
(int) ((cronTime(NULL) - entry->transmissionTime) / cronSECONDS),
@@ -1046,7 +1049,7 @@
}
/* build message (start with sequence number) */
- GNUNET_ASSERT(totalMessageSize >= sizeof(P2P_Message));
+ GNUNET_ASSERT(totalMessageSize > sizeof(P2P_Message));
plaintextMsg = MALLOC(totalMessageSize);
p2pHdr = (P2P_Message*) plaintextMsg;
p2pHdr->timeStamp
@@ -1186,23 +1189,11 @@
hash(&p2pHdr->sequenceNumber,
p - sizeof(HashCode512),
(HashCode512*) encryptedMsg);
-#if DEBUG_CONNECTION
- LOG(LOG_DEBUG,
- "Encrypting with key %u and IV %u\n",
- *(int*) &be->skey_local,
- *(int*) &encryptedMsg /* IV */);
-#endif
encryptBlock(&p2pHdr->sequenceNumber,
p - sizeof(HashCode512),
&be->skey_local,
(const INITVECTOR*) encryptedMsg, /* IV */
&((P2P_Message*)encryptedMsg)->sequenceNumber);
-#if DEBUG_CONNECTION
- LOG(LOG_DEBUG,
- "calling transport layer to send %d bytes with crc %x\n",
- p,
- crc);
-#endif
if (! ( (SYSERR == transport->send(be->session.tsession,
encryptedMsg,
p)) &&
@@ -1264,7 +1255,7 @@
(se->len > be->session.mtu - sizeof(P2P_Message)) ) {
/* this message is so big that it must be fragmented! */
fragmentation->fragment(&be->session.sender,
- be->session.mtu,
+ be->session.mtu - sizeof(P2P_Message),
se->pri,
se->transmissionTime,
se->len,
@@ -1368,14 +1359,14 @@
int establishSession) {
BufferEntry * root;
BufferEntry * prev;
-#if DEBUG_CONNECTION
+#if DEBUG_CONNECTION
EncName enc;
- IFLOG(LOG_INFO,
+ IFLOG(LOG_EVERYTHING,
hash2enc(&hostId->hashPubKey,
&enc));
- LOG(LOG_INFO,
- "Adding host %s to the connection table.\n",
+ LOG(LOG_EVERYTHING,
+ "Adding host '%s' to the connection table.\n",
&enc);
#endif
@@ -1656,8 +1647,7 @@
and then merge the values; but for now, let's just go
hardcore and adjust all values rapidly */
for (u=0;u<activePeerCount;u++) {
- entries[u]->idealized_limit = 0;
- adjustedRR[u] = entries[u]->recently_received * cronMINUTES / timeDifference;
+ adjustedRR[u] = entries[u]->recently_received * cronMINUTES / timeDifference / 2;
#if DEBUG_CONNECTION
if (adjustedRR[u] > entries[u]->idealized_limit) {
@@ -1666,7 +1656,7 @@
hash2enc(&entries[u]->session.sender.hashPubKey,
&enc));
LOG(LOG_INFO,
- "peer %s transmitted above limit: %llu bpm > %u bpm\n",
+ "peer '%s' transmitted above limit: %llu bpm > %u bpm\n",
&enc,
adjustedRR[u],
entries[u]->idealized_limit);
@@ -1678,15 +1668,14 @@
*/
if (adjustedRR[u] > 2 * MAX_BUF_FACT *
entries[u]->max_transmitted_limit) {
-#if DEBUG_CONNECTION || 1
+#if DEBUG_CONNECTION
EncName enc;
IFLOG(LOG_INFO,
hash2enc(&entries[u]->session.sender.hashPubKey,
&enc));
LOG(LOG_INFO,
- "blacklisting %s, it sent >%dx+MTU above mLimit: %llu bpm > %u bpm (cLimit %u bpm)\n",
+ "blacklisting '%s': sent %llu bpm (limit %u bpm, target %u bpm)\n",
&enc,
- 2 * MAX_BUF_FACT,
adjustedRR[u],
entries[u]->max_transmitted_limit,
entries[u]->idealized_limit);
@@ -1696,9 +1685,9 @@
1, /* FIXME: 1? */
YES);
activePeerCount--;
- entries[u]=entries[activePeerCount];
- shares[u]=shares[activePeerCount];
- adjustedRR[u]=adjustedRR[activePeerCount];
+ entries[u] = entries[activePeerCount];
+ shares[u] = shares[activePeerCount];
+ adjustedRR[u] = adjustedRR[activePeerCount];
u--;
}
@@ -1707,11 +1696,6 @@
at least MIN_BPM_PER_PEER */
}
-#if DEBUG_CONNECTION
- LOG(LOG_DEBUG,
- "freely schedulable bandwidth is %d bpm\n",
- schedulableBandwidth);
-#endif
/* now distribute the schedulableBandwidth according
to the shares. Note that since we cap peers at twice
of what they transmitted last, we may not be done with
@@ -1800,7 +1784,7 @@
entries[u]->idealized_limit);
#endif
entries[u]->current_connection_value /= 2.0;
- entries[u]->recently_received = 0;
+ entries[u]->recently_received /= 2;
}
/* free memory */
@@ -1952,12 +1936,6 @@
return SYSERR; /* could not decrypt */
}
tmp = MALLOC(size - sizeof(HashCode512));
-#if DEBUG_CONNECTION
- LOG(LOG_DEBUG,
- "Decrypting with key %u and IV %u\n",
- *(int*) &be->skey_remote,
- *(int*) &msg->hash);
-#endif
res = decryptBlock(&be->skey_remote,
&msg->sequenceNumber,
size - sizeof(HashCode512),
@@ -2092,10 +2070,6 @@
BufferEntry * be;
MUTEX_LOCK(&lock);
- LOG(LOG_DEBUG,
- "Assigning session key %u %s\n",
- *(int*)key,
- forSending == YES ? "for sending" : "for receiving");
be = lookForHost(peer);
if (be == NULL)
be = addHost(peer, NO);
Modified: GNUnet/src/server/core.c
===================================================================
--- GNUnet/src/server/core.c 2005-03-08 04:07:02 UTC (rev 408)
+++ GNUnet/src/server/core.c 2005-03-08 11:57:22 UTC (rev 409)
@@ -31,7 +31,7 @@
#include "tcpserver.h"
#include "core.h"
-#define DEBUG_CORE YES
+#define DEBUG_CORE NO
/**
* Linked list of loaded protocols (for clean shutdown).
Modified: GNUnet/src/transports/tcp.c
===================================================================
--- GNUnet/src/transports/tcp.c 2005-03-08 04:07:02 UTC (rev 408)
+++ GNUnet/src/transports/tcp.c 2005-03-08 11:57:22 UTC (rev 409)
@@ -63,7 +63,10 @@
typedef struct {
/**
* size of the message, in bytes, including this header;
- * max 65536-header (network byte order)
+ * max 65535; we do NOT want to make this field an int
+ * because then a malicious peer could cause us to allocate
+ * lots of memory -- this bounds it by 64k/peer.
+ * Field is in network byte order.
*/
unsigned short size;
@@ -417,102 +420,98 @@
}
incrementBytesReceived(ret);
tcpSession->pos += ret;
- len = ntohs(((TCPMessagePack*)&tcpSession->rbuff[0])->size);
- if (len > tcpSession->rsize) /* if message larger than read buffer, grow! */
- GROW(tcpSession->rbuff,
- tcpSession->rsize,
- len);
+
+ while (tcpSession->pos > 2) {
+ len = ntohs(((TCPMessagePack*)&tcpSession->rbuff[0])->size) + sizeof(TCPMessagePack);
+ if (len > tcpSession->rsize) /* if message larger than read buffer, grow! */
+ GROW(tcpSession->rbuff,
+ tcpSession->rsize,
+ len);
#if DEBUG_TCP
- LOG(LOG_DEBUG,
- "Read %d bytes on socket %d, expecting %d for full message\n",
- tcpSession->pos,
- tcpSession->sock,
- len);
+ LOG(LOG_DEBUG,
+ "Read %d bytes on socket %d, expecting %d for full message\n",
+ tcpSession->pos,
+ tcpSession->sock,
+ len);
#endif
- if ( (tcpSession->pos < 2) ||
- (tcpSession->pos < len) ) {
- tcpDisconnect(tsession);
- return OK;
- }
-
- /* complete message received, let's check what it is */
- if (YES == tcpSession->expectingWelcome) {
- TCPWelcome * welcome;
-#if DEBUG_TCP
- EncName enc;
+ if (tcpSession->pos < len) {
+ tcpDisconnect(tsession);
+ return OK;
+ }
+
+ /* complete message received, let's check what it is */
+ if (YES == tcpSession->expectingWelcome) {
+ TCPWelcome * welcome;
+#if DEBUG_TCP
+ EncName enc;
#endif
+
+ welcome = (TCPWelcome*) &tcpSession->rbuff[0];
+ if ( (ntohs(welcome->version) != 0) ||
+ (ntohs(welcome->size) != sizeof(TCPWelcome)) ) {
+ LOG(LOG_WARNING,
+ _("Expected welcome message on tcp connection, got garbage. Closing.\n"));
+ tcpDisconnect(tsession);
+ return SYSERR;
+ }
+ tcpSession->expectingWelcome = NO;
+ tcpSession->sender = welcome->clientIdentity;
+#if DEBUG_TCP
+ IFLOG(LOG_DEBUG,
+ hash2enc(&tcpSession->sender.hashPubKey,
+ &enc));
+ LOG(LOG_DEBUG,
+ "tcp welcome message from %s received\n",
+ &enc);
+#endif
+ memmove(&tcpSession->rbuff[0],
+ &tcpSession->rbuff[sizeof(TCPWelcome)],
+ tcpSession->pos - sizeof(TCPWelcome));
+ tcpSession->pos -= sizeof(TCPWelcome);
+ len = ntohs(((TCPMessagePack*)&tcpSession->rbuff[0])->size) + sizeof(TCPMessagePack);
+ }
+ if ( (tcpSession->pos < 2) ||
+ (tcpSession->pos < len) ) {
+ tcpDisconnect(tsession);
+ return OK;
+ }
- welcome = (TCPWelcome*) &tcpSession->rbuff[0];
- if ( (ntohs(welcome->version) != 0) ||
- (ntohs(welcome->size) != sizeof(TCPWelcome)) ) {
+ pack = (TCPMessagePack*)&tcpSession->rbuff[0];
+ /* send msg to core! */
+ if (len <= sizeof(TCPMessagePack)) {
LOG(LOG_WARNING,
- _("Expected welcome message on tcp connection, got garbage. Closing.\n"));
+ _("Received malformed message (size %u) from tcp-peer connection. Closing.\n"),
+ len);
tcpDisconnect(tsession);
return SYSERR;
}
- tcpSession->expectingWelcome = NO;
- tcpSession->sender = welcome->clientIdentity;
+ mp = MALLOC(sizeof(MessagePack));
+ mp->msg = MALLOC(len - sizeof(TCPMessagePack));
+ memcpy(mp->msg,
+ &pack[1],
+ len - sizeof(TCPMessagePack));
+ mp->sender = tcpSession->sender;
+ mp->size = len - sizeof(TCPMessagePack);
+ mp->tsession = tsession;
#if DEBUG_TCP
- IFLOG(LOG_DEBUG,
- hash2enc(&tcpSession->sender.hashPubKey,
- &enc));
LOG(LOG_DEBUG,
- "tcp welcome message from %s received\n",
- &enc);
+ "tcp transport received %u bytes, forwarding to core\n",
+ mp->rsize);
#endif
+ coreAPI->receive(mp);
+ /* finally, shrink buffer adequately */
memmove(&tcpSession->rbuff[0],
- &tcpSession->rbuff[sizeof(TCPWelcome)],
- tcpSession->pos - sizeof(TCPWelcome));
- tcpSession->pos -= sizeof(TCPWelcome);
- len = ntohs(((TCPMessagePack*)&tcpSession->rbuff[0])->size);
- }
- if ( (tcpSession->pos < 2) ||
- (tcpSession->pos < len) ) {
- tcpDisconnect(tsession);
- return OK;
+ &tcpSession->rbuff[len],
+ tcpSession->pos - len);
+ tcpSession->pos -= len;
+ if ( (tcpSession->pos * 4 < tcpSession->rsize) &&
+ (tcpSession->rsize > 4 * 1024) ) {
+ /* read buffer far too large, shrink! */
+ GROW(tcpSession->rbuff,
+ tcpSession->rsize,
+ tcpSession->pos + 1024);
+ }
}
-
- pack = (TCPMessagePack*)&tcpSession->rbuff[0];
- /* send msg to core! */
- if (len <= sizeof(TCPMessagePack)) {
- LOG(LOG_WARNING,
- _("Received malformed message from tcp-peer connection. Closing.\n"));
- tcpDisconnect(tsession);
- return SYSERR;
- }
- mp = MALLOC(sizeof(MessagePack));
- mp->msg = MALLOC(len);
- memcpy(mp->msg,
- &pack[1],
- len - sizeof(TCPMessagePack));
- mp->sender = tcpSession->sender;
- mp->size = len - sizeof(TCPMessagePack);
- mp->tsession = tsession;
-#if DEBUG_TCP
- LOG(LOG_DEBUG,
- "tcp transport received %u bytes, forwarding to core\n",
- mp->rsize);
-#endif
- coreAPI->receive(mp);
-
- if (tcpSession->pos < len) {
- BREAK();
- tcpDisconnect(tsession);
- return SYSERR;
- }
- /* finally, shrink buffer adequately */
- memmove(&tcpSession->rbuff[0],
- &tcpSession->rbuff[len],
- tcpSession->pos - len);
- tcpSession->pos -= len;
- if ( (tcpSession->pos * 4 < tcpSession->rsize) &&
- (tcpSession->rsize > 4 * 1024) ) {
- /* read buffer far too large, shrink! */
- GROW(tcpSession->rbuff,
- tcpSession->rsize,
- tcpSession->pos + 1024);
- }
-
tcpDisconnect(tsession);
return OK;
}
@@ -789,8 +788,7 @@
static int tcpDirectSend(TCPSession * tcpSession,
void * mp,
unsigned int ssize) {
- int ok;
- int ret;
+ size_t ret;
int success;
if (tcp_shutdown == YES)
@@ -806,54 +804,42 @@
BREAK(); /* size 0 not allowed */
return SYSERR;
}
- ok = SYSERR;
MUTEX_LOCK(&tcplock);
if (tcpSession->wpos > 0) {
/* select already pending... */
- ret = 0;
- } else {
- success = SEND_NONBLOCKING(tcpSession->sock,
- mp,
- ssize,
- &ret);
- if (success == SYSERR) {
- LOG_STRERROR(LOG_INFO, "send");
- MUTEX_UNLOCK(&tcplock);
- return SYSERR;
- } else if (success == NO)
- ret = 0;
+ MUTEX_UNLOCK(&tcplock);
+ return SYSERR;
}
- if ((unsigned int)ret <= ssize) { /* some bytes send or blocked */
- if ((unsigned int)ret < ssize) {
- if (tcpSession->wbuff == NULL) {
- tcpSession->wsize = ssize + sizeof(TCPMessagePack);
- tcpSession->wbuff = MALLOC(tcpSession->wsize);
- tcpSession->wpos = 0;
- }
- if (ssize + tcpSession->wpos - ret >
- tcpSession->wsize) {
- ssize = 0;
- ok = SYSERR; /* buffer full, drop */
- } else {
- memcpy(&tcpSession->wbuff[tcpSession->wpos],
- mp,
- ssize - ret);
- tcpSession->wpos += ssize - ret;
- if (tcpSession->wpos == ssize - ret)
- signalSelect(); /* select set changed! */
- ok = OK; /* all buffered */
- }
- } else
- ok = OK; /* all written */
- } else {
- LOG_STRERROR(LOG_WARNING, "send");
- ssize = 0;
- ok = SYSERR; /* write failed for real */
+ success = SEND_NONBLOCKING(tcpSession->sock,
+ mp,
+ ssize,
+ &ret);
+ if (success == SYSERR) {
+#if DEBUG_TCP
+ LOG_STRERROR(LOG_INFO, "send");
+#endif
+ MUTEX_UNLOCK(&tcplock);
+ return SYSERR;
}
+ if (success == NO)
+ ret = 0;
+
+ if (ret < ssize) {/* partial send */
+ if (tcpSession->wsize < ssize - ret) {
+ GROW(tcpSession->wbuff,
+ tcpSession->wsize,
+ ssize - ret);
+ }
+ memcpy(tcpSession->wbuff,
+ mp,
+ ssize - ret);
+ tcpSession->wpos = ssize - ret;
+ signalSelect(); /* select set changed! */
+ }
MUTEX_UNLOCK(&tcplock);
cronTime(&tcpSession->lastUse);
incrementBytesSent(ssize);
- return ok;
+ return OK;
}
/**
@@ -918,8 +904,9 @@
const unsigned int size) {
TCPMessagePack * mp;
int ok;
- int ssize;
+ if (size >= MAX_BUFFER_SIZE)
+ return SYSERR;
if (tcp_shutdown == YES)
return SYSERR;
if (size == 0) {
@@ -932,12 +919,11 @@
memcpy(&mp[1],
msg,
size);
- ssize = size + sizeof(TCPMessagePack);
- mp->size = htons(ssize);
+ mp->size = htons(size);
mp->reserved = 0;
ok = tcpDirectSendReliable(tsession->internal,
mp,
- ssize);
+ size + sizeof(TCPMessagePack));
FREE(mp);
return ok;
}
@@ -1128,8 +1114,10 @@
const unsigned int size) {
TCPMessagePack * mp;
int ok;
- int ssize;
-
+
+ if (size >= MAX_BUFFER_SIZE)
+ return SYSERR;
+
if (tcp_shutdown == YES)
return SYSERR;
if (size == 0) {
@@ -1142,12 +1130,11 @@
memcpy(&mp[1],
msg,
size);
- ssize = size + sizeof(TCPMessagePack);
- mp->size = htons(ssize);
+ mp->size = htons(size);
mp->reserved = 0;
ok = tcpDirectSend(tsession->internal,
mp,
- ssize);
+ size + sizeof(TCPMessagePack));
FREE(mp);
return ok;
}
Modified: GNUnet/src/transports/tcp6.c
===================================================================
--- GNUnet/src/transports/tcp6.c 2005-03-08 04:07:02 UTC (rev 408)
+++ GNUnet/src/transports/tcp6.c 2005-03-08 11:57:22 UTC (rev 409)
@@ -410,101 +410,103 @@
}
incrementBytesReceived(ret);
tcp6Session->pos += ret;
- len = ntohs(((TCP6MessagePack*)&tcp6Session->rbuff[0])->size);
- if (len > tcp6Session->rsize) /* if MTU larger than expected, grow! */
- GROW(tcp6Session->rbuff,
- tcp6Session->rsize,
- len);
+
+ while (tcp6Session->pos > 2) {
+ len = ntohs(((TCP6MessagePack*)&tcp6Session->rbuff[0])->size) + sizeof(TCP6MessagePack);
+ if (len > tcp6Session->rsize) /* if MTU larger than expected, grow! */
+ GROW(tcp6Session->rbuff,
+ tcp6Session->rsize,
+ len);
#if DEBUG_TCP6
- LOG(LOG_DEBUG,
- "Read %d bytes on socket %d, expecting %d for full message\n",
- tcp6Session->pos,
- tcp6Session->sock,
- len);
+ LOG(LOG_DEBUG,
+ "Read %d bytes on socket %d, expecting %d for full message\n",
+ tcp6Session->pos,
+ tcp6Session->sock,
+ len);
#endif
- if ( (tcp6Session->pos < 2) ||
- (tcp6Session->pos < len) ) {
- tcp6Disconnect(tsession);
- return OK;
- }
-
- /* complete message received, let's check what it is */
- if (YES == tcp6Session->expectingWelcome) {
- TCP6Welcome * welcome;
+ if (tcp6Session->pos < len) {
+ tcp6Disconnect(tsession);
+ return OK;
+ }
+
+ /* complete message received, let's check what it is */
+ if (YES == tcp6Session->expectingWelcome) {
+ TCP6Welcome * welcome;
#if DEBUG_TCP6
- EncName hex;
+ EncName hex;
#endif
+
+ welcome = (TCP6Welcome*) &tcp6Session->rbuff[0];
+ if ( (ntohs(welcome->version) != 0) ||
+ (ntohs(welcome->size) != sizeof(TCP6Welcome)) ) {
+ LOG(LOG_WARNING,
+ _("Expected welcome message on tcp connection, got garbage. Closing.\n"));
+ tcp6Disconnect(tsession);
+ return SYSERR;
+ }
+ tcp6Session->expectingWelcome = NO;
+ tcp6Session->sender = welcome->clientIdentity;
+#if DEBUG_TCP6
+ IFLOG(LOG_DEBUG,
+ hash2enc(&tcp6Session->sender.hashPubKey,
+ &enc));
+ LOG(LOG_DEBUG,
+ "tcp6 welcome message from %s received\n",
+ &enc);
+#endif
+ memmove(&tcp6Session->rbuff[0],
+ &tcp6Session->rbuff[sizeof(TCP6Welcome)],
+ tcp6Session->pos - sizeof(TCP6Welcome));
+ tcp6Session->pos -= sizeof(TCP6Welcome);
+ len = ntohs(((TCP6MessagePack*)&tcp6Session->rbuff[0])->size) + sizeof(TCP6MessagePack);
+ }
+ if ( (tcp6Session->pos < 2) ||
+ (tcp6Session->pos < len) ) {
+ tcp6Disconnect(tsession);
+ return OK;
+ }
- welcome = (TCP6Welcome*) &tcp6Session->rbuff[0];
- if ( (ntohs(welcome->version) != 0) ||
- (ntohs(welcome->size) != sizeof(TCP6Welcome)) ) {
+ pack = (TCP6MessagePack*)&tcp6Session->rbuff[0];
+ /* send msg to core! */
+ if (len <= sizeof(TCP6MessagePack)) {
LOG(LOG_WARNING,
- _("Expected welcome message on tcp connection, got garbage. Closing.\n"));
+ _("Received malformed message from tcp6-peer connection. Closing connection.\n"));
tcp6Disconnect(tsession);
return SYSERR;
}
- tcp6Session->expectingWelcome = NO;
- tcp6Session->sender = welcome->clientIdentity;
+ mp = MALLOC(sizeof(MessagePack));
+ mp->msg = MALLOC(len - sizeof(TCP6MessagePack));
+ memcpy(mp->msg,
+ &pack[1],
+ len - sizeof(TCP6MessagePack));
+ mp->sender = tcp6Session->sender;
+ mp->size = len - sizeof(TCP6MessagePack);
+ mp->tsession = tsession;
#if DEBUG_TCP6
- IFLOG(LOG_DEBUG,
- hash2enc(&tcp6Session->sender.hashPubKey,
- &enc));
LOG(LOG_DEBUG,
- "tcp6 welcome message from %s received\n",
- &enc);
+ "tcp6 transport received %d bytes, forwarding to core\n",
+ mp->size);
#endif
+ coreAPI->receive(mp);
+
+ if (tcp6Session->pos < len) {
+ BREAK();
+ tcp6Disconnect(tsession);
+ return SYSERR;
+ }
+ /* finally, shrink buffer adequately */
memmove(&tcp6Session->rbuff[0],
- &tcp6Session->rbuff[sizeof(TCP6Welcome)],
- tcp6Session->pos - sizeof(TCP6Welcome));
- tcp6Session->pos -= sizeof(TCP6Welcome);
- len = ntohs(((TCP6MessagePack*)&tcp6Session->rbuff[0])->size);
- }
- if ( (tcp6Session->pos < 2) ||
- (tcp6Session->pos < len) ) {
- tcp6Disconnect(tsession);
- return OK;
+ &tcp6Session->rbuff[len],
+ tcp6Session->pos - len);
+ tcp6Session->pos -= len;
+ if ( (tcp6Session->pos * 4 < tcp6Session->rsize) &&
+ (tcp6Session->rsize > 4 * 1024) ) {
+ /* read buffer far too large, shrink! */
+ GROW(tcp6Session->rbuff,
+ tcp6Session->rsize,
+ tcp6Session->pos + 1024);
+ }
}
-
- pack = (TCP6MessagePack*)&tcp6Session->rbuff[0];
- /* send msg to core! */
- if (len <= sizeof(TCP6MessagePack)) {
- LOG(LOG_WARNING,
- _("Received malformed message from tcp6-peer connection. Closing connection.\n"));
- tcp6Disconnect(tsession);
- return SYSERR;
- }
- mp = MALLOC(sizeof(MessagePack));
- mp->msg = MALLOC(len);
- memcpy(mp->msg,
- &pack[1],
- len - sizeof(TCP6MessagePack));
- mp->sender = tcp6Session->sender;
- mp->size = len - sizeof(TCP6MessagePack);
- mp->tsession = tsession;
-#if DEBUG_TCP6
- LOG(LOG_DEBUG,
- "tcp6 transport received %d bytes, forwarding to core\n",
- mp->size);
-#endif
- coreAPI->receive(mp);
-
- if (tcp6Session->pos < len) {
- BREAK();
- tcp6Disconnect(tsession);
- return SYSERR;
- }
- /* finally, shrink buffer adequately */
- memmove(&tcp6Session->rbuff[0],
- &tcp6Session->rbuff[len],
- tcp6Session->pos - len);
- tcp6Session->pos -= len;
- if ( (tcp6Session->pos * 4 < tcp6Session->rsize) &&
- (tcp6Session->rsize > 4 * 1024) ) {
- /* read buffer far too large, shrink! */
- GROW(tcp6Session->rbuff,
- tcp6Session->rsize,
- tcp6Session->pos + 1024);
- }
tcp6Disconnect(tsession);
return OK;
}
@@ -772,8 +774,7 @@
static int tcp6DirectSend(TCP6Session * tcp6Session,
void * mp,
unsigned int ssize) {
- int ok;
- int ret;
+ size_t ret;
int success;
if (tcp6_shutdown == YES)
@@ -789,57 +790,39 @@
BREAK();
return SYSERR;
}
- if (ssize > tcp6API.mtu + sizeof(TCP6MessagePack)) {
- BREAK();
- return SYSERR;
- }
- ok = SYSERR;
MUTEX_LOCK(&tcp6lock);
if (tcp6Session->wpos > 0) {
- ret = 0;
- } else {
- success = SEND_NONBLOCKING(tcp6Session->sock,
- mp,
- ssize,
- &ret);
- if (success == SYSERR) {
- LOG_STRERROR(LOG_INFO, "send");
- MUTEX_UNLOCK(&tcp6lock);
- return SYSERR;
- } else if (success == NO)
- ret = 0;
+ MUTEX_UNLOCK(&tcp6lock);
+ return SYSERR;
}
- if ((unsigned int) ret <= ssize) { /* some bytes send or blocked */
- if ((unsigned int)ret < ssize) {
- if (tcp6Session->wbuff == NULL) {
- tcp6Session->wsize = ssize + sizeof(TCP6MessagePack);
- tcp6Session->wbuff = MALLOC(tcp6Session->wsize);
- tcp6Session->wpos = 0;
- }
- if (tcp6Session->wpos + ssize - ret >
- tcp6Session->wsize) {
- ssize = 0;
- ok = SYSERR; /* buffer full, drop */
- } else {
- memcpy(&tcp6Session->wbuff[tcp6Session->wpos],
- mp,
- ssize - ret);
- tcp6Session->wpos += ssize - ret;
- if (tcp6Session->wpos == ssize - ret)
- signalSelect(); /* select set changed! */
- ok = OK; /* all buffered */
- }
- } else
- ok = OK; /* all written */
- } else {
- LOG_STRERROR(LOG_WARNING, "send");
- ssize = 0;
- ok = SYSERR; /* write failed for real */
+ success = SEND_NONBLOCKING(tcp6Session->sock,
+ mp,
+ ssize,
+ &ret);
+ if (success == SYSERR) {
+ LOG_STRERROR(LOG_INFO, "send");
+ MUTEX_UNLOCK(&tcp6lock);
+ return SYSERR;
}
+ if (success == NO)
+ ret = 0;
+
+ if (ret < ssize) { /* partial send */
+ if (tcp6Session->wsize < ssize - ret) {
+ GROW(tcp6Session->wbuff,
+ tcp6Session->wsize,
+ ssize - ret);
+ }
+ memcpy(tcp6Session->wbuff,
+ mp,
+ ssize - ret);
+ tcp6Session->wpos = ssize - ret;
+ signalSelect(); /* select set changed! */
+ }
MUTEX_UNLOCK(&tcp6lock);
cronTime(&tcp6Session->lastUse);
incrementBytesSent(ssize);
- return ok;
+ return OK;
}
@@ -902,8 +885,9 @@
const unsigned int size) {
TCP6MessagePack * mp;
int ok;
- int ssize;
+ if (size >= MAX_BUFFER_SIZE)
+ return SYSERR;
if (tcp6_shutdown == YES)
return SYSERR;
if (size == 0) {
@@ -916,12 +900,11 @@
memcpy(&mp[1],
msg,
size);
- ssize = size + sizeof(TCP6MessagePack);
- mp->size = htons(ssize);
+ mp->size = htons(size);
mp->reserved = 0;
ok = tcp6DirectSendReliable(tsession->internal,
mp,
- ssize);
+ size + sizeof(TCP6MessagePack));
FREE(mp);
return ok;
}
@@ -1139,8 +1122,9 @@
const unsigned int size) {
TCP6MessagePack * mp;
int ok;
- int ssize;
+ if (size >= MAX_BUFFER_SIZE)
+ return SYSERR;
if (tcp6_shutdown == YES)
return SYSERR;
if (size == 0) {
@@ -1153,12 +1137,11 @@
memcpy(&mp[1],
msg,
size);
- ssize = size + sizeof(TCP6MessagePack);
- mp->size = htons(ssize);
+ mp->size = htons(size);
mp->reserved = 0;
ok = tcp6DirectSend(tsession->internal,
mp,
- ssize);
+ size + sizeof(TCP6MessagePack));
FREE(mp);
return ok;
}
Modified: GNUnet/todo
===================================================================
--- GNUnet/todo 2005-03-08 04:07:02 UTC (rev 408)
+++ GNUnet/todo 2005-03-08 11:57:22 UTC (rev 409)
@@ -7,10 +7,11 @@
0.7.0pre1 [4'05] (aka "preview"):
- testing:
+ * tbench:
+ + CAN TEST: http transport (after MTU change)
* sqlite-tests: test concurrency with iterators
* gnunet-pseudonym
* gnunet-search: multiple search results don't work (yet); test on FSLIB and ECRS levels!
- * tbench is awful code (FIX) and also somehow _breaks_ bandwidth limitations! (Mantis #766 anyone?)
- FSUI:
* download: various details wrt generated events
* namespace updates
@@ -21,7 +22,6 @@
* dht / gnunet-dht-join and gnunet-dht-query
* fs
- transport refactoring:
- * TEST: TCP MTU change [ at this point, it may just work or equally just crash and burn ]
* UPDATE: HTTP: MTU change!
* port knocking support? [ tricky ]
- UI features:
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r409 - in GNUnet: . src/applications/advertising src/applications/fragmentation src/applications/identity src/applications/tbench src/applications/topology_default src/server src/transports,
grothoff <=