gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r6401 - in GNUnet/src: applications/fs/ecrs applications/fs/gap include


From: gnunet
Subject: [GNUnet-SVN] r6401 - in GNUnet/src: applications/fs/ecrs applications/fs/gap include
Date: Wed, 20 Feb 2008 23:51:29 -0700 (MST)

Author: grothoff
Date: 2008-02-20 23:51:29 -0700 (Wed, 20 Feb 2008)
New Revision: 6401

Modified:
   GNUnet/src/applications/fs/ecrs/search.c
   GNUnet/src/applications/fs/gap/fs.c
   GNUnet/src/applications/fs/gap/gap.c
   GNUnet/src/applications/fs/gap/querymanager.c
   GNUnet/src/applications/fs/gap/querymanager.h
   GNUnet/src/applications/fs/gap/shared.h
   GNUnet/src/include/fs.h
Log:
better async processing

Modified: GNUnet/src/applications/fs/ecrs/search.c
===================================================================
--- GNUnet/src/applications/fs/ecrs/search.c    2008-02-21 06:50:11 UTC (rev 
6400)
+++ GNUnet/src/applications/fs/ecrs/search.c    2008-02-21 06:51:29 UTC (rev 
6401)
@@ -330,7 +330,10 @@
       GNUNET_EC_file_block_check_and_get_query (size,
                                                 (const DBlock *) &value[1],
                                                 GNUNET_YES, &query))
-    return GNUNET_SYSERR;
+    {
+      GNUNET_GE_BREAK(NULL, 0);
+      return GNUNET_SYSERR;
+    }
   if (!((0 == memcmp (&query,
                       (GNUNET_HashCode *) & ps[1], sizeof (GNUNET_HashCode)))
         && ((ps->type == type) || (ps->type == GNUNET_ECRS_BLOCKTYPE_ANY))
@@ -341,7 +344,9 @@
                                                      ps->keyCount,
                                                      (GNUNET_HashCode *) &
                                                      ps[1]))))
-    return GNUNET_OK;           /* not a match */
+    {
+      return GNUNET_OK;           /* not a match */
+    }
 
   switch (type)
     {
@@ -355,7 +360,10 @@
         int j;
 
         if (size < sizeof (KBlock))
-          return GNUNET_SYSERR;
+         {
+           GNUNET_GE_BREAK(NULL, 0);
+           return GNUNET_SYSERR;
+         }
         kb = GNUNET_malloc (size);
         memcpy (kb, &value[1], size);
 #if DEBUG_SEARCH

Modified: GNUnet/src/applications/fs/gap/fs.c
===================================================================
--- GNUnet/src/applications/fs/gap/fs.c 2008-02-21 06:50:11 UTC (rev 6400)
+++ GNUnet/src/applications/fs/gap/fs.c 2008-02-21 06:51:29 UTC (rev 6401)
@@ -392,6 +392,8 @@
 {
   struct GNUNET_ClientHandle *sock;
   struct ResponseList *seen;
+  unsigned int processed;
+  int have_more;
 };
 
 /**
@@ -413,7 +415,14 @@
   GNUNET_DatastoreValue *enc;
   const GNUNET_DatastoreValue *use;
   unsigned int type;
+  int ret;
 
+  if (cls->processed > MAX_SYNC_PROCESSED)
+    {
+      cls->have_more = GNUNET_YES;
+      return GNUNET_SYSERR;
+    }
+  cls->processed++;
   size = ntohl (value->size) - sizeof (GNUNET_DatastoreValue);
   dblock = (const DBlock *) &value[1];
   enc = NULL;
@@ -435,11 +444,12 @@
   memcpy (&msg[1], dblock, size);
   type = ntohl (dblock->type);
   GNUNET_free_non_null (enc);
-  coreAPI->cs_send_to_client (sock, &msg->header,
-                             type != GNUNET_ECRS_BLOCKTYPE_DATA ? GNUNET_NO : 
GNUNET_YES);
+  ret = coreAPI->cs_send_to_client (sock, &msg->header,
+                                   type != GNUNET_ECRS_BLOCKTYPE_DATA ? 
GNUNET_NO : GNUNET_YES);
   GNUNET_free (msg);
-  if (type == GNUNET_ECRS_BLOCKTYPE_DATA)
-    return GNUNET_SYSERR;       /* unique response */
+  if ( (type == GNUNET_ECRS_BLOCKTYPE_DATA) || 
+       (ret != GNUNET_OK) )
+    return GNUNET_SYSERR;       /* unique response or client can take no more*/
   rl = GNUNET_malloc (sizeof (struct ResponseList));
   GNUNET_hash (dblock, size, &rl->hash);
   rl->next = cls->seen;
@@ -485,6 +495,8 @@
 #endif
   fpp.sock = sock;
   fpp.seen = NULL;
+  fpp.have_more = GNUNET_NO;
+  fpp.processed = 0;
   if (type == GNUNET_ECRS_BLOCKTYPE_DATA)
     {
       if ((1 == datastore->get (&rs->query[0],
@@ -505,7 +517,8 @@
   GNUNET_FS_QUERYMANAGER_start_query (&rs->query[0], keyCount, anonymityLevel,
                                       type, sock,
                                       have_target ? &rs->target : NULL,
-                                      fpp.seen);
+                                      fpp.seen,
+                                     fpp.have_more);
 CLEANUP:
   while (fpp.seen != NULL)
     {

Modified: GNUnet/src/applications/fs/gap/gap.c
===================================================================
--- GNUnet/src/applications/fs/gap/gap.c        2008-02-21 06:50:11 UTC (rev 
6400)
+++ GNUnet/src/applications/fs/gap/gap.c        2008-02-21 06:51:29 UTC (rev 
6401)
@@ -43,12 +43,6 @@
 #define MAX_ENTRIES_PER_SLOT 2
 
 /**
- * If, after finding local results, we abort a GET
- * iteration, we increment "have_more" by this value.
- */
-#define HAVE_MORE_INCREMENT 5
-
-/**
  * How often do we check have_more?
  */
 #define HAVE_MORE_FREQUENCY (100 * GNUNET_CRON_MILLISECONDS)

Modified: GNUnet/src/applications/fs/gap/querymanager.c
===================================================================
--- GNUnet/src/applications/fs/gap/querymanager.c       2008-02-21 06:50:11 UTC 
(rev 6400)
+++ GNUnet/src/applications/fs/gap/querymanager.c       2008-02-21 06:51:29 UTC 
(rev 6401)
@@ -64,17 +64,18 @@
 
 };
 
-static GNUNET_CoreAPIForPlugins *coreAPI;
-
 /**
  * List of all clients, their active requests and other
  * per-client information.
  */
 static struct ClientDataList *clients;
 
+static GNUNET_CoreAPIForPlugins * coreAPI;
 
 static GNUNET_Stats_ServiceAPI *stats;
 
+static GNUNET_Datastore_ServiceAPI *datastore;
+
 static int stat_gap_client_query_received;
 
 static int stat_gap_client_response_sent;
@@ -131,7 +132,8 @@
                                     unsigned int type,
                                     struct GNUNET_ClientHandle *client,
                                     const GNUNET_PeerIdentity * target,
-                                    const struct ResponseList *seen)
+                                    const struct ResponseList *seen,
+                                   int have_more)
 {
   struct ClientDataList *cl;
   struct RequestList *request;
@@ -153,6 +155,8 @@
   request->primary_target = GNUNET_FS_PT_intern (target);
   request->response_client = client;
   request->policy = GNUNET_FS_RoutingPolicy_ALL;
+  if (have_more != GNUNET_NO)
+    request->have_more = HAVE_MORE_INCREMENT;
   memcpy (&request->queries[0], query, sizeof (GNUNET_HashCode) * key_count);
   if (seen != NULL)
     {
@@ -275,13 +279,13 @@
   msg->expirationTime = GNUNET_htonll (expirationTime);
   memcpy (&msg[1], data, size);
   coreAPI->cs_send_to_client (client,
-                              &msg->header,
-                              (rl->type != GNUNET_ECRS_BLOCKTYPE_DATA)
-                              ? GNUNET_NO : GNUNET_YES);
+                             &msg->header,
+                             (rl->type != GNUNET_ECRS_BLOCKTYPE_DATA)
+                             ? GNUNET_NO : GNUNET_YES);
   if (stats != NULL)
     stats->change (stat_gap_client_response_sent, 1);
   GNUNET_free (msg);
-
+  
   /* update *value */
   *value += 1 + rl->value;
   GNUNET_FS_PLAN_success (sender, client, 0, rl);
@@ -431,13 +435,71 @@
   GNUNET_mutex_unlock (GNUNET_FS_lock);
 }
 
+
+struct HMClosure
+{
+  struct RequestList * request;  
+  unsigned int processed;
+  int have_more;
+};
+
 /**
+ * Any response that we get should be passed
+ * back to the client.  If the response is unique,
+ * we should abort the iteration (return GNUNET_SYSERR).
+ */
+static int
+have_more_processor (const GNUNET_HashCode * key,
+                     const GNUNET_DatastoreValue *
+                     value, void *closure, unsigned long long uid)
+{
+  struct HMClosure *cls = closure;
+  const DBlock *dblock;
+  GNUNET_HashCode hc;
+  CS_fs_reply_content_MESSAGE *msg;
+  unsigned int size;
+  int ret;
+
+  size = ntohl (value->size) - sizeof (GNUNET_DatastoreValue);
+  dblock = (const DBlock *) &value[1];
+  if (GNUNET_OK == GNUNET_FS_SHARED_test_valid_new_response (cls->request,
+                                                            key,
+                                                            size,
+                                                            dblock, &hc))
+    {
+      msg = GNUNET_malloc (sizeof (CS_fs_reply_content_MESSAGE) + size);
+      msg->header.type = htons (GNUNET_CS_PROTO_GAP_RESULT);
+      msg->header.size = htons (sizeof (CS_fs_reply_content_MESSAGE) + size);
+      msg->anonymityLevel = value->anonymityLevel;
+      msg->expirationTime = value->expirationTime;
+      memcpy (&msg[1], dblock, size);
+      ret = coreAPI->cs_send_to_client (cls->request->response_client,
+                                       &msg->header,
+                                       GNUNET_YES);
+      GNUNET_free (msg);
+      if (ret != GNUNET_OK)
+       return GNUNET_SYSERR;       /* client can take no more */       
+      GNUNET_FS_SHARED_mark_response_seen(cls->request,
+                                         &hc);
+    }
+  cls->processed++;
+  if (cls->processed > MAX_ASYNC_PROCESSED)
+    {
+      cls->have_more = GNUNET_YES;
+      return GNUNET_SYSERR;
+    }
+  return GNUNET_OK;
+}
+
+
+/**
  * Cron-job to periodically check if we should
  * repeat requests.
  */
 static void
 repeat_requests_job (void *unused)
 {
+  struct HMClosure hmc;
   struct ClientDataList *client;
   struct RequestList *request;
   GNUNET_CronTime now;
@@ -450,27 +512,40 @@
       request = client->requests;
       while (request != NULL)
         {
-          if ((NULL == request->plan_entries) &&
-              ((client->client != NULL) ||
-               (request->expiration > now)) &&
-              (request->last_ttl_used * GNUNET_CRON_SECONDS +
-               request->last_request_time < now))
-            {
-              if ((GNUNET_OK ==
-                   GNUNET_FS_PLAN_request (client->client, 0, request))
-                  && (stats != NULL))
-                stats->change (stat_gap_client_query_injected, 1);
-            }
-
-          if ((request->anonymityLevel == 0) &&
-              (request->last_dht_get + request->dht_back_off < now))
-            {
-              if (request->dht_back_off * 2 > request->dht_back_off)
-                request->dht_back_off *= 2;
-              request->last_dht_get = now;
-              GNUNET_FS_DHT_execute_query (request->type,
-                                           &request->queries[0]);
-            }
+         if (request->have_more > 0)
+           {
+             request->have_more--;
+             hmc.request = request;
+             hmc.processed = 0;
+             hmc.have_more = GNUNET_NO;
+             datastore->get (&request->queries[0], request->type, 
&have_more_processor, &hmc);
+             if (hmc.have_more)
+               request->have_more += HAVE_MORE_INCREMENT;
+           }
+         else
+           {
+             if ((NULL == request->plan_entries) &&
+                 ((client->client != NULL) ||
+                  (request->expiration > now)) &&
+                 (request->last_ttl_used * GNUNET_CRON_SECONDS +
+                  request->last_request_time < now))
+               {
+                 if ((GNUNET_OK ==
+                      GNUNET_FS_PLAN_request (client->client, 0, request))
+                     && (stats != NULL))
+                   stats->change (stat_gap_client_query_injected, 1);
+               }
+             
+             if ((request->anonymityLevel == 0) &&
+                 (request->last_dht_get + request->dht_back_off < now))
+               {
+                 if (request->dht_back_off * 2 > request->dht_back_off)
+                   request->dht_back_off *= 2;
+                 request->last_dht_get = now;
+                 GNUNET_FS_DHT_execute_query (request->type,
+                                              &request->queries[0]);
+               }
+           }
           request = request->next;
         }
       client = client->next;
@@ -485,10 +560,7 @@
   GNUNET_GE_ASSERT (capi->ectx,
                     GNUNET_SYSERR !=
                     capi->cs_exit_handler_register (&handle_client_exit));
-  GNUNET_cron_add_job (capi->cron,
-                       &repeat_requests_job,
-                       CHECK_REPEAT_FREQUENCY, CHECK_REPEAT_FREQUENCY, NULL);
-
+  datastore = capi->request_service ("datastore");
   stats = capi->request_service ("stats");
   if (stats != NULL)
     {
@@ -504,6 +576,9 @@
         stats->
         create (gettext_noop ("# gap query bloomfilter resizing updates"));
     }
+  GNUNET_cron_add_job (capi->cron,
+                       &repeat_requests_job,
+                       CHECK_REPEAT_FREQUENCY, CHECK_REPEAT_FREQUENCY, NULL);
   return 0;
 }
 
@@ -518,6 +593,8 @@
                     cs_exit_handler_unregister (&handle_client_exit));
   while (clients != NULL)
     handle_client_exit (clients->client);
+  coreAPI->release_service (datastore);
+  datastore = NULL;
   if (stats != NULL)
     {
       coreAPI->release_service (stats);

Modified: GNUnet/src/applications/fs/gap/querymanager.h
===================================================================
--- GNUnet/src/applications/fs/gap/querymanager.h       2008-02-21 06:50:11 UTC 
(rev 6400)
+++ GNUnet/src/applications/fs/gap/querymanager.h       2008-02-21 06:51:29 UTC 
(rev 6401)
@@ -42,6 +42,7 @@
  * client disconnects.
  *
  * @param target peer known to have the content, maybe NULL.
+ * @param have_more do we have more results in our local datastore?
  */
 void
 GNUNET_FS_QUERYMANAGER_start_query (const GNUNET_HashCode * query,
@@ -50,7 +51,8 @@
                                     unsigned int type,
                                     struct GNUNET_ClientHandle *client,
                                     const GNUNET_PeerIdentity * target,
-                                    const struct ResponseList *seen);
+                                    const struct ResponseList *seen,
+                                   int have_more);
 
 /**
  * Handle the given response (by forwarding it to

Modified: GNUnet/src/applications/fs/gap/shared.h
===================================================================
--- GNUnet/src/applications/fs/gap/shared.h     2008-02-21 06:50:11 UTC (rev 
6400)
+++ GNUnet/src/applications/fs/gap/shared.h     2008-02-21 06:51:29 UTC (rev 
6401)
@@ -286,6 +286,7 @@
  */
 extern struct GNUNET_Mutex *GNUNET_FS_lock;
 
+
 /**
  * Free the request list, including the associated
  * list of pending requests, its entries in the

Modified: GNUnet/src/include/fs.h
===================================================================
--- GNUnet/src/include/fs.h     2008-02-21 06:50:11 UTC (rev 6400)
+++ GNUnet/src/include/fs.h     2008-02-21 06:51:29 UTC (rev 6401)
@@ -440,8 +440,23 @@
  */
 #define GNUNET_GAP_ESTIMATED_DATA_SIZE (32 * 1024)
 
+/**
+ * If, after finding local results, we abort a GET
+ * iteration, we increment "have_more" by this value.
+ */
+#define HAVE_MORE_INCREMENT 2
 
+/**
+ * What is the maximum number of local results
+ * that we are willing to return synchronously?
+ */
+#define MAX_SYNC_PROCESSED 8
 
+/**
+ * What is the maximum number of local results
+ * that we are willing to return asynchronously?
+ */
+#define MAX_ASYNC_PROCESSED 32
 
 
 





reply via email to

[Prev in Thread] Current Thread [Next in Thread]