gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r5126 - GNUnet/src/applications/fs/module


From: gnunet
Subject: [GNUnet-SVN] r5126 - GNUnet/src/applications/fs/module
Date: Sat, 23 Jun 2007 03:41:27 -0600 (MDT)

Author: grothoff
Date: 2007-06-23 03:41:27 -0600 (Sat, 23 Jun 2007)
New Revision: 5126

Modified:
   GNUnet/src/applications/fs/module/dht_push.c
   GNUnet/src/applications/fs/module/migration.c
Log:
Experimental (!) patch to reduce CPU load by improving
migration efficiency.  I want to eliminate the DHT 
part from the measurement, so that thread is deactivated.

Now I want to find out just how much this patch improves
things.  Feedback welcome.


Modified: GNUnet/src/applications/fs/module/dht_push.c
===================================================================
--- GNUnet/src/applications/fs/module/dht_push.c        2007-06-22 09:39:24 UTC (rev 5125)
+++ GNUnet/src/applications/fs/module/dht_push.c        2007-06-23 09:41:27 UTC (rev 5126)
@@ -31,6 +31,16 @@
 #include "gnunet_sqstore_service.h"
 
 /**
+ * Disable DHT pushing?  Set to 1 to essentially disable
+ * the code in this file.  Used to study its performance
+ * impact.  Useful also for users that do not want to 
+ * use non-anonymous file-sharing (since it eliminates
+ * some of the processing cost which would otherwise go
+ * to waste).
+ */
+#define NO_PUSH 1
+
+/**
  * DHT service.  Set to NULL to terminate
  */
 static DHT_ServiceAPI * dht;
@@ -75,6 +85,8 @@
   delay = 6 * cronHOURS / total;
   if (delay < 5 * cronSECONDS)
     delay = 5 * cronSECONDS;
+  if (delay > 60 * cronSECONDS)
+    delay = 60 * cronSECONDS;
   PTHREAD_SLEEP(delay);
   if (dht == NULL)
     return SYSERR;
@@ -121,9 +133,11 @@
   if (stats != NULL)
     stat_push_count
       = stats->create(gettext_noop("# blocks pushed into DHT"));
-  thread = PTHREAD_CREATE(&push_thread,
-                         NULL,
-                         1024 * 64);
+  if (! NO_PUSH) {
+    thread = PTHREAD_CREATE(&push_thread,
+                           NULL,
+                           1024 * 64);
+  }
 }
 
 void done_dht_push(void) {

Modified: GNUnet/src/applications/fs/module/migration.c
===================================================================
--- GNUnet/src/applications/fs/module/migration.c       2007-06-22 09:39:24 UTC (rev 5125)
+++ GNUnet/src/applications/fs/module/migration.c       2007-06-23 09:41:27 UTC (rev 5126)
@@ -36,6 +36,24 @@
 #define DEBUG_MIGRATION NO
 
 /**
+ * To how many peers may we migrate the same piece of content during
+ * one iteration?  Higher values mean less IO, but also migration
+ * becomes quickly much less effective (everyone has the same
+ * content!).  Also, numbers larger than the number of connections are
+ * simply a waste of memory.
+ */
+#define MAX_RECEIVERS 16
+
+/**
+ * How many migration records do we keep in memory
+ * at the same time?  Each record is about 32k, so
+ * 32 records will use about 1 MB of memory.
+ * We might want to allow users to specify larger
+ * values in the configuration file some day.
+ */
+#define MAX_RECORDS 32
+
+/**
  * Datastore service.
  */
 static Datastore_ServiceAPI * datastore;
@@ -64,6 +82,8 @@
 
 static int stat_migration_count;
 
+static int stat_migration_factor;
+
 static int stat_on_demand_migration_attempts;
 
 /**
@@ -71,14 +91,15 @@
  */
 static struct MUTEX * lock;
 
-/**
- * The content that we are currently trying
- * to migrate (used to give us more than one
- * chance should we fail for some reason the
- * first time).
- */
-static Datastore_Value * content;
-                       
+struct MigrationRecord {
+  Datastore_Value * value;
+  HashCode512 key;
+  unsigned int receiverIndices[MAX_RECEIVERS];
+  unsigned int sentCount;
+};
+
+static struct MigrationRecord content[MAX_RECORDS];
+
 static struct GE_Context * ectx;
        
 /**
@@ -100,30 +121,65 @@
 activeMigrationCallback(const PeerIdentity * receiver,
                        void * position,
                        unsigned int padding) {
-  /** key corresponding to content (if content != NULL);
-      yes, must be static! */
-  static HashCode512 key;
   unsigned int ret;
   GapWrapper * gw;
   unsigned int size;
   cron_t et;
   cron_t now;
   unsigned int anonymity;
-  Datastore_Value *enc;
+  Datastore_Value * enc;
+  Datastore_Value * value;
+  unsigned int index;
+  int entry;
+  int discard_entry;
+  int discard_match;
+  int i;
+  int j;
+  int match;
 
+  index = coreAPI->computeIndex(receiver);
   MUTEX_LOCK(lock);
-  if (content != NULL) {
-    size = sizeof(GapWrapper) + ntohl(content->size) - sizeof(Datastore_Value);
-    if (size > padding) {
-      FREE(content);
-      content = NULL;
+  entry = -1;
+  discard_entry = -1;
+  discard_match = -1;
+  for (i=0;i<MAX_RECORDS;i++) {
+    if (content[i].value == NULL) {
+      discard_entry = i;
+      discard_match = MAX_RECEIVERS + 1;
+      continue;
     }
+    if (ntohl(content[i].value->size) + sizeof(GapWrapper) - sizeof(Datastore_Value) <= padding) {
+      match = 0;
+      for (j=0;j<content[i].sentCount;j++) {
+       if (content[i].receiverIndices[j] == index) {
+         match = 1;
+         break;
+       }
+      }
+      if (match == 0) {
+       /* TODO: consider key proximity in matching as 
+          well! */
+       entry = i;
+       break;
+      } else {
+       if (content[i].sentCount > discard_match) {
+         discard_match = content[i].sentCount;
+         discard_entry = i;
+       }
+      }
+    }
   }
-  if (content == NULL) {
+  if (entry == -1) {
+    entry = discard_entry;
+    GE_ASSERT(NULL,
+             entry != -1);
+    FREENONNULL(content[entry].value);    
+    content[entry].value = NULL;
+    content[entry].sentCount = 0;
     if (OK != datastore->getRandom(&receiver->hashPubKey,
                                   padding,
-                                  &key,
-                                  &content,
+                                  &content[entry].key,
+                                  &content[entry].value,
                                   0)) {
       MUTEX_UNLOCK(lock);
 #if DEBUG_MIGRATION
@@ -132,68 +188,73 @@
             "Migration: random lookup in datastore failed.\n");
 #endif
       return 0;
-    }
+    }    
+    if (stats != NULL)
+      stats->change(stat_migration_factor, 1);
   }
-
+  value = content[entry].value;
+  if (value == NULL) {
+    GE_ASSERT(NULL, 0);
+    MUTEX_UNLOCK(lock);
+    return 0;
+  } 
+  size = sizeof(GapWrapper) + ntohl(value->size) - sizeof(Datastore_Value);
+  if (size > padding) {
+    MUTEX_UNLOCK(lock);
+    return 0;
+  }
 #if DEBUG_MIGRATION
   GE_LOG(ectx,
         GE_DEBUG | GE_BULK | GE_USER,
         "Migration: random lookup in datastore returned type %d.\n",
-        ntohl(content->type));
+        ntohl(value->type));
 #endif
-  if (ntohl(content->type) == ONDEMAND_BLOCK) {
+  if (ntohl(value->type) == ONDEMAND_BLOCK) {
     if (ONDEMAND_getIndexed(datastore,
-                           content,
-                           &key,
+                           value,
+                           &content[entry].key,
                            &enc) != OK) {
-      FREE(content);
-      content = NULL;
+      FREENONNULL(value);    
+      content[entry].value = NULL;
       MUTEX_UNLOCK(lock);
       return 0;
     }
     if (stats != NULL)
       stats->change(stat_on_demand_migration_attempts, 1);
-
-    FREE(content);
-    content = enc;
+    content[entry].value = enc;
+    FREE(value);
+    value = enc;
   }
 
-  size = sizeof(GapWrapper) + ntohl(content->size) - sizeof(Datastore_Value);
+  size = sizeof(GapWrapper) + ntohl(value->size) - sizeof(Datastore_Value);
   if (size > padding) {
     MUTEX_UNLOCK(lock);
-#if DEBUG_MIGRATION
-    GE_LOG(ectx,
-          GE_DEBUG | GE_REQUEST | GE_USER,
-          "Available content of size %u too big for available space (%u)\n",
-          size,
-          padding);
-#endif
     return 0;
   }
-  et = ntohll(content->expirationTime);
+  et = ntohll(value->expirationTime);
   now = get_time();
   if (et > now) {
     et -= now;
     et = et % MAX_MIGRATION_EXP;
     et += now;
   }
-  anonymity = ntohl(content->anonymityLevel);
+  anonymity = ntohl(value->anonymityLevel);
   ret = 0;
   if (anonymity == 0) {
-    /* ret > 0; (if DHT succeeds) fixme for DHT */
+    value->anonymityLevel = htonl(1);
+    anonymity = 1;
   }
-  if ( (ret == 0) &&
-       (OK == checkCoverTraffic(ectx,
-                               traffic,
-                               anonymity)) ) {
+  if (OK == checkCoverTraffic(ectx,
+                             traffic,
+                             anonymity)) {
     gw = MALLOC(size);
     gw->dc.size = htonl(size);
     gw->timeout = htonll(et);
     memcpy(&gw[1],
-          &content[1],
+          &value[1],
           size - sizeof(GapWrapper));
     ret = gap->tryMigrate(&gw->dc,
-                         &key,
+                         &content[entry].key,
                          position,
                          padding);
     FREE(gw);
@@ -203,21 +264,14 @@
           "gap's tryMigrate returned %u\n",
           ret);
 #endif
-  } else {
-#if DEBUG_MIGRATION
-    GE_LOG(ectx,
-          GE_DEBUG | GE_REQUEST | GE_USER,
-          "Migration: anonymity requirements not satisfied.\n");
-#endif
+    if (ret != 0) {
+      content[entry].receiverIndices[content[entry].sentCount++] = index;
+    }
   }
-  if (ret > 0) {
-    FREE(content);
-    content = NULL;
-  }
   MUTEX_UNLOCK(lock);
   if ( (ret > 0)&&
        (stats != NULL) )
-      stats->change(stat_migration_count, 1);
+    stats->change(stat_migration_count, 1);
   GE_BREAK(NULL, ret <= padding);
   return ret;
 }
@@ -240,6 +294,8 @@
   if (stats != NULL) {
     stat_migration_count
       = stats->create(gettext_noop("# blocks migrated"));
+    stat_migration_factor
+      = stats->create(gettext_noop("# blocks fetched for migration"));
     stat_on_demand_migration_attempts
       = stats->create(gettext_noop("# on-demand block migration attempts"));
   }
@@ -247,6 +303,7 @@
 }
 
 void doneMigration() {
+  int i;
   coreAPI->unregisterSendCallback(GAP_ESTIMATED_DATA_SIZE,
                                  &activeMigrationCallback);
   if (stats != NULL) {
@@ -258,8 +315,10 @@
   dht = NULL;
   coreAPI = NULL;
   traffic = NULL;
-  FREENONNULL(content);
-  content = NULL;
+  for (i=0;i<MAX_RECORDS;i++) {
+    FREENONNULL(content[i].value);
+    content[i].value = NULL;
+  }
   MUTEX_DESTROY(lock);
   lock = NULL;
 }





reply via email to

[Prev in Thread] Current Thread [Next in Thread]