gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r1842 - in GNUnet/src/applications: fs/ecrs fs/lib fs/modul


From: grothoff
Subject: [GNUnet-SVN] r1842 - in GNUnet/src/applications: fs/ecrs fs/lib fs/module gap
Date: Sun, 21 Aug 2005 23:02:18 -0700 (PDT)

Author: grothoff
Date: 2005-08-21 23:02:16 -0700 (Sun, 21 Aug 2005)
New Revision: 1842

Modified:
   GNUnet/src/applications/fs/ecrs/download.c
   GNUnet/src/applications/fs/lib/fslib.c
   GNUnet/src/applications/fs/module/fs.c
   GNUnet/src/applications/fs/module/querymanager.c
   GNUnet/src/applications/gap/gap.c
Log:
fixing bug in delays and waits that resulted in downloads (esp. via loopback) 
being much slower than necessary

Modified: GNUnet/src/applications/fs/ecrs/download.c
===================================================================
--- GNUnet/src/applications/fs/ecrs/download.c  2005-08-22 04:18:36 UTC (rev 
1841)
+++ GNUnet/src/applications/fs/ecrs/download.c  2005-08-22 06:02:16 UTC (rev 
1842)
@@ -392,6 +392,8 @@
 
   struct FS_SEARCH_CONTEXT * sctx;
 
+  PTHREAD_T requestThread;
+
 } RequestManager;
 
 static int nodeReceive(const HashCode512 * query,
@@ -409,6 +411,7 @@
   RequestManager * rm;
 
   rm = MALLOC(sizeof(RequestManager));
+  PTHREAD_GET_SELF(&rm->requestThread);
   rm->abortFlag
     = NO;
   rm->lastDET
@@ -460,7 +463,8 @@
        rm->requestListSize,
        0);
   FS_SEARCH_destroyContext(rm->sctx);
-    MUTEX_DESTROY(&rm->lock);
+  MUTEX_DESTROY(&rm->lock);
+  PTHREAD_REL_SELF(&rm->requestThread);
   FREE(rm);
 }
 
@@ -860,7 +864,7 @@
        hash2enc(query,
                 &enc));
   LOG(LOG_DEBUG,
-      "Receiving reply to query %s\n",
+      "Receiving reply to query `%s'\n",
       &enc);
 #endif
 
@@ -923,6 +927,8 @@
       requestManagerEndgame(node->ctx->rm);
     }
   }
+  PTHREAD_KILL(&node->ctx->rm->requestThread,
+              SIGALRM);
   FREE(data);
   FREE(node);
   return OK;
@@ -1090,7 +1096,7 @@
     TTL_DECREMENT = rm->requestList[0]->node->ctx->TTL_DECREMENT;
 
   for (i=0;i<rm->requestListIndex;i++) {
-    if (rm->requestList[i]->lastTimeout >= now + TTL_DECREMENT) {
+    if (rm->requestList[i]->lastTimeout >= now - TTL_DECREMENT) {
       pending++;
     } else if (rm->requestList[i]->searchHandle != NULL) {
       FS_stop_search(rm->sctx,
@@ -1117,6 +1123,8 @@
           (0 == randomi(rm->requestListIndex *
                         pOCWCubed)) ) {
        delta = (rm->requestList[j]->lastTimeout - now) + 10 * cronMILLIS;
+       LOG(LOG_DEBUG,
+           "Requesting!\n");
        issueRequest(rm, j);
        pending++;
       } else { 
@@ -1159,10 +1167,12 @@
   NodeClosure * top;
   FileIdentifier fid;
 
+#if DEBUG_DOWNLOAD
   LOG(LOG_DEBUG,
       "`%s' running for file `%s'\n",
       __FUNCTION__,
       filename);
+#endif
   GNUNET_ASSERT(filename != NULL);
   fid = uri->data.chk;
   if (! ECRS_isFileUri(uri)) {

Modified: GNUnet/src/applications/fs/lib/fslib.c
===================================================================
--- GNUnet/src/applications/fs/lib/fslib.c      2005-08-22 04:18:36 UTC (rev 
1841)
+++ GNUnet/src/applications/fs/lib/fslib.c      2005-08-22 06:02:16 UTC (rev 
1842)
@@ -55,6 +55,7 @@
 static void * processReplies(SEARCH_CONTEXT * ctx) {
   CS_MESSAGE_HEADER * hdr;
   int i;
+  int matched;
   CS_fs_reply_content_MESSAGE * rep;
   HashCode512 query;
   unsigned int size;
@@ -65,6 +66,10 @@
     hdr = NULL;
     if (OK == readFromSocket(ctx->sock,
                             &hdr)) {
+#if DEBUG_FSLIB
+      LOG(LOG_DEBUG,
+         "FSLIB: received message from gnunetd\n");
+#endif
       delay = 100 * cronMILLIS;
       /* verify hdr, if reply, process, otherwise
         signal protocol problem; if ok, find
@@ -84,12 +89,14 @@
        FREE(hdr);
        continue;
       }
+      matched = 0;
       MUTEX_LOCK(ctx->lock);
       for (i=ctx->handleCount-1;i>=0;i--) {
        if (equalsHashCode512(&query,
                              &ctx->handles[i]->req->query[0])) {
          Datastore_Value * value;
 
+         matched++;
          if (ctx->handles[i]->callback != NULL) {      
            value = MALLOC(sizeof(Datastore_Value) + size);
            value->size = htonl(size + sizeof(Datastore_Value));
@@ -111,7 +118,17 @@
        }
       }
       MUTEX_UNLOCK(ctx->lock);
+#if DEBUG_FSLIB
+      if (matched == 0) 
+       LOG(LOG_DEBUG,
+           "FSLIB: received content but have no pending request\n");
+#endif
     } else {
+#if DEBUG_FSLIB
+      LOG(LOG_DEBUG,
+         "FSLIB: error communicating with gnunetd; sleeping for %ums\n",
+         delay);
+#endif
       gnunet_util_sleep(delay);
       delay *= 2;
       if (delay > 5 * cronSECONDS)
@@ -181,6 +198,11 @@
 #endif
 
   ret = MALLOC(sizeof(SEARCH_HANDLE));
+#if DEBUG_FSLIB
+  LOG(LOG_DEBUG,
+      "FSLIB: start search (%p)\n",
+      ret);
+#endif
   req = MALLOC(sizeof(CS_fs_request_search_MESSAGE) + (keyCount-1) * 
sizeof(HashCode512));
   req->header.size = htons(sizeof(CS_fs_request_search_MESSAGE) + (keyCount-1) 
* sizeof(HashCode512));
   req->header.type = htons(CS_PROTO_gap_QUERY_START);
@@ -217,6 +239,11 @@
                   ret);
     return NULL;
   }
+#if DEBUG_FSLIB
+  LOG(LOG_DEBUG,
+      "FSLIB: search started (%p)\n",
+      ret);
+#endif
   return ret;
 }
 
@@ -227,6 +254,11 @@
                    SEARCH_HANDLE * handle) {
   int i;
 
+#if DEBUG_FSLIB
+  LOG(LOG_DEBUG,
+      "FSLIB: stop search (%p)\n",
+      handle);
+#endif
   handle->req->header.type = htons(CS_PROTO_gap_QUERY_STOP);
   writeToSocket(ctx->sock,
                &handle->req->header);
@@ -238,6 +270,11 @@
     }
   MUTEX_UNLOCK(ctx->lock);
   FREE(handle->req);
+#if DEBUG_FSLIB
+  LOG(LOG_DEBUG,
+      "FSLIB: search stopped (%p)\n",
+      handle);
+#endif
   FREE(handle);
 }
 

Modified: GNUnet/src/applications/fs/module/fs.c
===================================================================
--- GNUnet/src/applications/fs/module/fs.c      2005-08-22 04:18:36 UTC (rev 
1841)
+++ GNUnet/src/applications/fs/module/fs.c      2005-08-22 06:02:16 UTC (rev 
1842)
@@ -86,48 +86,23 @@
  */
 static DHT_TableId dht_table;
 
-/**
- * Store an item in the datastore.
- *
- * @param query the unique identifier of the item
- * @param value the value to store
- * @param prio how much does our routing code value
- *        this datum?
- * @return OK if the value could be stored,
- *         NO if the value verifies but is not stored,
- *         SYSERR if the value is malformed
- */
-static int gapPut(void * closure,
-                 const HashCode512 * query,
-                 const DataContainer * value,
-                 unsigned int prio) {
+static Datastore_Value * 
+gapWrapperToDatastoreValue(const DataContainer * value,
+                          int prio) {
   Datastore_Value * dv;
   GapWrapper * gw;
   unsigned int size;
-  int ret;
-  HashCode512 hc;
   cron_t et;
   cron_t now;
-#if DEBUG_FS
-  EncName enc;
-#endif
 
   if (ntohl(value->size) < sizeof(GapWrapper)) {
     BREAK();
-    return SYSERR;
+    return NULL;
   }
   gw = (GapWrapper*) value;
   size = ntohl(gw->dc.size)
     - sizeof(GapWrapper)
     + sizeof(Datastore_Value);
-  if ( (OK != getQueryFor(size - sizeof(Datastore_Value),
-                         (DBlock*)&gw[1],
-                         &hc)) ||
-       (! equalsHashCode512(&hc, query)) ) {
-    BREAK(); /* value failed verification! */
-    return SYSERR;
-  }
-
   dv = MALLOC(size);
   dv->size = htonl(size);
   dv->type = htonl(getTypeOfBlock(size - sizeof(Datastore_Value),
@@ -146,6 +121,44 @@
   memcpy(&dv[1],
         &gw[1],
         size - sizeof(Datastore_Value));
+  return dv;
+}
+
+/**
+ * Store an item in the datastore.
+ *
+ * @param query the unique identifier of the item
+ * @param value the value to store
+ * @param prio how much does our routing code value
+ *        this datum?
+ * @return OK if the value could be stored,
+ *         NO if the value verifies but is not stored,
+ *         SYSERR if the value is malformed
+ */
+static int gapPut(void * closure,
+                 const HashCode512 * query,
+                 const DataContainer * value,
+                 unsigned int prio) {
+  Datastore_Value * dv;
+  GapWrapper * gw;
+  unsigned int size;
+  int ret;
+  HashCode512 hc;
+#if DEBUG_FS
+  EncName enc;
+#endif
+
+  dv = gapWrapperToDatastoreValue(value, prio);
+  if (dv == NULL)
+    return SYSERR;
+  size = ntohl(gw->dc.size) - sizeof(GapWrapper);
+  if ( (OK != getQueryFor(size,
+                         (DBlock*) &gw[1],
+                         &hc)) ||
+       (! equalsHashCode512(&hc, query)) ) {
+    BREAK(); /* value failed verification! */
+    return SYSERR;
+  }
   if (YES != isDatumApplicable(ntohl(dv->type),
                               ntohl(dv->size) - sizeof(Datastore_Value),
                               (DBlock*) &dv[1],
@@ -201,61 +214,6 @@
 }
 
 /**
- * Process a query from the client. Forwards to the network.
- *
- * @return SYSERR if the TCP connection should be closed, otherwise OK
- */
-static int csHandleRequestQueryStart(ClientHandle sock,
-                                    const CS_MESSAGE_HEADER * req) {
-  const CS_fs_request_search_MESSAGE * rs;
-  unsigned int keyCount;
-#if DEBUG_FS
-  EncName enc;
-#endif
-
-  if (ntohs(req->size) < sizeof(CS_fs_request_search_MESSAGE)) {
-    BREAK();
-    return SYSERR;
-  }
-  rs = (const CS_fs_request_search_MESSAGE*) req;
-#if DEBUG_FS
-  IFLOG(LOG_DEBUG,
-       hash2enc(&rs->query[0],
-                &enc));
-  LOG(LOG_DEBUG,
-      "FS received QUERY START (query: `%s')\n",
-      &enc);
-#endif
-  trackQuery(&rs->query[0],
-            ntohl(rs->type),
-            sock);
-  keyCount = 1 + (ntohs(req->size) - sizeof(CS_fs_request_search_MESSAGE)) / 
sizeof(HashCode512);
-  gap->get_start(ntohl(rs->type),
-                ntohl(rs->anonymityLevel),             
-                keyCount,
-                &rs->query[0],
-                ntohll(rs->expiration),
-                ntohl(rs->prio));
-  if ( (ntohl(rs->anonymityLevel) == 0) &&
-       (dht != NULL) ) {
-    DHT_GET_CLS * cls;
-
-    cls = MALLOC(sizeof(DHT_GET_CLS));
-    cls->prio = ntohl(rs->prio);
-    cls->rec = dht->get_start(&dht_table,
-                             ntohl(rs->type),
-                             keyCount,
-                             &rs->query[0],
-                             ntohll(rs->expiration),
-                             (DataProcessor) &get_result_callback,
-                             cls,
-                             (DHT_OP_Complete) &get_complete_callback,
-                             cls);
-  }
-  return OK;
-}
-
-/**
  * Stop processing a query.
  *
  * @return SYSERR if the TCP connection should be closed, otherwise OK
@@ -561,7 +519,7 @@
  * Process a client request unindex content.
  */
 static int csHandleCS_fs_request_unindex_MESSAGE(ClientHandle sock,
-                                 const CS_MESSAGE_HEADER * req) {
+                                                const CS_MESSAGE_HEADER * req) 
{
   int ret;
   CS_fs_request_unindex_MESSAGE * ru;
 
@@ -586,7 +544,7 @@
  * data is indexed.
  */
 static int csHandleCS_fs_request_test_index_MESSAGEed(ClientHandle sock,
-                                     const CS_MESSAGE_HEADER * req) {
+                                                     const CS_MESSAGE_HEADER * 
req) {
   int ret;
   RequestTestindex * ru;
 
@@ -946,6 +904,24 @@
   return ret;
 }
 
+static int replyHashFunction(const DataContainer * content,
+                            HashCode512 * id) {
+  const GapWrapper * gw;
+  unsigned int size;
+
+  size = ntohl(content->size);
+  if (size < sizeof(GapWrapper)) {
+    BREAK();
+    memset(id, 0, sizeof(HashCode512));
+    return SYSERR;
+  }
+  gw = (const GapWrapper*) content;
+  hash(&gw[1],
+       size - sizeof(GapWrapper),
+       id);
+  return OK;
+}
+
 static int uniqueReplyIdentifier(const DataContainer * content,
                                 unsigned int type,
                                 const HashCode512 * primaryKey) {
@@ -978,21 +954,97 @@
     return NO;
 }
 
-static int replyHashFunction(const DataContainer * content,
-                            HashCode512 * id) {
-  const GapWrapper * gw;
-  unsigned int size;
+static int fastPathProcessor(const HashCode512 * query,
+                            const DataContainer * value,
+                            void * cls) {
+  int * done = cls;
+  Datastore_Value * dv;
 
-  size = ntohl(content->size);
-  if (size < sizeof(GapWrapper)) {
+  dv = gapWrapperToDatastoreValue(value, 0);
+  if (dv == NULL)
+    return SYSERR;
+  processResponse(query,
+                 dv);
+  if (YES == uniqueReplyIdentifier(value,
+                                  ntohl(dv->type),
+                                  query))
+    *done = YES;
+  FREE(dv);
+  return OK;
+}
+
+/**
+ * Process a query from the client. Forwards to the network.
+ *
+ * @return SYSERR if the TCP connection should be closed, otherwise OK
+ */
+static int csHandleRequestQueryStart(ClientHandle sock,
+                                    const CS_MESSAGE_HEADER * req) {
+  const CS_fs_request_search_MESSAGE * rs;
+  unsigned int keyCount;
+#if DEBUG_FS 
+  EncName enc;
+#endif
+  unsigned int type;
+  int done;
+
+  if (ntohs(req->size) < sizeof(CS_fs_request_search_MESSAGE)) {
     BREAK();
-    memset(id, 0, sizeof(HashCode512));
     return SYSERR;
   }
-  gw = (const GapWrapper*) content;
-  hash(&gw[1],
-       size - sizeof(GapWrapper),
-       id);
+  rs = (const CS_fs_request_search_MESSAGE*) req;
+#if DEBUG_FS 
+  IFLOG(LOG_DEBUG,
+       hash2enc(&rs->query[0],
+                &enc));
+  LOG(LOG_DEBUG,
+      "FS received QUERY START (query: `%s')\n",
+      &enc);
+#endif
+  type = ntohl(rs->type);
+  trackQuery(&rs->query[0],
+            type,
+            sock);
+  keyCount = 1 + (ntohs(req->size) - sizeof(CS_fs_request_search_MESSAGE)) / 
sizeof(HashCode512);
+  
+  /* try a "fast path" avoiding gap/dht if unique reply is locally available */
+  done = NO;
+  gapGet(NULL,
+        type,
+        EXTREME_PRIORITY,
+        keyCount,
+        &rs->query[0],
+        &fastPathProcessor,
+        &done);
+  if (done == YES) {
+#if DEBUG_FS
+    LOG(LOG_DEBUG,
+       "FS successfully took GAP shortcut.\n");
+#endif
+    return OK; 
+  }
+  gap->get_start(type,
+                ntohl(rs->anonymityLevel),
+                keyCount,
+                &rs->query[0],
+                ntohll(rs->expiration),
+                ntohl(rs->prio));
+  if ( (ntohl(rs->anonymityLevel) == 0) &&
+       (dht != NULL) ) {
+    DHT_GET_CLS * cls;
+
+    cls = MALLOC(sizeof(DHT_GET_CLS));
+    cls->prio = ntohl(rs->prio);
+    cls->rec = dht->get_start(&dht_table,
+                             type,
+                             keyCount,
+                             &rs->query[0],
+                             ntohll(rs->expiration),
+                             (DataProcessor) &get_result_callback,
+                             cls,
+                             (DHT_OP_Complete) &get_complete_callback,
+                             cls);
+  }
   return OK;
 }
 

Modified: GNUnet/src/applications/fs/module/querymanager.c
===================================================================
--- GNUnet/src/applications/fs/module/querymanager.c    2005-08-22 04:18:36 UTC 
(rev 1841)
+++ GNUnet/src/applications/fs/module/querymanager.c    2005-08-22 06:02:16 UTC 
(rev 1842)
@@ -30,7 +30,7 @@
 #include "fs.h"
 #include "querymanager.h"
 
-#define DEBUG_QUERYMANAGER NO
+#define DEBUG_QUERYMANAGER YES
 
 typedef struct {
   HashCode512 query;

Modified: GNUnet/src/applications/gap/gap.c
===================================================================
--- GNUnet/src/applications/gap/gap.c   2005-08-22 04:18:36 UTC (rev 1841)
+++ GNUnet/src/applications/gap/gap.c   2005-08-22 06:02:16 UTC (rev 1842)
@@ -44,7 +44,7 @@
 #include "gnunet_traffic_service.h"
 #include "gnunet_topology_service.h"
 
-#define DEBUG_GAP NO
+#define DEBUG_GAP YES
 
 #define EXTRA_CHECKS YES
 
@@ -1055,9 +1055,17 @@
   ite = &ROUTING_indTable_[computeRoutingIndex(primaryKey)];
   if (! equalsHashCode512(&ite->primaryKey,
                          primaryKey) ) {
+#if DEBUG_GAP
+    LOG(LOG_DEBUG,
+       "GAP: Dropping reply, routing table has no query associated with it 
(anymore)\n");
+#endif
     return; /* we don't care for the reply (anymore) */
   }
   if (YES == ite->successful_local_lookup_in_delay_loop) {
+#if DEBUG_GAP
+    LOG(LOG_DEBUG,
+       "GAP: Dropping reply, found reply locally during delay\n");
+#endif
     return; /* wow, really bad concurrent DB lookup and processing for
               the same query.  Well, at least we should not also
               queue the delayed reply twice... */
@@ -1139,7 +1147,17 @@
                     const PeerIdentity * sender) {
   unsigned int i;
   cron_t now;
+#if DEBUG__GAP
+  EncName enc;
 
+  IFLOG(LOG_DEBUG,
+       hash2enc(query,
+                &enc));
+  LOG(LOG_DEBUG,
+      "GAP: Queueing query '%s' in slot %p\n",
+      &enc,
+      ite);
+#endif
   GNUNET_ASSERT(sender != NULL); /* do NOT add to RT for local clients! */
   cronTime(&now);
   if (mode == ITE_REPLACE) {
@@ -1576,14 +1594,15 @@
 
   ite = &ROUTING_indTable_[computeRoutingIndex(&query->queries[0])];
   MUTEX_LOCK(&lookup_exclusion);
+  i = -1;
   if (sender != NULL) {
     if ((policy & QUERY_INDIRECT) > 0) {
-      needsForwarding(&query->queries[0],
-                     ttl,
-                     prio,
-                     sender,
-                     &isRouted,
-                     &doForward);
+      i = needsForwarding(&query->queries[0],
+                         ttl,
+                         prio,
+                         sender,
+                         &isRouted,
+                         &doForward);
     } else {
       isRouted = NO;
       doForward = YES;
@@ -1602,10 +1621,11 @@
         hash2enc(&query->queries[0],
                 &enc));
   LOG(LOG_DEBUG,
-      "GAP is executing request for `%s': %s %s\n",
+      "GAP is executing request for `%s':%s%s (%d)\n",
       &enc,
-      doForward ? "forwarding" : "",
-      isRouted ? "routing" : "");
+      doForward ? " forwarding" : "",
+      isRouted ? " routing" : "",
+      i);
 #endif
   cls.values = NULL;
   cls.valueCount = 0;
@@ -2069,8 +2089,14 @@
   if ((policy & QUERY_DROPMASK) == 0) {
     FREE(qmsg);
 #if DEBUG_GAP
+    if (sender != NULL) {
+      IFLOG(LOG_DEBUG,
+           hash2enc(&sender->hashPubKey,
+                    &enc));
+    }
     LOG(LOG_DEBUG,
-       "Dropping query, policy decided that this peer is too busy.\n");
+       "Dropping query from %s, policy decided that this peer is too busy.\n",
+       sender == NULL ? "localhost" : &enc);
 #endif
     return OK; /* straight drop. */
   }





reply via email to

[Prev in Thread] Current Thread [Next in Thread]