gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r5197 - in GNUnet/src: applications/traffic server


From: gnunet
Subject: [GNUnet-SVN] r5197 - in GNUnet/src: applications/traffic server
Date: Sun, 1 Jul 2007 04:00:47 -0600 (MDT)

Author: grothoff
Date: 2007-07-01 04:00:47 -0600 (Sun, 01 Jul 2007)
New Revision: 5197

Modified:
   GNUnet/src/applications/traffic/traffic.c
   GNUnet/src/server/connection.c
   GNUnet/src/server/handler.c
Log:
hacking

Modified: GNUnet/src/applications/traffic/traffic.c
===================================================================
--- GNUnet/src/applications/traffic/traffic.c   2007-07-01 09:18:05 UTC (rev 
5196)
+++ GNUnet/src/applications/traffic/traffic.c   2007-07-01 10:00:47 UTC (rev 
5197)
@@ -50,21 +50,13 @@
  */
 #define HISTORY_SIZE 32
 
-#define KEEP_TRANSMITTED_STATS YES
-
-#define KEEP_RECEIVE_STATS YES
-
-#if KEEP_RECEIVE_STATS || KEEP_TRANSMITTED_STATS
 static Stats_ServiceAPI * stats;
-#endif
 
-#if KEEP_RECEIVE_STATS
 static int stat_traffic_received_by_type[P2P_PROTO_MAX_USED];
-#endif
 
-#if KEEP_TRANSMITTED_STATS
+static int stat_pt_traffic_received_by_type[P2P_PROTO_MAX_USED];
+
 static int stat_traffic_transmitted_by_type[P2P_PROTO_MAX_USED];
-#endif
 
 /**
  * Macro to access the slot at time "t" in the history.
@@ -439,7 +431,6 @@
 
 static void updateTrafficSendCounter(unsigned short ptyp,
                                     unsigned short plen) {
-#if KEEP_TRANSMITTED_STATS
   if (ptyp >= P2P_PROTO_MAX_USED)
     return; /* not tracked */
   if (0 == stat_traffic_transmitted_by_type[ptyp]) {
@@ -455,12 +446,10 @@
   }
   stats->change(stat_traffic_transmitted_by_type[ptyp],
                plen);
-#endif
 }
 
 static void updateTrafficReceiveCounter(unsigned short ptyp,
                                        unsigned short plen) {
-#if KEEP_RECEIVE_STATS
   if (ptyp < P2P_PROTO_MAX_USED) {
     if (0 == stat_traffic_received_by_type[ptyp]) {
       char * s;
@@ -476,9 +465,26 @@
     stats->change(stat_traffic_received_by_type[ptyp],
                  plen);
   }
-#endif
 }
 
+static void updatePlaintextTrafficReceiveCounter(unsigned short ptyp,
+                                                unsigned short plen) {
+  if (ptyp < P2P_PROTO_MAX_USED) {
+    if (0 == stat_pt_traffic_received_by_type[ptyp]) {
+      char * s;
+      s = MALLOC(256);
+      SNPRINTF(s,
+              256,
+              _("# bytes received in plaintext of type %d"),
+              ptyp);
+      stat_pt_traffic_received_by_type[ptyp]
+       = stats->create(s);
+      FREE(s);
+    }
+    stats->change(stat_pt_traffic_received_by_type[ptyp],
+                 plen);
+  }
+}
 
 /**
  * A message was received.  Update traffic stats.
@@ -503,6 +509,7 @@
   return OK;
 }
 
+
 /**
  * A message is send.  Update traffic stats.
  *
@@ -526,16 +533,31 @@
   return OK;
 }
 
+/**
+ * A message is send.  Update traffic stats.
+ *
+ * @param header the header of the message
+ * @param receiver the identity of the receiver
+ */
+static int plaintextReceive(const PeerIdentity * receiver,
+                           const MESSAGE_HEADER * header,
+                           TSession * session) {
+  unsigned short port;
 
+  port = ntohs(MAKE_UNALIGNED(header->type));
+  updatePlaintextTrafficReceiveCounter(port,
+                                      ntohs(MAKE_UNALIGNED(header->size)));
+  return OK;
+}
+
+
 /**
  * Initialize the traffic module.
  */
 Traffic_ServiceAPI *
 provide_module_traffic(CoreAPIForApplication * capi) {
   static Traffic_ServiceAPI api;
-#if KEEP_RECEIVE_STATS || KEEP_TRANSMITTED_STATS
   int i;
-#endif
 
   coreAPI = capi;
 #if DEBUG
@@ -548,25 +570,21 @@
                                    &server_port);
 #endif
   api.get = &getTrafficStats;
-#if KEEP_TRANSMITTED_STATS
   for (i=0;i<P2P_PROTO_MAX_USED;i++)
     stat_traffic_transmitted_by_type[i] = 0;
   coreAPI->registerSendNotify(&trafficSend);
-#endif
-#if KEEP_RECEIVE_STATS
   for (i=0;i<P2P_PROTO_MAX_USED;i++) {
     stat_traffic_received_by_type[i] = 0;
     coreAPI->registerHandler(i,
                             &trafficReceive);
+    coreAPI->registerPlaintextHandler(i,
+                                     &plaintextReceive);
   }
-#endif
 
   GE_ASSERT(coreAPI->ectx, counters == NULL);
   lock = MUTEX_CREATE(NO);
-#if KEEP_RECEIVE_STATS || KEEP_TRANSMITTED_STATS
   stats = capi->requestService("stats");
-#endif
- return &api;
+  return &api;
 }
 
 /**
@@ -575,18 +593,15 @@
 void release_module_traffic() {
   unsigned int i;
 
-#if KEEP_RECEIVE_STATS
-  for (i=0;i<P2P_PROTO_MAX_USED;i++)
+  for (i=0;i<P2P_PROTO_MAX_USED;i++) {
     coreAPI->unregisterHandler(i,
                             &trafficReceive);
-#endif
-#if KEEP_TRANSMITTED_STATS
+    coreAPI->unregisterPlaintextHandler(i,
+                                       &plaintextReceive);
+  }
   coreAPI->unregisterSendNotify(&trafficSend);
-#endif
-#if KEEP_RECEIVE_STATS || KEEP_TRANSMITTED_STATS
   coreAPI->releaseService(stats);
   stats = NULL;
-#endif
   for (i=0;i<max_message_type;i++)
     FREENONNULL(counters[i]);
   GROW(counters,

Modified: GNUnet/src/server/connection.c
===================================================================
--- GNUnet/src/server/connection.c      2007-07-01 09:18:05 UTC (rev 5196)
+++ GNUnet/src/server/connection.c      2007-07-01 10:00:47 UTC (rev 5197)
@@ -2580,16 +2580,19 @@
           &enc);
     return SYSERR;
   }
-  hash2enc(&sender->hashPubKey, &enc);
-  hash(&msg->sequenceNumber, size - sizeof(HashCode512), &hc);
+  if (stats != NULL)
+    stats->change(stat_received, size);
+  hash2enc(&sender->hashPubKey, 
+          &enc);
+  hash(&msg->sequenceNumber, 
+       size - sizeof(HashCode512), 
+       &hc);
   if (equalsHashCode512(&hc,
                        &msg->hash) &&
       (msg->sequenceNumber == 0) &&
       (msg->bandwidth == 0) &&
       (msg->timeStamp == 0) )
     return NO;                  /* plaintext */
-  if (stats != NULL)
-    stats->change(stat_received, size);
 
   MUTEX_LOCK(lock);
   be = lookForHost(sender);
@@ -3476,14 +3479,11 @@
   char *closure;
   unsigned short len;
 
-  if (msg == NULL) {
-    /* little hack for topology,
-       which cannot do this directly
-       due to cyclic dependencies! */
-    if (getBandwidthAssignedTo(receiver, NULL, NULL) != OK)
-      session->tryConnect(receiver);
-    return;
-  }
+  if ( (getBandwidthAssignedTo(receiver, NULL, NULL) != OK) &&
+       (identity->isBlacklistedStrict(receiver) == NO) )
+    session->tryConnect(receiver);
+  if (msg == NULL) 
+    return; 
   len = ntohs(msg->size);
   if (len == 0) {
     GE_LOG(ectx,

Modified: GNUnet/src/server/handler.c
===================================================================
--- GNUnet/src/server/handler.c 2007-07-01 09:18:05 UTC (rev 5196)
+++ GNUnet/src/server/handler.c 2007-07-01 10:00:47 UTC (rev 5197)
@@ -40,7 +40,7 @@
  * How many incoming packages do we have in the buffer
  * (max.). Must be >= THREAD_COUNT to make sense.
  */
-#define QUEUE_LENGTH 16
+#define QUEUE_LENGTH 64
 
 /**
  * How many threads do we start?
@@ -107,6 +107,14 @@
 
 static struct GE_Context * ectx;
 
+#define MEASURE_TIME YES
+
+#if MEASURE_TIME
+static cron_t time_by_type[P2P_PROTO_MAX_USED];
+static unsigned int count_by_type[P2P_PROTO_MAX_USED];
+#endif
+
+
 /**
  * Register a method as a handler for specific message types.  Note
  * that it IS possible to register multiple handlers for the same
@@ -356,6 +364,9 @@
   MESSAGE_HEADER * copy;
   int last;
   EncName enc;
+#if MEASURE_TIME
+  cron_t now;
+#endif
 
   pos = 0;
   copy = NULL;
@@ -428,6 +439,9 @@
               ptyp);
        continue; /* no handler registered, go to next part */
       }
+#if MEASURE_TIME
+      now = get_time();
+#endif
       last = 0;
       while (NULL != (callback = handlers[ptyp][last])) {
        if (SYSERR == callback(sender,
@@ -444,6 +458,12 @@
        }
        last++;
       }
+#if MEASURE_TIME
+      if (ptyp < P2P_PROTO_MAX_USED) {
+       time_by_type[ptyp] += get_time() - now;
+       count_by_type[ptyp]++;
+      }
+#endif
     } else { /* isEncrypted == NO */
       PlaintextMessagePartHandler callback;
 
@@ -455,6 +475,9 @@
               ptyp);
        continue; /* no handler registered, go to next part */
       }
+#if MEASURE_TIME
+      now = get_time();
+#endif
       last = 0;
       while (NULL != (callback = plaintextHandlers[ptyp][last])) {
        if (SYSERR == callback(sender,
@@ -463,8 +486,8 @@
 #if DEBUG_HANDLER
          GE_LOG(ectx,
                 GE_DEBUG | GE_USER | GE_BULK,
-             "Handler aborted message processing after receiving message of 
type '%d'.\n",
-             ptyp);
+                "Handler aborted message processing after receiving message of 
type '%d'.\n",
+                ptyp);
 #endif
          FREENONNULL(copy);
          copy = NULL;
@@ -472,6 +495,13 @@
        }
        last++;
       }
+#if MEASURE_TIME
+      if (ptyp < P2P_PROTO_MAX_USED) {
+       time_by_type[ptyp] += get_time() - now;
+       count_by_type[ptyp]++;
+      }
+#endif
+
     } /* if plaintext */
   } /* while loop */
   FREENONNULL(copy);
@@ -503,13 +533,13 @@
   ret = checkHeader(sender,
                    (P2P_PACKET_HEADER*) msg,
                    size);
-  if (ret == SYSERR)
-    return; /* message malformed */
+  if (ret == SYSERR) 
+    return; /* message malformed */  
   if ( (ret == YES) &&
        (tsession != NULL) &&
-       (sender != NULL) )
-    if (OK == transport->associate(tsession))
-      considerTakeover(sender, tsession);
+       (sender != NULL) &&
+       (OK == transport->associate(tsession)) )
+    considerTakeover(sender, tsession);
   injectMessage(sender,
                &msg[sizeof(P2P_PACKET_HEADER)],
                size - sizeof(P2P_PACKET_HEADER),
@@ -562,9 +592,13 @@
 void core_receive(P2P_PACKET * mp) {
   if ( (threads_running == NO) ||
        (mainShutdownSignal != NULL) ||
-       (SYSERR == SEMAPHORE_DOWN(bufferQueueWrite_, NO)) ) {
+       (SYSERR == SEMAPHORE_DOWN(bufferQueueWrite_, YES)) ) {
     /* discard message, buffer is full or
        we're shut down! */
+    GE_LOG(ectx,
+          GE_DEBUG | GE_DEVELOPER | GE_REQUEST,
+          "Discarding message of size %u -- buffer full!\n",
+          mp->size);
     FREE(mp->msg);
     FREE(mp);
     return;
@@ -725,6 +759,18 @@
   transport = NULL;
   releaseService(identity);
   identity = NULL;
+#if MEASURE_TIME
+  for (i=0;i<P2P_PROTO_MAX_USED;i++) {
+    if (count_by_type[i] == 0)
+      continue;
+    fprintf(stderr,
+           "%10u msgs of type %2u took %16llu ms (%llu on average)\n",
+           count_by_type[i],
+           i,
+           time_by_type[i],
+           time_by_type[i] / count_by_type[i]);
+  }         
+#endif
 }
 
 





reply via email to

[Prev in Thread] Current Thread [Next in Thread]