gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r3856 - GNUnet/src/applications/dht/module


From: grothoff
Subject: [GNUnet-SVN] r3856 - GNUnet/src/applications/dht/module
Date: Sat, 2 Dec 2006 20:37:48 -0800 (PST)

Author: grothoff
Date: 2006-12-02 20:37:46 -0800 (Sat, 02 Dec 2006)
New Revision: 3856

Modified:
   GNUnet/src/applications/dht/module/routing.c
Log:
s

Modified: GNUnet/src/applications/dht/module/routing.c
===================================================================
--- GNUnet/src/applications/dht/module/routing.c        2006-12-03 03:51:44 UTC (rev 3855)
+++ GNUnet/src/applications/dht/module/routing.c        2006-12-03 04:37:46 UTC (rev 3856)
@@ -23,20 +23,16 @@
  * @brief state for active DHT routing operations
  * @author Christian Grothoff
  *
- * CRITICAL:
- * - tracking of get/put opertations
- * - reply routing
- *
  * RC:
  * - add support for GET retry (or delayed initial GET ops)
  * - fix problem with possible GET/PUT routing loops!
  *   (not convinced that current design even probabilistically
  *    prevents loops; also check for routing loops by
  *    checking pending queries)
- * - stats
  *
  * LATER:
  * - prioritization
+ * - delay selection
  */
 
 #include "platform.h"
@@ -148,16 +144,36 @@
  */
 typedef struct {
 
+  /**
+   * Information about where to send the results back to.
+   */ 
   DHT_Source_Route source;
 
+  /**
+   * GET message of this record.
+   */
   DHT_GET_MESSAGE * get;
 
+  /**
+   * Hashcodes of the results that we have sent back
+   * so far.
+   */
+  HashCode512 * results;
+
+  /**
+   * Number of entries in results.
+   */
+  unsigned int result_count;
+
 } DHTQueryRecord;
 
 static unsigned int rt_size;
 
 static unsigned int rt_pos;
 
+/**
+ * rt_size records of active queries
+ */ 
 static DHTQueryRecord ** records;
 
 /**
@@ -182,18 +198,109 @@
                        unsigned int size,
                        const char * data,
                        void * cls) {
+  DHTQueryRecord * q;
+  int i;
+  int j;
+  int found;
+  HashCode512 hc;
+  DHT_RESULT_MESSAGE * result;
+
+  if (cls != NULL) {
+    result = cls;
+  } else {
+    result = MALLOC(sizeof(DHT_RESULT_MESSAGE) + size);
+    result->header.size = htons(sizeof(DHT_RESULT_MESSAGE) + size);
+    result->header.type = htons(P2P_PROTO_DHT_RESULT);
+    result->type = htonl(type);
+    result->key = *key;
+    memcpy(&result[1],
+          data,
+          size);
+  }
+  hash(data,
+       size,
+       &hc);
   MUTEX_LOCK(lock);
-  /* FIXME */
+  for (i=0;i<rt_size;i++) {
+    q = records[i];
+    if (q == NULL)
+      continue;
+    if ( (ntohl(q->get->type) != type) ||
+        (0 != memcmp(key,
+                     &q->get->key,
+                     sizeof(HashCode512))) )
+      continue;
+    found = NO;
+    for (j=0;j<q->result_count;j++)
+      if (0 == memcmp(&hc,
+                     &q->results[j],
+                     sizeof(HashCode512))) {
+       found = YES;
+       break;
+      }
+    if (found == YES)
+      continue;
+    GROW(q->results,
+        q->result_count,
+        q->result_count + 1);
+    q->results[q->result_count-1] = hc;
+    if (0 != memcmp(&q->source.source,
+                   coreAPI->myIdentity,
+                   sizeof(PeerIdentity))) {
+      coreAPI->unicast(&q->source.source,
+                      &result->header,
+                      0, /* FIXME: priority */
+                      5 * cronSECONDS); /* FIXME */
+      if (stats != NULL)
+       stats->change(stat_replies_routed, 1);
+    } else if (q->source.receiver != NULL) {
+      q->source.receiver(key,
+                        type,
+                        size,
+                        data,
+                        q->source.receiver_closure);
+      if (stats != NULL)
+       stats->change(stat_replies_routed, 1);
+    }
+  }
   MUTEX_UNLOCK(lock);
+  if (cls == NULL)
+    FREE(result);
 }
 
 static void addRoute(const PeerIdentity * sender,
                     ResultHandler handler,
                     void * cls,
                     const DHT_GET_MESSAGE * get) {
+  DHTQueryRecord * q;
+
   MUTEX_LOCK(lock);
-  /* FIXME */
+  if (records[rt_pos] != NULL) {
+    FREE(records[rt_pos]->get);
+    GROW(records[rt_pos]->results,
+        records[rt_pos]->result_count,
+        0); 
+  } else {
+    records[rt_pos] = MALLOC(sizeof(DHTQueryRecord));
+  }
+  q = records[rt_pos];
+  memset(q,
+        0,
+        sizeof(DHTQueryRecord));
+  q->get = MALLOC(ntohs(get->header.size));
+  memcpy(q->get,
+        get,
+        ntohs(get->header.size));
+  if (sender != NULL)
+    q->source.source = *sender;
+  else
+    q->source.source = *coreAPI->myIdentity;
+  q->source.receiver = handler;
+  q->source.receiver_closure = cls;
+  rt_pos = (rt_pos + 1) % rt_size;
   MUTEX_UNLOCK(lock);
+  if (stats != NULL)
+    stats->change(stat_requests_routed, 1);
 }
 
 /**
@@ -318,7 +425,7 @@
              ntohl(result->type),
              ntohs(result->header.size) - sizeof(DHT_RESULT_MESSAGE),
              (const char*) &result[1],
-             NULL);
+             (void*) msg);
   return OK;
 }
 





reply via email to

[Prev in Thread] Current Thread [Next in Thread]