[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r400 - in GNUnet: . src/server src/transports
From: grothoff
Subject: [GNUnet-SVN] r400 - in GNUnet: . src/server src/transports
Date: Mon, 7 Mar 2005 00:14:12 -0800 (PST)
Author: grothoff
Date: 2005-03-07 00:14:10 -0800 (Mon, 07 Mar 2005)
New Revision: 400
Modified:
GNUnet/src/server/connection.c
GNUnet/src/transports/tcp.c
GNUnet/todo
Log:
TCP MTU change - only partially tested, beware
Modified: GNUnet/src/server/connection.c
===================================================================
--- GNUnet/src/server/connection.c 2005-03-07 04:47:20 UTC (rev 399)
+++ GNUnet/src/server/connection.c 2005-03-07 08:14:10 UTC (rev 400)
@@ -1,6 +1,6 @@
/*
This file is part of GNUnet.
- (C) 2001, 2002, 2003, 2004 Christian Grothoff (and other contributing authors)
+ (C) 2001, 2002, 2003, 2004, 2005 Christian Grothoff (and other contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
@@ -311,7 +311,7 @@
PeerIdentity sender;
/**
- * The MTU for this session.
+ * The MTU for this session, 0 for streaming transports.
*/
unsigned short mtu;
@@ -828,6 +828,7 @@
int tailpos;
int approxProb;
int remainingBufferSize;
+ unsigned int totalMessageSize;
ENTRY();
/* fast ways out */
@@ -845,15 +846,23 @@
return; /* nothing to send */
}
+
+
/* recompute max send frequency */
if (be->max_bpm <= 0)
be->max_bpm = 1;
-
- be->MAX_SEND_FREQUENCY = /* ms per message */
- be->session.mtu /* byte per message */
- / (be->max_bpm * cronMINUTES / cronMILLIS) /* bytes per ms */
- / 2; /* some head-room */
-
+
+ if (be->session.mtu == 0) {
+ be->MAX_SEND_FREQUENCY = /* ms per message */
+ EXPECTED_MTU
+ / (be->max_bpm * cronMINUTES / cronMILLIS) /* bytes per ms */
+ / 2;
+ } else {
+ be->MAX_SEND_FREQUENCY = /* ms per message */
+ be->session.mtu /* byte per message */
+ / (be->max_bpm * cronMINUTES / cronMILLIS) /* bytes per ms */
+ / 2; /* some head-room */
+ }
/* Also: allow at least MINIMUM_SAMPLE_COUNT knapsack
solutions for any MIN_SAMPLE_TIME! */
if (be->MAX_SEND_FREQUENCY > MIN_SAMPLE_TIME / MINIMUM_SAMPLE_COUNT)
@@ -867,84 +876,130 @@
#endif
return; /* frequency too high, wait */
}
- /* solve knapsack problem, compute accumulated priority */
- knapsackSolution = MALLOC(sizeof(int) * be->sendBufferSize);
- approxProb = getCPULoad();
- if (approxProb > 50) {
- if (approxProb > 100)
- approxProb = 100;
- approxProb = 100 - approxProb; /* now value between 0 and 50 */
- approxProb *= 2; /* now value between 0 [always approx] and 100 [never
approx] */
- /* control CPU load probabilistically! */
- if (randomi(1+approxProb) == 0) {
- priority = approximateKnapsack(be,
- be->session.mtu - sizeof(P2P_Message),
- knapsackSolution);
+ /* test if receiver has enough bandwidth available! */
+ updateCurBPS(be);
+#if DEBUG_CONNECTION
+ LOG(LOG_DEBUG,
+ "receiver window available: %lld bytes (MTU: %u)\n",
+ be->available_send_window,
+ be->session.mtu);
+#endif
+
+
+ if (be->session.mtu == 0) {
+ SendEntry ** entries;
+
+ entries = be->sendBuffer;
+ totalMessageSize = 0;
+ knapsackSolution = MALLOC(sizeof(int) * be->sendBufferSize);
+ priority = 0;
+ i = 0;
+ /* assumes entries are sorted by priority! */
+ while (i < be->sendBufferSize) {
+ if ( (totalMessageSize + entries[i]->len < 60000) &&
+ (entries[i]->pri >= EXTREME_PRIORITY) ) {
+ knapsackSolution[i] = YES;
+ priority += entries[i]->pri;
+ totalMessageSize += entries[i]->len;
+ } else {
+ break;
+ }
+ i++;
+ }
+ while ( (i < be->sendBufferSize) &&
+ (be->available_send_window > totalMessageSize) ) {
+ if (entries[i]->len + totalMessageSize <=
+ be->available_send_window) {
+ knapsackSolution[i] = YES;
+ totalMessageSize += entries[i]->len;
+ priority += entries[i]->pri;
+ } else {
+ knapsackSolution[i] = NO;
+ if (totalMessageSize == 0) {
+ /* if the highest-priority message does not yet
+ fit, wait for send window to grow so that
+ we can get it out (otherwise we would starve
+ high-priority, large messages) */
+ FREE(knapsackSolution);
+ return;
+ }
+ }
+ i++;
+ }
+ } else { /* if (be->session.mtu == 0) */
+ /* solve knapsack problem, compute accumulated priority */
+ knapsackSolution = MALLOC(sizeof(int) * be->sendBufferSize);
+
+ approxProb = getCPULoad();
+ if (approxProb > 50) {
+ if (approxProb > 100)
+ approxProb = 100;
+ approxProb = 100 - approxProb; /* now value between 0 and 50 */
+ approxProb *= 2; /* now value between 0 [always approx] and 100 [never
approx] */
+ /* control CPU load probabilistically! */
+ if (randomi(1+approxProb) == 0) {
+ priority = approximateKnapsack(be,
+ be->session.mtu - sizeof(P2P_Message),
+ knapsackSolution);
#if DEBUG_COLLECT_PRIO == YES
- fprintf(prioFile, "%llu 0 %d\n", cronTime(NULL), priority);
+ fprintf(prioFile, "%llu 0 %d\n", cronTime(NULL), priority);
#endif
- } else {
+ } else {
+ priority = solveKnapsack(be,
+ be->session.mtu - sizeof(P2P_Message),
+ knapsackSolution);
+#if DEBUG_COLLECT_PRIO == YES
+ fprintf(prioFile, "%llu 1 %d\n", cronTime(NULL), priority);
+#endif
+ }
+ } else { /* never approximate < 50% CPU load */
priority = solveKnapsack(be,
be->session.mtu - sizeof(P2P_Message),
knapsackSolution);
#if DEBUG_COLLECT_PRIO == YES
- fprintf(prioFile, "%llu 1 %d\n", cronTime(NULL), priority);
+ fprintf(prioFile, "%llu 2 %d\n", cronTime(NULL), priority);
#endif
}
- } else { /* never approximate < 50% CPU load */
- priority = solveKnapsack(be,
- be->session.mtu - sizeof(P2P_Message),
- knapsackSolution);
-#if DEBUG_COLLECT_PRIO == YES
- fprintf(prioFile, "%llu 2 %d\n", cronTime(NULL), priority);
-#endif
- }
- j = 0;
- for (i=0;i<be->sendBufferSize;i++)
- if (knapsackSolution[i] == YES)
- j++;
- if (j == 0) {
- LOG(LOG_ERROR,
- _("'%s' selected %d out of %d messages (MTU: %d).\n"),
- "solveKnapsack",
- j,
- be->sendBufferSize,
- be->session.mtu - sizeof(P2P_Message));
-
- for (j=0;j<be->sendBufferSize;j++)
+ j = 0;
+ for (i=0;i<be->sendBufferSize;i++)
+ if (knapsackSolution[i] == YES)
+ j++;
+ if (j == 0) {
LOG(LOG_ERROR,
- _("Message details: %u: length %d, priority: %d\n"),
- j,
- be->sendBuffer[j]->len,
- be->sendBuffer[j]->pri);
- FREE(knapsackSolution);
- return;
- }
+ _("'%s' selected %d out of %d messages (MTU: %d).\n"),
+ "solveKnapsack",
+ j,
+ be->sendBufferSize,
+ be->session.mtu - sizeof(P2P_Message));
+
+ for (j=0;j<be->sendBufferSize;j++)
+ LOG(LOG_ERROR,
+ _("Message details: %u: length %d, priority: %d\n"),
+ j,
+ be->sendBuffer[j]->len,
+ be->sendBuffer[j]->pri);
+ FREE(knapsackSolution);
+ return;
+ }
- /* test if receiver has enough bandwidth available! */
- updateCurBPS(be);
+ if (be->available_send_window < be->session.mtu) {
+ /* if we have a very high priority, we may
+ want to ignore bandwidth availability (e.g. for HANGUP,
+ which has EXTREME_PRIORITY) */
+ if (priority < EXTREME_PRIORITY) {
+ FREE(knapsackSolution);
#if DEBUG_CONNECTION
- LOG(LOG_DEBUG,
- "receiver window available: %lld bytes (MTU: %u)\n",
- be->available_send_window,
- be->session.mtu);
+ LOG(LOG_DEBUG,
+ "bandwidth limits prevent sending (send window %u too small).\n",
+ be->available_send_window);
#endif
- if (be->available_send_window < be->session.mtu) {
- /* if we have a very high priority, we may
- want to ignore bandwidth availability (e.g. for HANGUP,
- which has EXTREME_PRIORITY) */
- if (priority < EXTREME_PRIORITY) {
- FREE(knapsackSolution);
-#if DEBUG_CONNECTION
- LOG(LOG_DEBUG,
- "bandwidth limits prevent sending (send window %u too small).\n",
- be->available_send_window);
-#endif
- return; /* can not send, BPS available is too small */
+ return; /* can not send, BPS available is too small */
+ }
}
- }
-
+ totalMessageSize = be->session.mtu;
+ } /* end MTU > 0 */
+
expired = cronTime(NULL) - SECONDS_PINGATTEMPT * cronSECONDS;
/* if it's more than one connection "lifetime" old, always kill it! */
@@ -991,8 +1046,8 @@
}
/* build message (start with sequence number) */
- GNUNET_ASSERT(be->session.mtu > sizeof(P2P_Message));
- plaintextMsg = MALLOC(be->session.mtu);
+ GNUNET_ASSERT(totalMessageSize > sizeof(P2P_Message));
+ plaintextMsg = MALLOC(totalMessageSize);
p2pHdr = (P2P_Message*) plaintextMsg;
p2pHdr->timeStamp
= htonl(TIME(NULL));
@@ -1057,12 +1112,12 @@
int msgCap;
int l = getCPULoad();
if (l >= 50) {
- msgCap = be->session.mtu / sizeof(HashCode512);
+ msgCap = EXPECTED_MTU / sizeof(HashCode512);
} else {
if (l <= 0)
l = 1;
- msgCap = be->session.mtu / sizeof(HashCode512)
- + (MAX_SEND_BUFFER_SIZE - be->session.mtu / sizeof(HashCode512)) / l;
+ msgCap = EXPECTED_MTU / sizeof(HashCode512)
+ + (MAX_SEND_BUFFER_SIZE - EXPECTED_MTU / sizeof(HashCode512)) / l;
}
if (be->max_bpm > 2) {
msgCap += 2 * (int) log((double)be->max_bpm);
@@ -1101,7 +1156,7 @@
/* still room left? try callbacks! */
pos = scl_nextHead;
while (pos != NULL) {
- if (pos->minimumPadding + p <= be->session.mtu) {
+ if (pos->minimumPadding + p <= totalMessageSize) {
p += pos->callback(&be->session.sender,
&plaintextMsg[p],
be->session.mtu - p);
@@ -1110,10 +1165,10 @@
}
/* finally padd with noise */
- if ( (p + sizeof(p2p_HEADER) <= be->session.mtu) &&
+ if ( (p + sizeof(p2p_HEADER) <= totalMessageSize) &&
(disable_random_padding == NO) ) {
p2p_HEADER * part;
- unsigned short noiseLen = be->session.mtu - p;
+ unsigned short noiseLen = totalMessageSize - p;
part = (p2p_HEADER *) &plaintextMsg[p];
part->size
@@ -1121,10 +1176,10 @@
part->type
= htons(p2p_PROTO_NOISE);
for (i=p+sizeof(p2p_HEADER);
- i < be->session.mtu;
+ i < totalMessageSize;
i++)
plaintextMsg[i] = (char) rand();
- p = be->session.mtu;
+ p = totalMessageSize;
}
encryptedMsg = MALLOC(p);
@@ -1155,8 +1210,8 @@
(SYSERR == transport->sendReliable(be->session.tsession,
encryptedMsg,
p))) ) ) {
- if (be->available_send_window > be->session.mtu)
- be->available_send_window -= be->session.mtu;
+ if (be->available_send_window > totalMessageSize)
+ be->available_send_window -= totalMessageSize;
else
be->available_send_window = 0; /* if we overrode limits,
reset to 0 at least... */
@@ -1205,7 +1260,8 @@
BREAK();
return;
}
- if (se->len > be->session.mtu - sizeof(P2P_Message)) {
+ if ( (be->session.mtu != 0) &&
+ (se->len > be->session.mtu - sizeof(P2P_Message)) ) {
/* this message is so big that it must be fragmented! */
fragmentation->fragment(&be->session.sender,
be->session.mtu,
Modified: GNUnet/src/transports/tcp.c
===================================================================
--- GNUnet/src/transports/tcp.c 2005-03-07 04:47:20 UTC (rev 399)
+++ GNUnet/src/transports/tcp.c 2005-03-07 08:14:10 UTC (rev 400)
@@ -140,9 +140,9 @@
unsigned int pos;
/**
- * Current size of the buffer.
+ * Current size of the read buffer.
*/
- unsigned int size;
+ unsigned int rsize;
/**
* The read buffer.
@@ -159,6 +159,11 @@
*/
char * wbuff;
+ /**
+ * Size of the write buffer
+ */
+ unsigned int wsize;
+
} TCPSession;
/* *********** globals ************* */
@@ -376,9 +381,15 @@
if (SYSERR == tcpAssociate(tsession))
return SYSERR;
tcpSession = tsession->internal;
+ if (tcpSession->rsize == tcpSession->pos) {
+ /* read buffer too small, grow */
+ GROW(tcpSession->rbuff,
+ tcpSession->rsize,
+ tcpSession->rsize * 2);
+ }
ret = READ(tcpSession->sock,
&tcpSession->rbuff[tcpSession->pos],
- tcpSession->size - tcpSession->pos);
+ tcpSession->rsize - tcpSession->pos);
cronTime(&tcpSession->lastUse);
if (ret == 0) {
tcpDisconnect(tsession);
@@ -407,9 +418,9 @@
incrementBytesReceived(ret);
tcpSession->pos += ret;
len = ntohs(((TCPMessagePack*)&tcpSession->rbuff[0])->size);
- if (len > tcpSession->size) /* if MTU larger than expected, grow! */
+ if (len > tcpSession->rsize) /* if message larger than read buffer, grow! */
GROW(tcpSession->rbuff,
- tcpSession->size,
+ tcpSession->rsize,
len);
#if DEBUG_TCP
LOG(LOG_DEBUG,
@@ -479,8 +490,8 @@
mp->tsession = tsession;
#if DEBUG_TCP
LOG(LOG_DEBUG,
- "tcp transport received %d bytes, forwarding to core\n",
- mp->size);
+ "tcp transport received %u bytes, forwarding to core\n",
+ mp->rsize);
#endif
coreAPI->receive(mp);
@@ -493,7 +504,14 @@
memmove(&tcpSession->rbuff[0],
&tcpSession->rbuff[len],
tcpSession->pos - len);
- tcpSession->pos -= len;
+ tcpSession->pos -= len;
+ if ( (tcpSession->pos * 4 < tcpSession->rsize) &&
+ (tcpSession->rsize > 4 * 1024) ) {
+ /* read buffer far too large, shrink! */
+ GROW(tcpSession->rbuff,
+ tcpSession->rsize,
+ tcpSession->pos + 1024);
+ }
tcpDisconnect(tsession);
return OK;
@@ -531,10 +549,11 @@
tcpSession = MALLOC(sizeof(TCPSession));
tcpSession->pos = 0;
- tcpSession->size = tcpAPI.mtu + sizeof(TCPMessagePack);
- tcpSession->rbuff = MALLOC(tcpSession->size);
+ tcpSession->rsize = 2 * 1024 + sizeof(TCPMessagePack);
+ tcpSession->rbuff = MALLOC(tcpSession->rsize);
tcpSession->wpos = 0;
tcpSession->wbuff = NULL;
+ tcpSession->wsize = 0;
tcpSession->sock = sock;
/* fill in placeholder identity to mark that we
are waiting for the welcome message */
@@ -692,9 +711,9 @@
try_again_1:
success = SEND_NONBLOCKING(sock,
- tcpSession->wbuff,
- tcpSession->wpos,
- &ret);
+ tcpSession->wbuff,
+ tcpSession->wpos,
+ &ret);
if (success == SYSERR) {
LOG_STRERROR(LOG_WARNING, "send");
destroySession(i);
@@ -718,7 +737,8 @@
if ((unsigned int)ret == tcpSession->wpos) {
FREENONNULL(tcpSession->wbuff);
tcpSession->wbuff = NULL;
- tcpSession->wpos = 0;
+ tcpSession->wpos = 0;
+ tcpSession->wsize = 0;
} else {
memmove(tcpSession->wbuff,
&tcpSession->wbuff[ret],
@@ -779,10 +799,6 @@
BREAK(); /* size 0 not allowed */
return SYSERR;
}
- if (ssize > tcpAPI.mtu + sizeof(TCPMessagePack)) {
- BREAK(); /* size > mtu */
- return SYSERR;
- }
ok = SYSERR;
MUTEX_LOCK(&tcplock);
if (tcpSession->wpos > 0) {
@@ -793,20 +809,21 @@
mp,
ssize,
&ret);
- if (success == SYSERR) {
- LOG_STRERROR(LOG_INFO, "send");
- MUTEX_UNLOCK(&tcplock);
- return SYSERR;
- } else if (success == NO)
- ret = 0;
+ if (success == SYSERR) {
+ LOG_STRERROR(LOG_INFO, "send");
+ MUTEX_UNLOCK(&tcplock);
+ return SYSERR;
+ } else if (success == NO)
+ ret = 0;
}
if ((unsigned int)ret <= ssize) { /* some bytes send or blocked */
if ((unsigned int)ret < ssize) {
if (tcpSession->wbuff == NULL) {
- tcpSession->wbuff = MALLOC(tcpAPI.mtu + sizeof(TCPMessagePack));
- tcpSession->wpos = 0;
+ tcpSession->wsize = ssize + sizeof(TCPMessagePack);
+ tcpSession->wbuff = MALLOC(tcpSession->wsize);
+ tcpSession->wpos = 0;
}
- if (ssize + tcpSession->wpos > tcpAPI.mtu + sizeof(TCPMessagePack) +
ret) {
+ if (ssize + tcpSession->wpos - ret > tcpSession->wsize) {
ssize = 0;
ok = SYSERR; /* buffer full, drop */
} else {
@@ -864,9 +881,8 @@
MUTEX_LOCK(&tcplock);
if (tcpSession->wpos > 0) {
unsigned int old = tcpSession->wpos;
- /* reliable: grow send-buffer above limit! */
GROW(tcpSession->wbuff,
- tcpSession->wpos,
+ tcpSession->wsize,
tcpSession->wpos + ssize);
memcpy(&tcpSession->wbuff[old],
mp,
@@ -1013,8 +1029,9 @@
tcpSession->sock = sock;
tcpSession->wpos = 0;
tcpSession->wbuff = NULL;
- tcpSession->size = tcpAPI.mtu + sizeof(TCPMessagePack);
- tcpSession->rbuff = MALLOC(tcpSession->size);
+ tcpSession->wsize = 0;
+ tcpSession->rsize = 2 * 1024 + sizeof(TCPMessagePack);
+ tcpSession->rbuff = MALLOC(tcpSession->rsize);
tsession = MALLOC(sizeof(TSession));
tsession->internal = tcpSession;
tsession->ttype = tcpAPI.protocolNumber;
@@ -1065,7 +1082,7 @@
if (tcp_shutdown == YES)
return SYSERR;
- if ( (size == 0) || (size > tcpAPI.mtu) ) {
+ if (size == 0) {
BREAK();
return SYSERR;
}
@@ -1104,7 +1121,7 @@
if (tcp_shutdown == YES)
return SYSERR;
- if ( (size == 0) || (size > tcpAPI.mtu) ) {
+ if (size == 0) {
BREAK();
return SYSERR;
}
@@ -1276,8 +1293,6 @@
* via a global and returns the udp transport API.
*/
TransportAPI * inittransport_tcp(CoreAPIForTransport * core) {
- int mtu;
-
MUTEX_CREATE_RECURSIVE(&tcplock);
reloadConfiguration();
tsessionCount = 0;
@@ -1286,17 +1301,8 @@
tsessionArrayLength,
32);
coreAPI = core;
- mtu = getConfigurationInt("TCP",
- "MTU");
- if (mtu == 0)
- mtu = 1460;
- if (mtu < 1200)
- LOG(LOG_ERROR,
- _("MTU for '%s' is probably too low (fragmentation not
implemented!)\n"),
- "TCP");
-
tcpAPI.protocolNumber = TCP_PROTOCOL_NUMBER;
- tcpAPI.mtu = mtu - sizeof(TCPMessagePack);
+ tcpAPI.mtu = 0;
tcpAPI.cost = 20000; /* about equal to udp */
tcpAPI.verifyHelo = &verifyHelo;
tcpAPI.createHELO = &createHELO;
Modified: GNUnet/todo
===================================================================
--- GNUnet/todo 2005-03-07 04:47:20 UTC (rev 399)
+++ GNUnet/todo 2005-03-07 08:14:10 UTC (rev 400)
@@ -21,7 +21,8 @@
* dht / gnunet-dht-join and gnunet-dht-query
* fs
- transport refactoring:
- * TCP MTU change [ hard ]
+ * TEST: TCP MTU change [ at this point, it may just work or equally just crash and burn ]
+ * UPDATE: TCP6/HTTP: MTU change!
* port knocking support? [ tricky ]
- UI features:
* date feature (Mantis #789)
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r400 - in GNUnet: . src/server src/transports,
grothoff <=