[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r6179 - GNUnet/src/applications/fs/gap
From: |
gnunet |
Subject: |
[GNUnet-SVN] r6179 - GNUnet/src/applications/fs/gap |
Date: |
Wed, 6 Feb 2008 23:02:29 -0700 (MST) |
Author: grothoff
Date: 2008-02-06 23:02:29 -0700 (Wed, 06 Feb 2008)
New Revision: 6179
Modified:
GNUnet/src/applications/fs/gap/TODO
GNUnet/src/applications/fs/gap/fs_dht.c
GNUnet/src/applications/fs/gap/migration.c
Log:
dht-push code integrated
Modified: GNUnet/src/applications/fs/gap/TODO
===================================================================
--- GNUnet/src/applications/fs/gap/TODO 2008-02-07 05:44:15 UTC (rev 6178)
+++ GNUnet/src/applications/fs/gap/TODO 2008-02-07 06:02:29 UTC (rev 6179)
@@ -1,9 +1 @@
-DETAILS:
-1) make sure that anonymity-level zero content is pushed into
- the DHT by both fs (on insert/index) and by migration (for refresh)
-
-OUT-OF-SCOPE:
-1) modify datastore to return diverse subsets of large response sets,
- except when processing for loopback!
-2) make sure core polls whenever outbound bandwidth is available
Modified: GNUnet/src/applications/fs/gap/fs_dht.c
===================================================================
--- GNUnet/src/applications/fs/gap/fs_dht.c 2008-02-07 05:44:15 UTC (rev
6178)
+++ GNUnet/src/applications/fs/gap/fs_dht.c 2008-02-07 06:02:29 UTC (rev
6179)
@@ -27,6 +27,8 @@
#include "platform.h"
#include "gnunet_dht_service.h"
+#include "gnunet_sqstore_service.h"
+#include "gnunet_stats_service.h"
#include "gnunet_protocols.h"
#include "ecrs_core.h"
#include "fs.h"
@@ -52,13 +54,37 @@
static GNUNET_DHT_ServiceAPI *dht;
+static GNUNET_SQstore_ServiceAPI *sqstore;
+
static GNUNET_CoreAPIForPlugins *coreAPI;
+static GNUNET_Stats_ServiceAPI *stats;
+
+static int stat_push_count;
+
static struct GNUNET_Mutex *lock;
static struct ActiveRequestRecords *records;
/**
+ * Thread that does the pushing.
+ */
+static struct GNUNET_ThreadHandle *thread;
+
+/**
+ * Should the thread terminate?
+ */
+static int shutdown_requested;
+
+/**
+ * Total number of entries with anonymity 0.
+ * Used to calculate how long we should wait
+ * between iterations.
+ */
+static unsigned int total;
+
+
+/**
* Cancel all requests with the DHT that
* are older than a certain time limit.
*/
@@ -166,22 +192,100 @@
GNUNET_mutex_unlock (lock);
}
+/**
+ * Callback invoked on zero-anonymity content
+ * (used to push that content into the DHT).
+ */
+static int
+push_callback (const GNUNET_HashCode * key,
+ const GNUNET_DatastoreValue * value, void *closure,
+ unsigned long long uid)
+{
+ GNUNET_CronTime delay;
+ if (GNUNET_YES == shutdown_requested)
+ return GNUNET_SYSERR;
+ /* try pushing out everything every 6h,
+ but do not push more often than every 5s */
+ delay = 6 * GNUNET_CRON_HOURS / total;
+ if (delay < 5 * GNUNET_CRON_SECONDS)
+ delay = 5 * GNUNET_CRON_SECONDS;
+ if (delay > 60 * GNUNET_CRON_SECONDS)
+ delay = 60 * GNUNET_CRON_SECONDS;
+ GNUNET_thread_sleep (delay);
+ if (GNUNET_YES == shutdown_requested)
+ return GNUNET_SYSERR;
+ dht->put (key,
+ ntohl (value->type),
+ ntohl (value->size) - sizeof (GNUNET_DatastoreValue),
+ (const char *) &value[1]);
+ if (stats != NULL)
+ stats->change (stat_push_count, 1);
+ if (GNUNET_YES == shutdown_requested)
+ return GNUNET_SYSERR;
+ return GNUNET_OK;
+}
+
+/**
+ * Main method of the thread responsible for pushing
+ * out the content.
+ */
+static void *
+push_thread (void *cls)
+{
+ while ( (shutdown_requested == GNUNET_NO) &&
+ (dht != NULL) &&
+ (sqstore != NULL) )
+ {
+ if (total == 0)
+ total = 1;
+ total = sqstore->iterateNonAnonymous (0, &push_callback, NULL);
+ if ( (shutdown_requested == GNUNET_NO) &&
+ (total == 0) )
+ GNUNET_thread_sleep (5 * GNUNET_CRON_MINUTES);
+ }
+ return NULL;
+}
+
+
int
GNUNET_FS_DHT_init (GNUNET_CoreAPIForPlugins * capi)
{
coreAPI = capi;
lock = GNUNET_mutex_create (GNUNET_YES);
dht = capi->request_service ("dht");
+ sqstore = capi->request_service ("sqstore");
+ stats = capi->request_service ("stats");
+ if (stats != NULL)
+ stat_push_count
+ = stats->create (gettext_noop ("# blocks pushed into DHT"));
+ if ( (dht != NULL) &&
+ (sqstore != NULL) )
+ {
+ shutdown_requested = GNUNET_NO;
+ thread = GNUNET_thread_create (&push_thread, NULL, 1024 * 128);
+ }
return 0;
}
int
GNUNET_FS_DHT_done ()
{
+ void * unused;
+
purge_old_records (-1);
+ if (thread != NULL)
+ {
+ shutdown_requested = GNUNET_YES;
+ GNUNET_thread_stop_sleep (thread);
+ GNUNET_thread_join (thread, &unused);
+ }
if (dht != NULL)
coreAPI->release_service (dht);
+ dht = NULL;
+ if (sqstore != NULL)
+ coreAPI->release_service (sqstore);
+ sqstore = NULL;
coreAPI = NULL;
GNUNET_mutex_destroy (lock);
lock = NULL;
Modified: GNUnet/src/applications/fs/gap/migration.c
===================================================================
--- GNUnet/src/applications/fs/gap/migration.c 2008-02-07 05:44:15 UTC (rev
6178)
+++ GNUnet/src/applications/fs/gap/migration.c 2008-02-07 06:02:29 UTC (rev
6179)
@@ -29,7 +29,6 @@
#include "migration.h"
#include "fs.h"
#include "gnunet_datastore_service.h"
-#include "gnunet_dht_service.h"
#include "gnunet_stats_service.h"
#include "gnunet_protocols.h"
#include "anonymity.h"
@@ -70,11 +69,6 @@
*/
static GNUNET_CoreAPIForPlugins *coreAPI;
-/**
- * DHT service. Maybe NULL!
- */
-static GNUNET_DHT_ServiceAPI *dht;
-
static GNUNET_Stats_ServiceAPI *stats;
static int stat_migration_count;
@@ -347,7 +341,6 @@
(GNUNET_GAP_ESTIMATED_DATA_SIZE,
GNUNET_FS_GAP_CONTENT_MIGRATION_PRIORITY, &activeMigrationCallback);
datastore = capi->request_service ("datastore");
- dht = capi->request_service ("dht");
stats = capi->request_service ("stats");
if (stats != NULL)
{
@@ -375,8 +368,6 @@
}
coreAPI->release_service (datastore);
datastore = NULL;
- coreAPI->release_service (dht);
- dht = NULL;
coreAPI = NULL;
for (i = 0; i < MAX_RECORDS; i++)
{
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r6179 - GNUnet/src/applications/fs/gap,
gnunet <=