gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r3157 - GNUnet/src/server


From: grothoff
Subject: [GNUnet-SVN] r3157 - GNUnet/src/server
Date: Sat, 29 Jul 2006 02:58:32 -0700 (PDT)

Author: grothoff
Date: 2006-07-29 02:58:30 -0700 (Sat, 29 Jul 2006)
New Revision: 3157

Modified:
   GNUnet/src/server/connection.c
   GNUnet/src/server/connection.h
Log:
working on connection

Modified: GNUnet/src/server/connection.c
===================================================================
--- GNUnet/src/server/connection.c      2006-07-29 09:44:21 UTC (rev 3156)
+++ GNUnet/src/server/connection.c      2006-07-29 09:58:30 UTC (rev 3157)
@@ -245,7 +245,7 @@
  * well-behaved, non-malicious nodes that like each other).
  */
 typedef struct {
-  P2P_MESSAGE_HEADER header;
+  MESSAGE_HEADER header;
   PeerIdentity sender;
 } P2P_hangup_MESSAGE;
 
@@ -470,7 +470,7 @@
 /**
  * Lock for the connection module.
  */
-static Mutex lock;
+static struct MUTEX * lock;
 
 /**
  * What is the available downstream bandwidth (in bytes
@@ -483,6 +483,13 @@
  */
 static MessagePartHandler *rsns;
 
+static struct GE_Context * ectx;
+
+static struct GC_Configuration * cfg;
+
+static struct LoadMonitor * load_monitor;
+
+
 /**
  * Size of rsns.
  */
@@ -552,12 +559,12 @@
   be->idealized_limit = MIN_BPM_PER_PEER;
   be->max_transmitted_limit = MIN_BPM_PER_PEER;
   be->lastSendAttempt = 0;      /* never */
-  load = getCPULoad();
+  load = os_cpu_get_load(ectx, cfg);
   if (load == -1)
     load = 50; /* failed to determine load, assume 50% */
   be->MAX_SEND_FREQUENCY = 50 * cronMILLIS * load;
   be->inSendBuffer = NO;
-  cronTime(&be->last_bps_update); /* now */
+  be->last_bps_update = get_time(); /* now */
   return be;
 }
 
@@ -569,7 +576,7 @@
   cron_t now;
   cron_t delta;
 
-  cronTime(&now);
+  now = get_time();
   if(now <= be->last_bps_update)
     return;
   delta = now - be->last_bps_update;
@@ -665,13 +672,13 @@
 #define VARR(i,j) v[(i)+(j)*(count+1)]
 
   if(available < 0) {
-    BREAK();
+    GE_BREAK(ectx, 0);
     return -1;
   }
   ENTRY();
   entries = be->sendBuffer;
   count = be->sendBufferSize;
-  cronTime(&startTime);
+  startTime = get_time();
 
   /* fast test: schedule everything? */
   max = 0;
@@ -697,7 +704,7 @@
     if(entries[i]->len > 0)
       max = gcd(max, entries[i]->len);
   }
-  GNUNET_ASSERT(max != 0);
+  GE_ASSERT(ectx, max != 0);
   available = available / max;
   for(i = 0; i < count; i++)
     efflen[i] = entries[i]->len / max;
@@ -763,10 +770,10 @@
       }
     }
   }
-  GNUNET_ASSERT(j == 0);
+  GE_ASSERT(ectx, j == 0);
   FREE(v);
   FREE(efflen);
-  cronTime(&endTime);
+  endTime = get_time();
 
   return max;
 }
@@ -783,7 +790,7 @@
   int load;
   unsigned int delta;
 
-  load = getNetworkLoadUp();    /* how much free bandwidth do we have? */
+  load = os_network_monitor_get_load(load_monitor, Upload);  /* how much free bandwidth do we have? */
   if(load >= 150) {
     return SYSERR;              /* => always drop */
   }
@@ -850,7 +857,7 @@
   if(be->MAX_SEND_FREQUENCY > MIN_SAMPLE_TIME / MINIMUM_SAMPLE_COUNT)
     be->MAX_SEND_FREQUENCY = MIN_SAMPLE_TIME / MINIMUM_SAMPLE_COUNT;
 
-  if(be->lastSendAttempt + be->MAX_SEND_FREQUENCY > cronTime(NULL)) {
+  if(be->lastSendAttempt + be->MAX_SEND_FREQUENCY > get_time()) {
 #if DEBUG_CONNECTION
     LOG(LOG_DEBUG, "Send frequency too high (CPU load), send deferred.\n");
 #endif
@@ -940,7 +947,7 @@
     }
   } else { /* if (be->session.mtu == 0) */
     /* solve knapsack problem, compute accumulated priority */
-    approxProb = getCPULoad();
+    approxProb = os_cpu_get_load(ectx, cfg);
     if (approxProb < 0)
       approxProb = 50; /* failed to determine load, assume 50% */
     if (approxProb > 50) {
@@ -954,7 +961,7 @@
                                           be->session.mtu -
                                           sizeof(P2P_PACKET_HEADER));
 #if DEBUG_COLLECT_PRIO == YES
-        FPRINTF(prioFile, "%llu 0 %d\n", cronTime(NULL), priority);
+        FPRINTF(prioFile, "%llu 0 %d\n", get_time(), priority);
 #endif
       }
       else {
@@ -962,7 +969,7 @@
                                     be->session.mtu -
                                     sizeof(P2P_PACKET_HEADER));
 #if DEBUG_COLLECT_PRIO == YES
-        FPRINTF(prioFile, "%llu 1 %d\n", cronTime(NULL), priority);
+        FPRINTF(prioFile, "%llu 1 %d\n", get_time(), priority);
 #endif
       }
     }
@@ -971,7 +978,7 @@
                                   be->session.mtu -
                                   sizeof(P2P_PACKET_HEADER));
 #if DEBUG_COLLECT_PRIO == YES
-      FPRINTF(prioFile, "%llu 2 %d\n", cronTime(NULL), priority);
+      FPRINTF(prioFile, "%llu 2 %d\n", get_time(), priority);
 #endif
     }
     j = 0;
@@ -1027,13 +1034,13 @@
   int j;
 
   /* if it's more than one connection "lifetime" old, always kill it! */
-  expired =
-    cronTime(&be->lastSendAttempt) - SECONDS_PINGATTEMPT * cronSECONDS;
+  be->lastSendAttempt = get_time();
+  expired = be->lastSendAttempt - SECONDS_PINGATTEMPT * cronSECONDS;
 #if DEBUG_CONNECTION
   LOG(LOG_DEBUG, "policy prevents sending message\n");
 #endif
 
-  load = getCPULoad();
+  load = os_cpu_get_load(ectx, cfg);
   if (load < 0)
     load = 50; /* failed to determine load, assume 50%* */
   /* cleanup queue */
@@ -1060,7 +1067,7 @@
 #if DEBUG_CONNECTION
       LOG(LOG_DEBUG,
           "expiring message, expired %ds ago, queue size is %llu (bandwidth stressed)\n",
-          (int) ((cronTime(NULL) - entry->transmissionTime) / cronSECONDS),
+          (int) ((get_time() - entry->transmissionTime) / cronSECONDS),
           usedBytes);
 #endif
       if (stats != NULL) {
@@ -1124,10 +1131,10 @@
       }
 #if 0
       {
-        P2P_MESSAGE_HEADER *hdr;
+        MESSAGE_HEADER *hdr;
         EncName enc;
 
-        hdr = (P2P_MESSAGE_HEADER *) entry->closure;
+        hdr = (MESSAGE_HEADER *) entry->closure;
         IFLOG(LOG_DEBUG, hash2enc(&be->session.sender.hashPubKey, &enc));
         LOG(LOG_DEBUG,
             "Core selected message of type %u and size %u for sending to peer `%s'.\n",
@@ -1189,9 +1196,9 @@
 
   for(i = 0; i < be->sendBufferSize; i++) {
     entry = be->sendBuffer[i];
-    GNUNET_ASSERT(entry != NULL);
+    GE_ASSERT(ectx, entry != NULL);
     if(entry->knapsackSolution == YES) {
-      GNUNET_ASSERT(entry->callback == NULL);
+      GE_ASSERT(ectx, entry->callback == NULL);
       FREENONNULL(entry->closure);
       FREE(entry);
       be->sendBuffer[i] = NULL;
@@ -1296,7 +1303,7 @@
   ENTRY();
   /* fast ways out */
   if(be == NULL) {
-    BREAK();
+    GE_BREAK(ectx, 0);
     return;
   }
   if((be->status != STAT_UP) ||
@@ -1325,7 +1332,7 @@
     be->inSendBuffer = NO;
     return;                     /* deferr further */
   }
-  GNUNET_ASSERT(totalMessageSize > sizeof(P2P_PACKET_HEADER));
+  GE_ASSERT(ectx, totalMessageSize > sizeof(P2P_PACKET_HEADER));
 
   /* check if we (sender) have enough bandwidth available
      if so, trigger callbacks on selected entries; if either
@@ -1358,8 +1365,8 @@
 #if DEBUG_CONNECTION
       LOG(LOG_DEBUG, "Queuing msg %u with length %u\n", perm[i], entry->len);
 #endif
-      GNUNET_ASSERT(entry->callback == NULL);
-      GNUNET_ASSERT(p + entry->len <= totalMessageSize);
+      GE_ASSERT(ectx, entry->callback == NULL);
+      GE_ASSERT(ectx, p + entry->len <= totalMessageSize);
       memcpy(&plaintextMsg[p], entry->closure, entry->len);
       p += entry->len;
     }
@@ -1379,17 +1386,17 @@
   }
 
   /* finally padd with noise */
-  if ( (p + sizeof(P2P_MESSAGE_HEADER) <= totalMessageSize) &&
+  if ( (p + sizeof(MESSAGE_HEADER) <= totalMessageSize) &&
        (disable_random_padding == NO) ) {
-    P2P_MESSAGE_HEADER part;
+    MESSAGE_HEADER part;
     unsigned short noiseLen = totalMessageSize - p;
 
     part.size = htons(noiseLen);
     part.type = htons(P2P_PROTO_noise);
     memcpy(&plaintextMsg[p],
            &part,
-          sizeof(P2P_MESSAGE_HEADER));
-    for (i = p + sizeof(P2P_MESSAGE_HEADER); i < totalMessageSize; i++)
+          sizeof(MESSAGE_HEADER));
+    for (i = p + sizeof(MESSAGE_HEADER); i < totalMessageSize; i++)
       plaintextMsg[i] = (char) rand();
     p = totalMessageSize;
     if (stats != NULL)
@@ -1412,7 +1419,7 @@
 #endif
   if(stats != NULL)
     stats->change(stat_encrypted, p - sizeof(HashCode512));
-  GNUNET_ASSERT(be->session.tsession != NULL);
+  GE_ASSERT(ectx, be->session.tsession != NULL);
   ret = transport->send(be->session.tsession, encryptedMsg, p);
   if((ret == NO) && (priority >= EXTREME_PRIORITY)) {
     ret = transport->sendReliable(be->session.tsession, encryptedMsg, p);
@@ -1435,10 +1442,10 @@
     if (rsnSize > 0) {
       j = sizeof(P2P_PACKET_HEADER);
       while (j < p) {
-        P2P_MESSAGE_HEADER * part = (P2P_MESSAGE_HEADER *) &plaintextMsg[j];
+        MESSAGE_HEADER * part = (MESSAGE_HEADER *) &plaintextMsg[j];
         unsigned short plen = htons(part->size);
-        if (plen < sizeof(P2P_MESSAGE_HEADER)) {
-          BREAK();
+        if (plen < sizeof(MESSAGE_HEADER)) {
+          GE_BREAK(ectx, 0);
           break;
         }
         for (rsi = 0; rsi < rsnSize; rsi++)
@@ -1478,7 +1485,7 @@
   ENTRY();
   if ( (se == NULL) || 
        (se->len == 0) ) {
-    BREAK();
+    GE_BREAK(ectx, 0);
     FREENONNULL(se);
     return;
   }
@@ -1544,7 +1551,7 @@
   }
   /* grow send buffer, insertion sort! */
   ne = MALLOC((be->sendBufferSize + 1) * sizeof(SendEntry *));
-  GNUNET_ASSERT(se->len != 0);
+  GE_ASSERT(ectx, se->len != 0);
   apri = (float) se->pri / (float) se->len;
   i = 0;
   while((i < be->sendBufferSize) &&
@@ -1732,7 +1739,7 @@
     se->len = sizeof(P2P_hangup_MESSAGE);
     se->flags = SE_FLAG_PLACE_TAIL;
     se->pri = EXTREME_PRIORITY;
-    se->transmissionTime = cronTime(NULL);  /* now */
+    se->transmissionTime = get_time();  /* now */
     se->callback = &copyCallback;
     se->closure = MALLOC(sizeof(P2P_hangup_MESSAGE));
     se->knapsackSolution = NO;
@@ -1820,21 +1827,21 @@
   int earlyRun;
   int load;
 
-  MUTEX_LOCK(&lock);
-  cronTime(&now);
+  MUTEX_LOCK(lock);
+  now = get_time();
 
   /* if this is the first round, don't bother... */
   if(lastRoundStart == 0) {
     /* no allocation the first time this function is called! */
     lastRoundStart = now;
     forAllConnectedHosts(&resetRecentlyReceived, NULL);
-    MUTEX_UNLOCK(&lock);
+    MUTEX_UNLOCK(lock);
     return;
   }
 
   activePeerCount = forAllConnectedHosts(NULL, NULL);
   if(activePeerCount == 0) {
-    MUTEX_UNLOCK(&lock);
+    MUTEX_UNLOCK(lock);
     return;                     /* nothing to be done here. */
   }
 
@@ -1849,7 +1856,7 @@
   if(timeDifference < MIN_SAMPLE_TIME) {
     earlyRun = 1;
     if(activePeerCount > CONNECTION_MAX_HOSTS_ / 16) {
-      MUTEX_UNLOCK(&lock);
+      MUTEX_UNLOCK(lock);
       return;                   /* don't update too frequently, we need at least some
                                    semi-representative sampling! */
     }
@@ -1890,7 +1897,8 @@
   if(minCon > activePeerCount)
     minCon = activePeerCount;
   schedulableBandwidth = max_bpm - minCon * MIN_BPM_PER_PEER;
-  load = getNetworkLoadDown();
+  load = os_network_monitor_get_load(load_monitor,
+                                    Download);
   if(load > 100) {
     /* take counter measures! */
     schedulableBandwidth = schedulableBandwidth * 100 / load;
@@ -1906,7 +1914,7 @@
      algorithm, we'd need to compute the new limits separately
      and then merge the values; but for now, let's just go
      hardcore and adjust all values rapidly */
-  GNUNET_ASSERT(timeDifference != 0);
+  GE_ASSERT(ectx, timeDifference != 0);
   for(u = 0; u < activePeerCount; u++) {
     adjustedRR[u] =
       entries[u]->recently_received * cronMINUTES / timeDifference / 2;
@@ -2122,7 +2130,7 @@
     }
   }
 
-  MUTEX_UNLOCK(&lock);
+  MUTEX_UNLOCK(lock);
 }
 
 /* ******** end of inbound bandwidth scheduling ************* */
@@ -2141,8 +2149,8 @@
   int i;
 
   scheduleInboundTraffic();
-  cronTime(&now);
-  MUTEX_LOCK(&lock);
+  now = get_time();
+  MUTEX_LOCK(lock);
   for(i = 0; i < CONNECTION_MAX_HOSTS_; i++) {
     root = CONNECTION_buffer_[i];
     prev = NULL;
@@ -2175,8 +2183,8 @@
         if ( (root->available_send_window >= 60000) &&
             (root->sendBufferSize < 4) &&
             (scl_nextHead != NULL) &&
-            (getNetworkLoadUp() < 25) && 
-            (getCPULoad() < 50) ) {
+            (os_network_monitor_get_load(load_monitor, Upload) < 25) && 
+            (os_cpu_get_load(ectx, cfg) < 50) ) {
           /* create some traffic by force! */
           char * msgBuf;
           unsigned int mSize;
@@ -2191,7 +2199,7 @@
                                    60000);
               if(mSize > 0)
                 unicast(&root->session.sender,
-                        (P2P_MESSAGE_HEADER *) msgBuf, 
+                        (MESSAGE_HEADER *) msgBuf, 
                        0, 
                        5 * cronMINUTES);
             }
@@ -2219,7 +2227,7 @@
     }                           /* end of while */
   }                             /* for all buckets */
 
-  MUTEX_UNLOCK(&lock);
+  MUTEX_UNLOCK(lock);
 }
 
 /**
@@ -2246,8 +2254,8 @@
   EncName enc;
 
   ENTRY();
-  GNUNET_ASSERT(msg != NULL);
-  GNUNET_ASSERT(sender != NULL);
+  GE_ASSERT(ectx, msg != NULL);
+  GE_ASSERT(ectx, sender != NULL);
   hash2enc(&sender->hashPubKey, &enc);
   if(size < sizeof(P2P_PACKET_HEADER)) {
     LOG(LOG_WARNING,
@@ -2268,7 +2276,7 @@
 #if DEBUG_CONNECTION
   LOG(LOG_DEBUG, "Decrypting message from host `%s'\n", &enc);
 #endif
-  MUTEX_LOCK(&lock);
+  MUTEX_LOCK(lock);
   be = lookForHost(sender);
   if((be == NULL) ||
      (be->status == STAT_DOWN) || (be->status == STAT_SETKEY_SENT)) {
@@ -2279,7 +2287,7 @@
        getting bogus messages until the other one times out. */
     if((be == NULL) || (be->status == STAT_DOWN))
       addHost(sender, YES);
-    MUTEX_UNLOCK(&lock);
+    MUTEX_UNLOCK(lock);
     return SYSERR;              /* could not decrypt */
   }
   tmp = MALLOC(size - sizeof(HashCode512));
@@ -2299,7 +2307,7 @@
              crc32N(&msg->sequenceNumber, size - sizeof(HashCode512)));
 #endif
     addHost(sender, YES);
-    MUTEX_UNLOCK(&lock);
+    MUTEX_UNLOCK(lock);
     FREE(tmp);
     return SYSERR;
   }
@@ -2325,7 +2333,7 @@
           _("Invalid sequence number"
             " %u <= %u, dropping message.\n"),
           sequenceNumber, be->lastSequenceNumberReceived);
-      MUTEX_UNLOCK(&lock);
+      MUTEX_UNLOCK(lock);
       return SYSERR;
     }
   }
@@ -2338,7 +2346,7 @@
   stamp = ntohl(msg->timeStamp);
   if(stamp + 1 * cronDAYS < TIME(NULL)) {
     LOG(LOG_INFO, _("Message received more than one day old. Dropped.\n"));
-    MUTEX_UNLOCK(&lock);
+    MUTEX_UNLOCK(lock);
     return SYSERR;
   }
 
@@ -2348,10 +2356,10 @@
 #endif
   if(be->available_send_window >= be->max_bpm) {
     be->available_send_window = be->max_bpm;
-    cronTime(&be->last_bps_update);
+    be->last_bps_update = get_time();
   }
   be->recently_received += size;
-  MUTEX_UNLOCK(&lock);
+  MUTEX_UNLOCK(lock);
   return YES;
 }
 
@@ -2364,25 +2372,27 @@
  * @return OK on success, SYSERR on error
  */
 static int handleHANGUP(const PeerIdentity * sender,
-                        const P2P_MESSAGE_HEADER * msg) {
+                        const MESSAGE_HEADER * msg) {
   BufferEntry *be;
   EncName enc;
 
   ENTRY();
   if(ntohs(msg->size) != sizeof(P2P_hangup_MESSAGE))
     return SYSERR;
-  if(!hostIdentityEquals(sender, &((P2P_hangup_MESSAGE *) msg)->sender))
+  if(0 != memcmp(sender,
+                &((P2P_hangup_MESSAGE *) msg)->sender,
+                sizeof(PeerIdentity)))
     return SYSERR;
   IFLOG(LOG_INFO, hash2enc(&sender->hashPubKey, &enc));
   LOG(LOG_INFO, "received HANGUP from `%s'\n", &enc);
-  MUTEX_LOCK(&lock);
+  MUTEX_LOCK(lock);
   be = lookForHost(sender);
   if(be == NULL) {
-    MUTEX_UNLOCK(&lock);
+    MUTEX_UNLOCK(lock);
     return SYSERR;
   }
   shutdownConnection(be);
-  MUTEX_UNLOCK(&lock);
+  MUTEX_UNLOCK(lock);
   return OK;
 }
 
@@ -2401,12 +2411,12 @@
                       const PeerIdentity * peer, TIME_T age, int forSending) {
   BufferEntry *be;
 
-  MUTEX_LOCK(&lock);
+  MUTEX_LOCK(lock);
   be = lookForHost(peer);
   if(be == NULL)
     be = addHost(peer, NO);
   if(be != NULL) {
-    cronTime(&be->isAlive);
+    be->isAlive = get_time();
     if(forSending == YES) {
       be->skey_local = *key;
       be->skey_local_created = age;
@@ -2424,7 +2434,7 @@
       }
     }
   }
-  MUTEX_UNLOCK(&lock);
+  MUTEX_UNLOCK(lock);
 }
 
 /**
@@ -2435,10 +2445,10 @@
 void confirmSessionUp(const PeerIdentity * peer) {
   BufferEntry *be;
 
-  MUTEX_LOCK(&lock);
+  MUTEX_LOCK(lock);
   be = lookForHost(peer);
   if(be != NULL) {
-    cronTime(&be->isAlive);
+    be->isAlive = get_time();
     identity->whitelistHost(peer);
     if(((be->status & STAT_SETKEY_SENT) > 0) &&
        ((be->status & STAT_SETKEY_RECEIVED) > 0) &&
@@ -2448,7 +2458,7 @@
       be->lastSequenceNumberSend = 1;
     }
   }
-  MUTEX_UNLOCK(&lock);
+  MUTEX_UNLOCK(lock);
 }
 
 
@@ -2469,7 +2479,7 @@
   BufferEntry *be;
   int ret;
   ret = 0;
-  MUTEX_LOCK(&lock);
+  MUTEX_LOCK(lock);
   if((slot >= 0) && (slot < CONNECTION_MAX_HOSTS_)) {
     be = CONNECTION_buffer_[slot];
     while(be != NULL) {
@@ -2478,7 +2488,7 @@
       be = be->overflowChain;
     }
   }
-  MUTEX_UNLOCK(&lock);
+  MUTEX_UNLOCK(lock);
   return ret;
 }
 
@@ -2493,7 +2503,7 @@
   BufferEntry *be;
 
   ret = 0;
-  MUTEX_LOCK(&lock);
+  MUTEX_LOCK(lock);
   be = lookForHost(peer);
   if((be != NULL) && (be->status == STAT_UP)) {
     *time = be->isAlive;
@@ -2503,7 +2513,7 @@
     *time = 0;
     ret = SYSERR;
   }
-  MUTEX_UNLOCK(&lock);
+  MUTEX_UNLOCK(lock);
   return ret;
 }
 
@@ -2525,7 +2535,7 @@
   int ret;
   BufferEntry *be;
   ret = SYSERR;
-  MUTEX_LOCK(&lock);
+  MUTEX_LOCK(lock);
   be = lookForHost(peer);
   if(be != NULL) {
     if (forSending == YES) {
@@ -2542,7 +2552,7 @@
       }
     }
   }
-  MUTEX_UNLOCK(&lock);
+  MUTEX_UNLOCK(lock);
   return ret;
 }
 
@@ -2568,7 +2578,7 @@
   ENTRY();
   if(tsession == NULL)
     return;
-  MUTEX_LOCK(&lock);
+  MUTEX_LOCK(lock);
   be = lookForHost(sender);
   if(be != NULL) {
     if(be->status != STAT_DOWN) {
@@ -2597,7 +2607,7 @@
       } /* end if cheaper AND possible */
     } /* end if connected */
   }
-  MUTEX_UNLOCK(&lock);
+  MUTEX_UNLOCK(lock);
   transport->disconnect(tsession);
 }
 
@@ -2611,7 +2621,7 @@
   unsigned long long new_max_bpm;
   unsigned int i;
 
-  MUTEX_LOCK(&lock);
+  MUTEX_LOCK(lock);
   /* max_bpm may change... */
   new_max_bpm = 60 * getConfigurationInt("LOAD", "MAXNETDOWNBPSTOTAL");
   if(new_max_bpm == 0)
@@ -2668,19 +2678,24 @@
   }
   disable_random_padding = testConfigurationString("GNUNETD-EXPERIMENTAL",
                                                    "PADDING", "NO");
-  MUTEX_UNLOCK(&lock);
+  MUTEX_UNLOCK(lock);
 }
 
 /**
  * Initialize this module.
  */
-void initConnection() {
-  GNUNET_ASSERT(P2P_MESSAGE_OVERHEAD == sizeof(P2P_PACKET_HEADER));
-  GNUNET_ASSERT(sizeof(P2P_hangup_MESSAGE) == 68);
+void initConnection(struct GE_Context * e,
+                   struct GC_Configuration * c,
+                   struct LoadMonitor * m) {
+  ectx = e;
+  cfg = c;
+  load_monitor = m;
+  GE_ASSERT(ectx, P2P_MESSAGE_OVERHEAD == sizeof(P2P_PACKET_HEADER));
+  GE_ASSERT(ectx, sizeof(P2P_hangup_MESSAGE) == 68);
   ENTRY();
   scl_nextHead = NULL;
   scl_nextTail = NULL;
-  MUTEX_CREATE_RECURSIVE(&lock);
+  lock = MUTEX_CREATE(YES);
   registerConfigurationUpdateCallback(&connectionConfigChangeCallback);
   CONNECTION_MAX_HOSTS_ = 0;
   connectionConfigChangeCallback();
@@ -2691,15 +2706,15 @@
 #endif
 
   transport = requestService("transport");
-  GNUNET_ASSERT(transport != NULL);
+  GE_ASSERT(ectx, transport != NULL);
   identity = requestService("identity");
-  GNUNET_ASSERT(identity != NULL);
+  GE_ASSERT(ectx, identity != NULL);
   session = requestService("session");
-  GNUNET_ASSERT(session != NULL);
+  GE_ASSERT(ectx, session != NULL);
   fragmentation = requestService("fragmentation");
-  GNUNET_ASSERT(fragmentation != NULL);
+  GE_ASSERT(ectx, fragmentation != NULL);
   topology = requestService("topology");
-  GNUNET_ASSERT(topology != NULL);
+  GE_ASSERT(ectx, topology != NULL);
   stats = requestService("stats");
   if(stats != NULL) {
     stat_messagesDropped
@@ -2756,7 +2771,7 @@
       FREE(prev);
     }
   }
-  MUTEX_DESTROY(&lock);
+  MUTEX_DESTROY(lock);
   FREENONNULL(CONNECTION_buffer_);
   CONNECTION_buffer_ = NULL;
   CONNECTION_MAX_HOSTS_ = 0;
@@ -2781,6 +2796,9 @@
 #if DEBUG_COLLECT_PRIO == YES
   fclose(prioFile);
 #endif
+  ectx = NULL;
+  cfg = NULL;
+  load_monitor = NULL;
 }
 
 
@@ -2798,9 +2816,9 @@
 
   wrap.method = method;
   wrap.arg = arg;
-  MUTEX_LOCK(&lock);
+  MUTEX_LOCK(lock);
   ret = forAllConnectedHosts(&fENHCallback, &wrap);
-  MUTEX_UNLOCK(&lock);
+  MUTEX_UNLOCK(lock);
   return ret;
 }
 
@@ -2815,18 +2833,17 @@
   EncName skey_remote;
   unsigned int ttype;
 
-  MUTEX_LOCK(&lock);
+  MUTEX_LOCK(lock);
   ENTRY();
   for(i = 0; i < CONNECTION_MAX_HOSTS_; i++) {
     tmp = CONNECTION_buffer_[i];
     while(tmp != NULL) {
       if(tmp->status != STAT_DOWN) {
-        IFLOG(LOG_MESSAGE,
-              hash2enc(&tmp->session.sender.hashPubKey,
-                       &hostName);
-              hash2enc((HashCode512 *) & tmp->skey_local,
-                       &skey_local);
-              hash2enc((HashCode512 *) & tmp->skey_remote, &skey_remote));
+       hash2enc(&tmp->session.sender.hashPubKey,
+                &hostName);
+       hash2enc((HashCode512 *) & tmp->skey_local,
+                &skey_local);
+       hash2enc((HashCode512 *) & tmp->skey_remote, &skey_remote);
         hostName.encoding[4] = '\0';
         skey_local.encoding[4] = '\0';
         skey_remote.encoding[4] = '\0';
@@ -2839,7 +2856,7 @@
             i,
             tmp->status,
             ttype,
-            (int) ((cronTime(NULL) - tmp->isAlive) / cronSECONDS),
+            (int) ((get_time() - tmp->isAlive) / cronSECONDS),
             SECONDS_INACTIVE_DROP,
             tmp->recently_received,
             tmp->idealized_limit,
@@ -2851,7 +2868,7 @@
       tmp = tmp->overflowChain;
     }
   }
-  MUTEX_UNLOCK(&lock);
+  MUTEX_UNLOCK(lock);
 }
 
 /**
@@ -2883,7 +2900,7 @@
   scl->minimumPadding = minimumPadding;
   scl->callback = callback;
   scl->next = NULL;
-  MUTEX_LOCK(&lock);
+  MUTEX_LOCK(lock);
   if(scl_nextTail == NULL) {
     scl_nextHead = scl;
     scl_nextTail = scl;
@@ -2892,7 +2909,7 @@
     scl_nextTail->next = scl;
     scl_nextTail = scl;
   }
-  MUTEX_UNLOCK(&lock);
+  MUTEX_UNLOCK(lock);
   return OK;
 }
 
@@ -2915,7 +2932,7 @@
   SendCallbackList *prev;
 
   prev = NULL;
-  MUTEX_LOCK(&lock);
+  MUTEX_LOCK(lock);
   pos = scl_nextHead;
   while(pos != NULL) {
     if((pos->callback == callback) && (pos->minimumPadding == minimumPadding)) {
@@ -2926,13 +2943,13 @@
       if(scl_nextTail == pos)
         scl_nextTail = prev;
       FREE(pos);
-      MUTEX_UNLOCK(&lock);
+      MUTEX_UNLOCK(lock);
       return OK;
     }
     prev = pos;
     pos = pos->next;
   }
-  MUTEX_UNLOCK(&lock);
+  MUTEX_UNLOCK(lock);
   return SYSERR;
 }
 
@@ -2945,7 +2962,7 @@
  * from the GNUnet core.
  *
  * @param session the transport session
- * @param msg the message to transmit, should contain P2P_MESSAGE_HEADERs
+ * @param msg the message to transmit, should contain MESSAGE_HEADERs
  * @return OK on success, SYSERR on failure, NO on temporary failure
  */
 int sendPlaintext(TSession * tsession, const char *msg, unsigned int size) {
@@ -2953,11 +2970,11 @@
   int ret;
   P2P_PACKET_HEADER *hdr;
 
-  GNUNET_ASSERT(tsession != NULL);
+  GE_ASSERT(ectx, tsession != NULL);
   if((transport->getMTU(tsession->ttype) > 0) &&
      (transport->getMTU(tsession->ttype) <
       size + sizeof(P2P_PACKET_HEADER))) {
-    BREAK();
+    GE_BREAK(ectx, 0);
     return SYSERR;
   }
   buf = MALLOC(size + sizeof(P2P_PACKET_HEADER));
@@ -2999,7 +3016,7 @@
       __FUNCTION__, &enc, len);
 #endif
   ENTRY();
-  MUTEX_LOCK(&lock);
+  MUTEX_LOCK(lock);
   be = addHost(hostId, YES);
   if((be != NULL) && (be->status != STAT_DOWN)) {
     SendEntry *entry;
@@ -3008,7 +3025,7 @@
     entry->len = len;
     entry->flags = SE_FLAG_NONE;
     entry->pri = importance;
-    entry->transmissionTime = cronTime(NULL) + maxdelay;
+    entry->transmissionTime = get_time() + maxdelay;
     entry->callback = callback;
     entry->closure = closure;
     entry->knapsackSolution = NO;
@@ -3017,7 +3034,7 @@
   else {
     FREENONNULL(closure);
   }
-  MUTEX_UNLOCK(&lock);
+  MUTEX_UNLOCK(lock);
 }
 
 /**
@@ -3030,7 +3047,7 @@
  * @param maxdelay how long can the message be delayed?
  */
 void unicast(const PeerIdentity * receiver,
-             const P2P_MESSAGE_HEADER * msg,
+             const MESSAGE_HEADER * msg,
              unsigned int importance, unsigned int maxdelay) {
   char *closure;
   unsigned short len;
@@ -3061,9 +3078,9 @@
 int isConnected(const PeerIdentity * hi) {
   BufferEntry *be;
 
-  MUTEX_LOCK(&lock);
+  MUTEX_LOCK(lock);
   be = lookForHost(hi);
-  MUTEX_UNLOCK(&lock);
+  MUTEX_UNLOCK(lock);
   if(be == NULL) {
     return NO;
   }
@@ -3081,7 +3098,7 @@
 unsigned int computeIndex(const PeerIdentity * hostId) {
   unsigned int res = (((unsigned int) hostId->hashPubKey.bits[0]) &
                       ((unsigned int) (CONNECTION_MAX_HOSTS_ - 1)));
-  GNUNET_ASSERT(res < CONNECTION_MAX_HOSTS_);
+  GE_ASSERT(ectx, res < CONNECTION_MAX_HOSTS_);
   return res;
 }
 
@@ -3090,8 +3107,8 @@
  *
  * @return the lock
  */
-Mutex *getConnectionModuleLock() {
-  return &lock;
+struct MUTEX * getConnectionModuleLock() {
+  return lock;
 }
 
 unsigned int getBandwidthAssignedTo(const PeerIdentity * node) {
@@ -3099,7 +3116,7 @@
   unsigned int ret;
 
   ENTRY();
-  MUTEX_LOCK(&lock);
+  MUTEX_LOCK(lock);
   be = lookForHost(node);
   if((be != NULL) && (be->status == STAT_UP)) {
     ret = be->idealized_limit;
@@ -3109,7 +3126,7 @@
   else {
     ret = 0;
   }
-  MUTEX_UNLOCK(&lock);
+  MUTEX_UNLOCK(lock);
   return ret;
 }
 
@@ -3122,11 +3139,11 @@
   BufferEntry *be;
 
   ENTRY();
-  MUTEX_LOCK(&lock);
+  MUTEX_LOCK(lock);
   be = lookForHost(node);
   if(be != NULL)
     be->current_connection_value += preference;
-  MUTEX_UNLOCK(&lock);
+  MUTEX_UNLOCK(lock);
 }
 
 /**
@@ -3140,7 +3157,7 @@
   EncName enc;
 
   ENTRY();
-  MUTEX_LOCK(&lock);
+  MUTEX_LOCK(lock);
   be = lookForHost(node);
   if(be != NULL) {
     IFLOG(LOG_DEBUG, hash2enc(&node->hashPubKey, &enc));
@@ -3148,7 +3165,7 @@
         "Closing connection to `%s' as requested by application.\n", &enc);
     shutdownConnection(be);
   }
-  MUTEX_UNLOCK(&lock);
+  MUTEX_UNLOCK(lock);
 }
 
 /**
@@ -3162,10 +3179,10 @@
 int registerSendNotify(MessagePartHandler callback) {
   if(callback == NULL)
     return SYSERR;
-  MUTEX_LOCK(&lock);
+  MUTEX_LOCK(lock);
   GROW(rsns, rsnSize, rsnSize + 1);
   rsns[rsnSize - 1] = callback;
-  MUTEX_UNLOCK(&lock);
+  MUTEX_UNLOCK(lock);
   return OK;
 }
 
@@ -3179,16 +3196,16 @@
  */
 int unregisterSendNotify(MessagePartHandler callback) {
   int i;
-  MUTEX_LOCK(&lock);
+  MUTEX_LOCK(lock);
   for(i = 0; i < rsnSize; i++) {
     if(rsns[i] == callback) {
       rsns[i] = rsns[rsnSize - 1];
       GROW(rsns, rsnSize, rsnSize - 1);
-      MUTEX_UNLOCK(&lock);
+      MUTEX_UNLOCK(lock);
       return OK;
     }
   }
-  MUTEX_UNLOCK(&lock);
+  MUTEX_UNLOCK(lock);
   return SYSERR;
 }
 

Modified: GNUnet/src/server/connection.h
===================================================================
--- GNUnet/src/server/connection.h      2006-07-29 09:44:21 UTC (rev 3156)
+++ GNUnet/src/server/connection.h      2006-07-29 09:58:30 UTC (rev 3157)
@@ -56,17 +56,19 @@
 /**
  * Initialize this module.
  */
-void initConnection();
+void initConnection(struct GE_Context * ectx,
+                   struct GC_Configuration * cfg,
+                   struct LoadMonitor * mon);
 
 /**
  * Shutdown the connection module.
  */
-void doneConnection();
+void doneConnection(void);
 
 /**
  * For debugging.
  */
-void printConnectionBuffer();
+void printConnectionBuffer(void);
 
 /**
  * Check the sequence number and timestamp.  Decrypts the
@@ -197,7 +199,7 @@
 /**
  * Return a pointer to the lock of the connection module.
  */
-struct MUTEX * getConnectionModuleLock();
+struct MUTEX * getConnectionModuleLock(void);
 
 
 /* ******************** traffic management ********** */





reply via email to

[Prev in Thread] Current Thread [Next in Thread]