gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r6179 - GNUnet/src/applications/fs/gap


From: gnunet
Subject: [GNUnet-SVN] r6179 - GNUnet/src/applications/fs/gap
Date: Wed, 6 Feb 2008 23:02:29 -0700 (MST)

Author: grothoff
Date: 2008-02-06 23:02:29 -0700 (Wed, 06 Feb 2008)
New Revision: 6179

Modified:
   GNUnet/src/applications/fs/gap/TODO
   GNUnet/src/applications/fs/gap/fs_dht.c
   GNUnet/src/applications/fs/gap/migration.c
Log:
dht-push code integrated

Modified: GNUnet/src/applications/fs/gap/TODO
===================================================================
--- GNUnet/src/applications/fs/gap/TODO 2008-02-07 05:44:15 UTC (rev 6178)
+++ GNUnet/src/applications/fs/gap/TODO 2008-02-07 06:02:29 UTC (rev 6179)
@@ -1,9 +1 @@
-DETAILS:
-1) make sure that anonymity-level zero content is pushed into
-   the DHT by both fs (on insert/index) and by migration (for refresh)
 
-
-OUT-OF-SCOPE:
-1) modify datastore to return diverse subsets of large response sets,
-   except when processing for loopback!
-2) make sure core polls whenever outbound bandwidth is available

Modified: GNUnet/src/applications/fs/gap/fs_dht.c
===================================================================
--- GNUnet/src/applications/fs/gap/fs_dht.c     2008-02-07 05:44:15 UTC (rev 6178)
+++ GNUnet/src/applications/fs/gap/fs_dht.c     2008-02-07 06:02:29 UTC (rev 6179)
@@ -27,6 +27,8 @@
 
 #include "platform.h"
 #include "gnunet_dht_service.h"
+#include "gnunet_sqstore_service.h"
+#include "gnunet_stats_service.h"
 #include "gnunet_protocols.h"
 #include "ecrs_core.h"
 #include "fs.h"
@@ -52,13 +54,37 @@
 
 static GNUNET_DHT_ServiceAPI *dht;
 
+static GNUNET_SQstore_ServiceAPI *sqstore;    /* "sqstore" service handle; NULL-checked before use */
+
 static GNUNET_CoreAPIForPlugins *coreAPI;
 
+static GNUNET_Stats_ServiceAPI *stats;    /* "stats" service handle; NULL-checked before use */
+
+static int stat_push_count;    /* stats handle created for "# blocks pushed into DHT" */
+
 static struct GNUNET_Mutex *lock;
 
 static struct ActiveRequestRecords *records;
 
 /**
+ * Thread that does the pushing (runs push_thread; only created when both dht and sqstore are available).
+ */
+static struct GNUNET_ThreadHandle *thread;
+
+/**
+ * Should the thread terminate?  Set to GNUNET_YES in GNUNET_FS_DHT_done; polled by push_thread and push_callback.  NOTE(review): plain int shared between threads without synchronization -- confirm this is acceptable.
+ */
+static int shutdown_requested;
+
+/**
+ * Total number of entries with anonymity 0, as
+ * reported by the last sqstore->iterateNonAnonymous pass.  Used to calculate
+ * how long push_callback should wait before pushing each entry.
+ */
+static unsigned int total;
+
+
+/**
  * Cancel all requests with the DHT that
  * are older than a certain time limit.
  */
@@ -166,22 +192,100 @@
   GNUNET_mutex_unlock (lock);
 }
 
+/**
+ * Callback invoked on zero-anonymity content (used to push that content into the DHT): waits a throttling delay, then hands the entry to dht->put.
+ * @return GNUNET_SYSERR once shutdown_requested is GNUNET_YES (checked before and after the sleep and after the put), else GNUNET_OK; closure and uid are unused.
+ */
+static int
+push_callback (const GNUNET_HashCode * key,
+               const GNUNET_DatastoreValue * value, void *closure,
+               unsigned long long uid)
+{
+  GNUNET_CronTime delay;
 
+  if (GNUNET_YES == shutdown_requested)
+    return GNUNET_SYSERR;
+  /* aim for one pass over all 'total' entries every 6h, but sleep
+     at least 5s and at most 60s before each individual push */
+  delay = 6 * GNUNET_CRON_HOURS / total;    /* push_thread sets total to 1 if it was 0 before iterating */
+  if (delay < 5 * GNUNET_CRON_SECONDS)
+    delay = 5 * GNUNET_CRON_SECONDS;
+  if (delay > 60 * GNUNET_CRON_SECONDS)
+    delay = 60 * GNUNET_CRON_SECONDS;
+  GNUNET_thread_sleep (delay);
+  if (GNUNET_YES == shutdown_requested)    /* re-check: GNUNET_FS_DHT_done sets the flag and interrupts the sleep */
+    return GNUNET_SYSERR;
+  dht->put (key,
+            ntohl (value->type),
+            ntohl (value->size) - sizeof (GNUNET_DatastoreValue),    /* length passed: size field minus the header struct */
+            (const char *) &value[1]);    /* bytes starting right after the header struct */
+  if (stats != NULL)
+    stats->change (stat_push_count, 1);
+  if (GNUNET_YES == shutdown_requested)
+    return GNUNET_SYSERR;
+  return GNUNET_OK;
+}
+
+/**
+ * Main method of the thread responsible for pushing out the content: repeatedly runs sqstore->iterateNonAnonymous with push_callback
+ * while shutdown_requested is GNUNET_NO and both dht and sqstore are non-NULL.  cls is unused; always returns NULL.
+ */
+static void *
+push_thread (void *cls)
+{
+  while ( (shutdown_requested == GNUNET_NO) &&
+         (dht != NULL) && 
+         (sqstore != NULL) )
+    {
+      if (total == 0)
+        total = 1;    /* push_callback divides by total during the pass below */
+      total = sqstore->iterateNonAnonymous (0, &push_callback, NULL);    /* NOTE(review): stored as unsigned; if this can return a negative value it becomes a huge count and the next pass starts without the 5-minute sleep -- confirm against the sqstore API */
+      if ( (shutdown_requested == GNUNET_NO) &&
+          (total == 0) )
+        GNUNET_thread_sleep (5 * GNUNET_CRON_MINUTES);    /* pass reported zero entries: wait 5 minutes before trying again */
+    }
+  return NULL;
+}
+
+
 int
 GNUNET_FS_DHT_init (GNUNET_CoreAPIForPlugins * capi)
 {
   coreAPI = capi;
   lock = GNUNET_mutex_create (GNUNET_YES);
   dht = capi->request_service ("dht");
+  sqstore = capi->request_service ("sqstore");    /* iterated by push_thread to find content to push */
+  stats = capi->request_service ("stats");    /* used below only to register the push counter */
+  if (stats != NULL)
+    stat_push_count
+      = stats->create (gettext_noop ("# blocks pushed into DHT"));
+  if ( (dht != NULL) &&
+       (sqstore != NULL) )    /* push thread needs both services; without them none is started and 0 is still returned */
+    {
+      shutdown_requested = GNUNET_NO;
+      thread = GNUNET_thread_create (&push_thread, NULL, 1024 * 128);    /* NOTE(review): third argument is presumably the stack size in bytes -- confirm */
+    }
   return 0;
 }
 
 int
 GNUNET_FS_DHT_done ()
 {
+  void * unused;
+
   purge_old_records (-1);
+  if (thread != NULL)
+    {
+      shutdown_requested = GNUNET_YES;
+      GNUNET_thread_stop_sleep (thread);
+      GNUNET_thread_join (thread, &unused);
+    }
   if (dht != NULL)
     coreAPI->release_service (dht);
+  dht = NULL;
+  if (sqstore != NULL)
+    coreAPI->release_service (sqstore);
+  sqstore = NULL;
   coreAPI = NULL;
   GNUNET_mutex_destroy (lock);
   lock = NULL;

Modified: GNUnet/src/applications/fs/gap/migration.c
===================================================================
--- GNUnet/src/applications/fs/gap/migration.c  2008-02-07 05:44:15 UTC (rev 6178)
+++ GNUnet/src/applications/fs/gap/migration.c  2008-02-07 06:02:29 UTC (rev 6179)
@@ -29,7 +29,6 @@
 #include "migration.h"
 #include "fs.h"
 #include "gnunet_datastore_service.h"
-#include "gnunet_dht_service.h"
 #include "gnunet_stats_service.h"
 #include "gnunet_protocols.h"
 #include "anonymity.h"
@@ -70,11 +69,6 @@
  */
 static GNUNET_CoreAPIForPlugins *coreAPI;
 
-/**
- * DHT service.  Maybe NULL!
- */
-static GNUNET_DHT_ServiceAPI *dht;
-
 static GNUNET_Stats_ServiceAPI *stats;
 
 static int stat_migration_count;
@@ -347,7 +341,6 @@
     (GNUNET_GAP_ESTIMATED_DATA_SIZE,
      GNUNET_FS_GAP_CONTENT_MIGRATION_PRIORITY, &activeMigrationCallback);
   datastore = capi->request_service ("datastore");
-  dht = capi->request_service ("dht");
   stats = capi->request_service ("stats");
   if (stats != NULL)
     {
@@ -375,8 +368,6 @@
     }
   coreAPI->release_service (datastore);
   datastore = NULL;
-  coreAPI->release_service (dht);
-  dht = NULL;
   coreAPI = NULL;
   for (i = 0; i < MAX_RECORDS; i++)
     {





reply via email to

[Prev in Thread] Current Thread [Next in Thread]