[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r24891 - in gnunet/src: dht include
From: |
gnunet |
Subject: |
[GNUnet-SVN] r24891 - in gnunet/src: dht include |
Date: |
Sat, 10 Nov 2012 22:21:02 +0100 |
Author: grothoff
Date: 2012-11-10 22:21:01 +0100 (Sat, 10 Nov 2012)
New Revision: 24891
Modified:
gnunet/src/dht/dht.h
gnunet/src/dht/dht_api.c
gnunet/src/dht/gnunet-service-dht_clients.c
gnunet/src/include/gnunet_dht_service.h
gnunet/src/include/gnunet_protocols.h
Log:
-implementing #2435
Modified: gnunet/src/dht/dht.h
===================================================================
--- gnunet/src/dht/dht.h 2012-11-10 20:21:45 UTC (rev 24890)
+++ gnunet/src/dht/dht.h 2012-11-10 21:21:01 UTC (rev 24891)
@@ -109,6 +109,39 @@
/**
+ * DHT GET RESULTS KNOWN message sent from clients to service. Indicates that
a GET
+ * request should exclude certain results which are already known.
+ */
+struct GNUNET_DHT_ClientGetResultSeenMessage
+{
+ /**
+ * Type: GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Reserved, always 0.
+ */
+ uint32_t reserved GNUNET_PACKED;
+
+ /**
+ * The key we are searching for (to make it easy to find the corresponding
+ * GET inside the service).
+ */
+ struct GNUNET_HashCode key;
+
+ /**
+ * Unique ID identifying this request.
+ */
+ uint64_t unique_id GNUNET_PACKED;
+
+ /* Followed by an array of the hash codes of known results */
+
+};
+
+
+
+/**
* Reply to a GET send from the service to a client.
*/
struct GNUNET_DHT_ClientResultMessage
@@ -325,7 +358,7 @@
struct GNUNET_DHT_MonitorGetMessage
{
/**
- * Type: GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT
+ * Type: GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET
*/
struct GNUNET_MessageHeader header;
Modified: gnunet/src/dht/dht_api.c
===================================================================
--- gnunet/src/dht/dht_api.c 2012-11-10 20:21:45 UTC (rev 24890)
+++ gnunet/src/dht/dht_api.c 2012-11-10 21:21:01 UTC (rev 24891)
@@ -173,15 +173,42 @@
struct PendingMessage *message;
/**
+ * Array of hash codes over the results that we have already
+ * seen.
+ */
+ struct GNUNET_HashCode *seen_results;
+
+ /**
* Key that this get request is for
*/
- struct GNUNET_HashCode key;
+ struct GNUNET_HashCode key;
/**
* Unique identifier for this request (for key collisions).
*/
uint64_t unique_id;
+ /**
+ * Size of the 'seen_results' array. Note that not
+ * all positions might be used (as we over-allocate).
+ */
+ unsigned int seen_results_size;
+
+ /**
+ * Offset into the 'seen_results' array marking the
+ * end of the positions that are actually used.
+ */
+ unsigned int seen_results_end;
+
+ /**
+ * Offset into the 'seen_results' array marking the
+ * position up to where we've send the hash codes to
+ * the DHT for blocking (needed as we might not be
+ * able to send all hash codes at once).
+ */
+ unsigned int seen_results_transmission_offset;
+
+
};
@@ -353,6 +380,50 @@
/**
+ * Queue messages to DHT to block certain results from the result set.
+ *
+ * @param get_handle GET to generate messages for.
+ */
+static void
+queue_filter_messages (struct GNUNET_DHT_GetHandle *get_handle)
+{
+ struct PendingMessage *pm;
+ struct GNUNET_DHT_ClientGetResultSeenMessage *msg;
+ uint16_t msize;
+ unsigned int delta;
+ unsigned int max;
+
+ while (get_handle->seen_results_transmission_offset <
get_handle->seen_results_end)
+ {
+ delta = get_handle->seen_results_end -
get_handle->seen_results_transmission_offset;
+ max = (GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct
GNUNET_DHT_ClientGetResultSeenMessage)) / sizeof (struct GNUNET_HashCode);
+ if (delta > max)
+ delta = max;
+ msize = sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage) + delta *
sizeof (struct GNUNET_HashCode);
+
+ pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
+ msg = (struct GNUNET_DHT_ClientGetResultSeenMessage *) &pm[1];
+ pm->msg = &msg->header;
+ pm->handle = get_handle->dht_handle;
+ pm->unique_id = get_handle->unique_id;
+ pm->free_on_send = GNUNET_YES;
+ pm->in_pending_queue = GNUNET_YES;
+ msg->header.type = htons
(GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN);
+ msg->header.size = htons (msize);
+ msg->key = get_handle->key;
+ msg->unique_id = get_handle->unique_id;
+ memcpy (&msg[1],
+
&get_handle->seen_results[get_handle->seen_results_transmission_offset],
+ sizeof (struct GNUNET_HashCode) * delta);
+ get_handle->seen_results_transmission_offset += delta;
+ GNUNET_CONTAINER_DLL_insert_tail (get_handle->dht_handle->pending_head,
+ get_handle->dht_handle->pending_tail,
+ pm);
+ }
+}
+
+
+/**
* Add the request corresponding to the given route handle
* to the pending queue (if it is not already in there).
*
@@ -365,16 +436,18 @@
add_request_to_pending (void *cls, const struct GNUNET_HashCode * key, void
*value)
{
struct GNUNET_DHT_Handle *handle = cls;
- struct GNUNET_DHT_GetHandle *rh = value;
+ struct GNUNET_DHT_GetHandle *get_handle = value;
- if (GNUNET_NO == rh->message->in_pending_queue)
+ if (GNUNET_NO == get_handle->message->in_pending_queue)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Retransmitting request related to %s to DHT %p\n", GNUNET_h2s (key),
handle);
+ get_handle->seen_results_transmission_offset = 0;
GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
- rh->message);
- rh->message->in_pending_queue = GNUNET_YES;
+ get_handle->message);
+ queue_filter_messages (get_handle);
+ get_handle->message->in_pending_queue = GNUNET_YES;
}
return GNUNET_YES;
}
@@ -578,6 +651,7 @@
struct GNUNET_DHT_GetHandle *get_handle = value;
const struct GNUNET_PeerIdentity *put_path;
const struct GNUNET_PeerIdentity *get_path;
+ struct GNUNET_HashCode hc;
uint32_t put_path_length;
uint32_t get_path_length;
size_t data_length;
@@ -614,6 +688,17 @@
put_path = (const struct GNUNET_PeerIdentity *) &dht_msg[1];
get_path = &put_path[put_path_length];
data = &get_path[get_path_length];
+ /* remember that we've seen this result */
+ GNUNET_CRYPTO_hash (data, data_length, &hc);
+ if (get_handle->seen_results_size == get_handle->seen_results_end)
+ GNUNET_array_grow (get_handle->seen_results,
+ get_handle->seen_results_size,
+ get_handle->seen_results_size * 2 + 1);
+ GNUNET_assert (get_handle->seen_results_end ==
get_handle->seen_results_transmission_offset);
+ get_handle->seen_results[get_handle->seen_results_end++] = hc;
+ /* no need to block it explicitly, service already knows about it! */
+ get_handle->seen_results_transmission_offset++;
+
get_handle->iter (get_handle->iter_cls,
GNUNET_TIME_absolute_ntoh (dht_msg->expiration), key,
get_path, get_path_length, put_path, put_path_length,
@@ -1194,7 +1279,39 @@
}
+
/**
+ * Tell the DHT not to return any of the following known results
+ * to this client.
+ *
+ * @param get_handle get operation for which results should be filtered
+ * @param num_results number of results to be blocked that are
+ * provided in this call (size of the 'results' array)
+ * @param results array of hash codes over the 'data' of the results
+ * to be blocked
+ */
+void
+GNUNET_DHT_get_filter_known_results (struct GNUNET_DHT_GetHandle *get_handle,
+ unsigned int num_results,
+ const struct GNUNET_HashCode *results)
+{
+ unsigned int needed;
+
+ needed = get_handle->seen_results_end + num_results;
+ if (needed > get_handle->seen_results_size)
+ GNUNET_array_grow (get_handle->seen_results,
+ get_handle->seen_results_size,
+ needed);
+ memcpy (&get_handle->seen_results[get_handle->seen_results_end],
+ results,
+ num_results * sizeof (struct GNUNET_HashCode));
+ get_handle->seen_results_end += num_results;
+ queue_filter_messages (get_handle);
+ process_pending_messages (get_handle->dht_handle);
+}
+
+
+/**
* Stop async DHT-get.
*
* @param get_handle handle to the GET operation to stop
@@ -1242,8 +1359,10 @@
get_handle->message->in_pending_queue = GNUNET_NO;
}
GNUNET_free (get_handle->message);
+ GNUNET_array_grow (get_handle->seen_results,
+ get_handle->seen_results_end,
+ 0);
GNUNET_free (get_handle);
-
process_pending_messages (handle);
}
Modified: gnunet/src/dht/gnunet-service-dht_clients.c
===================================================================
--- gnunet/src/dht/gnunet-service-dht_clients.c 2012-11-10 20:21:45 UTC (rev
24890)
+++ gnunet/src/dht/gnunet-service-dht_clients.c 2012-11-10 21:21:01 UTC (rev
24891)
@@ -551,9 +551,7 @@
/**
- * Handler for any generic DHT messages, calls the appropriate handler
- * depending on message type, sends confirmation if responses aren't otherwise
- * expected.
+ * Handler for DHT GET messages from the client.
*
* @param cls closure for the service
* @param client the client we received this message from
@@ -621,6 +619,103 @@
/**
+ * Closure for 'find_by_unique_id'.
+ */
+struct FindByUniqueIdContext
+{
+ /**
+ * Where to store the result, if found.
+ */
+ struct ClientQueryRecord *cqr;
+
+ uint64_t unique_id;
+};
+
+
+/**
+ * Function called for each existing DHT record for the given
+ * query. Checks if it matches the UID given in the closure
+ * and if so returns the entry as a result.
+ *
+ * @param cls the search context
+ * @param key query for the lookup (not used)
+ * @param value the 'struct ClientQueryRecord'
+ * @return GNUNET_YES to continue iteration (result not yet found)
+ */
+static int
+find_by_unique_id (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
+{
+ struct FindByUniqueIdContext *fui_ctx = cls;
+ struct ClientQueryRecord *cqr = value;
+
+ if (cqr->unique_id != fui_ctx->unique_id)
+ return GNUNET_YES;
+ fui_ctx->cqr = cqr;
+ return GNUNET_NO;
+}
+
+
+/**
+ * Handler for "GET result seen" messages from the client.
+ *
+ * @param cls closure for the service
+ * @param client the client we received this message from
+ * @param message the actual message received
+ */
+static void
+handle_dht_local_get_result_seen (void *cls, struct GNUNET_SERVER_Client
*client,
+ const struct GNUNET_MessageHeader *message)
+{
+ const struct GNUNET_DHT_ClientGetResultSeenMessage *seen;
+ uint16_t size;
+ unsigned int hash_count;
+ unsigned int old_count;
+ const struct GNUNET_HashCode *hc;
+ struct FindByUniqueIdContext fui_ctx;
+ struct ClientQueryRecord *cqr;
+
+ size = ntohs (message->size);
+ if (size < sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage))
+ {
+ GNUNET_break (0);
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ return;
+ }
+ seen = (const struct GNUNET_DHT_ClientGetResultSeenMessage *) message;
+ hash_count = (size - sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage))
/ sizeof (struct GNUNET_HashCode);
+ if (size != sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage) +
hash_count * sizeof (struct GNUNET_HashCode))
+ {
+ GNUNET_break (0);
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ return;
+ }
+ hc = (const struct GNUNET_HashCode*) &seen[1];
+ fui_ctx.unique_id = seen->unique_id;
+ fui_ctx.cqr = NULL;
+ GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
+ &seen->key,
+ &find_by_unique_id,
+ &fui_ctx);
+ if (NULL == (cqr = fui_ctx.cqr))
+ {
+ GNUNET_break (0);
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ return;
+ }
+ /* finally, update 'seen' list */
+ old_count = cqr->seen_replies_count;
+ GNUNET_array_grow (cqr->seen_replies,
+ cqr->seen_replies_count,
+ cqr->seen_replies_count + hash_count);
+ memcpy (&cqr->seen_replies[old_count],
+ hc,
+ sizeof (struct GNUNET_HashCode) * hash_count);
+}
+
+
+/**
* Closure for 'remove_by_unique_id'.
*/
struct RemoveByUniqueIdContext
@@ -1350,6 +1445,8 @@
{&handle_dht_local_monitor_stop, NULL,
GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP,
sizeof (struct GNUNET_DHT_MonitorStartStopMessage)},
+ {&handle_dht_local_get_result_seen, NULL,
+ GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN, 0},
{NULL, NULL, 0, 0}
};
forward_map = GNUNET_CONTAINER_multihashmap_create (1024, GNUNET_NO);
Modified: gnunet/src/include/gnunet_dht_service.h
===================================================================
--- gnunet/src/include/gnunet_dht_service.h 2012-11-10 20:21:45 UTC (rev
24890)
+++ gnunet/src/include/gnunet_dht_service.h 2012-11-10 21:21:01 UTC (rev
24891)
@@ -236,14 +236,30 @@
*/
struct GNUNET_DHT_GetHandle *
GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
- enum GNUNET_BLOCK_Type type, const struct
GNUNET_HashCode * key,
+ enum GNUNET_BLOCK_Type type,
+ const struct GNUNET_HashCode *key,
uint32_t desired_replication_level,
- enum GNUNET_DHT_RouteOption options, const void *xquery,
- size_t xquery_size, GNUNET_DHT_GetIterator iter,
- void *iter_cls);
+ enum GNUNET_DHT_RouteOption options,
+ const void *xquery, size_t xquery_size,
+ GNUNET_DHT_GetIterator iter, void *iter_cls);
/**
+ * Tell the DHT not to return any of the following known results
+ * to this client.
+ *
+ * @param get_handle get operation for which results should be filtered
+ * @param num_results number of results to be blocked that are
+ * provided in this call (size of the 'results' array)
+ * @param results array of hash codes over the 'data' of the results
+ * to be blocked
+ */
+void
+GNUNET_DHT_get_filter_known_results (struct GNUNET_DHT_GetHandle *get_handle,
+ unsigned int num_results,
+ const struct GNUNET_HashCode *results);
+
+/**
* Stop async DHT-get. Frees associated resources.
*
* @param get_handle GET operation to stop.
Modified: gnunet/src/include/gnunet_protocols.h
===================================================================
--- gnunet/src/include/gnunet_protocols.h 2012-11-10 20:21:45 UTC (rev
24890)
+++ gnunet/src/include/gnunet_protocols.h 2012-11-10 21:21:01 UTC (rev
24891)
@@ -558,7 +558,12 @@
*/
#define GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK 155
+/**
+ * Certain results are already known to the client, filter those.
+ */
+#define GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN 156
+
/*******************************************************************************
* HOSTLIST message types
******************************************************************************/
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r24891 - in gnunet/src: dht include,
gnunet <=