[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r3856 - GNUnet/src/applications/dht/module
From: |
grothoff |
Subject: |
[GNUnet-SVN] r3856 - GNUnet/src/applications/dht/module |
Date: |
Sat, 2 Dec 2006 20:37:48 -0800 (PST) |
Author: grothoff
Date: 2006-12-02 20:37:46 -0800 (Sat, 02 Dec 2006)
New Revision: 3856
Modified:
GNUnet/src/applications/dht/module/routing.c
Log:
s
Modified: GNUnet/src/applications/dht/module/routing.c
===================================================================
--- GNUnet/src/applications/dht/module/routing.c 2006-12-03 03:51:44 UTC
(rev 3855)
+++ GNUnet/src/applications/dht/module/routing.c 2006-12-03 04:37:46 UTC
(rev 3856)
@@ -23,20 +23,16 @@
* @brief state for active DHT routing operations
* @author Christian Grothoff
*
- * CRITICAL:
- * - tracking of get/put opertations
- * - reply routing
- *
* RC:
* - add support for GET retry (or delayed initial GET ops)
* - fix problem with possible GET/PUT routing loops!
* (not convinced that current design even probabilistically
* prevents loops; also check for routing loops by
* checking pending queries)
- * - stats
*
* LATER:
* - prioritization
+ * - delay selection
*/
#include "platform.h"
@@ -148,16 +144,36 @@
*/
typedef struct {
+ /**
+ * Information about where to send the results back to.
+ */
DHT_Source_Route source;
+ /**
+ * GET message of this record.
+ */
DHT_GET_MESSAGE * get;
+ /**
+ * Hashcodes of the results that we have sent back
+ * so far.
+ */
+ HashCode512 * results;
+
+ /**
+ * Number of entries in results.
+ */
+ unsigned int result_count;
+
} DHTQueryRecord;
static unsigned int rt_size;
static unsigned int rt_pos;
+/**
+ * rt_size records of active queries
+ */
static DHTQueryRecord ** records;
/**
@@ -182,18 +198,109 @@
unsigned int size,
const char * data,
void * cls) {
+ DHTQueryRecord * q;
+ int i;
+ int j;
+ int found;
+ HashCode512 hc;
+ DHT_RESULT_MESSAGE * result;
+
+ if (cls != NULL) {
+ result = cls;
+ } else {
+ result = MALLOC(sizeof(DHT_RESULT_MESSAGE) + size);
+ result->header.size = htons(sizeof(DHT_RESULT_MESSAGE) + size);
+ result->header.type = htons(P2P_PROTO_DHT_RESULT);
+ result->type = htonl(type);
+ result->key = *key;
+ memcpy(&result[1],
+ data,
+ size);
+ }
+ hash(data,
+ size,
+ &hc);
MUTEX_LOCK(lock);
- /* FIXME */
+ for (i=0;i<rt_size;i++) {
+ q = records[i];
+ if (q == NULL)
+ continue;
+ if ( (ntohl(q->get->type) != type) ||
+ (0 != memcmp(key,
+ &q->get->key,
+ sizeof(HashCode512))) )
+ continue;
+ found = NO;
+ for (j=0;j<q->result_count;j++)
+ if (0 == memcmp(&hc,
+ &q->results[j],
+ sizeof(HashCode512))) {
+ found = YES;
+ break;
+ }
+ if (found == YES)
+ continue;
+ GROW(q->results,
+ q->result_count,
+ q->result_count + 1);
+ q->results[q->result_count-1] = hc;
+ if (0 != memcmp(&q->source.source,
+ coreAPI->myIdentity,
+ sizeof(PeerIdentity))) {
+ coreAPI->unicast(&q->source.source,
+ &result->header,
+ 0, /* FIXME: priority */
+ 5 * cronSECONDS); /* FIXME */
+ if (stats != NULL)
+ stats->change(stat_replies_routed, 1);
+ } else if (q->source.receiver != NULL) {
+ q->source.receiver(key,
+ type,
+ size,
+ data,
+ q->source.receiver_closure);
+ if (stats != NULL)
+ stats->change(stat_replies_routed, 1);
+ }
+ }
MUTEX_UNLOCK(lock);
+ if (cls == NULL)
+ FREE(result);
}
static void addRoute(const PeerIdentity * sender,
ResultHandler handler,
void * cls,
const DHT_GET_MESSAGE * get) {
+ DHTQueryRecord * q;
+
MUTEX_LOCK(lock);
- /* FIXME */
+ if (records[rt_pos] != NULL) {
+ FREE(records[rt_pos]->get);
+ GROW(records[rt_pos]->results,
+ records[rt_pos]->result_count,
+ 0);
+ } else {
+ records[rt_pos] = MALLOC(sizeof(DHTQueryRecord));
+ }
+ q = records[rt_pos];
+ memset(q,
+ 0,
+ sizeof(DHTQueryRecord));
+ q->get = MALLOC(ntohs(get->header.size));
+ memcpy(q->get,
+ get,
+ ntohs(get->header.size));
+ if (sender != NULL)
+ q->source.source = *sender;
+ else
+ q->source.source = *coreAPI->myIdentity;
+ q->source.receiver = handler;
+ q->source.receiver_closure = cls;
+ rt_pos = (rt_pos + 1) % rt_size;
MUTEX_UNLOCK(lock);
+ if (stats != NULL)
+ stats->change(stat_requests_routed, 1);
}
/**
@@ -318,7 +425,7 @@
ntohl(result->type),
ntohs(result->header.size) - sizeof(DHT_RESULT_MESSAGE),
(const char*) &result[1],
- NULL);
+ (void*) msg);
return OK;
}
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r3856 - GNUnet/src/applications/dht/module,
grothoff <=