[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r6401 - in GNUnet/src: applications/fs/ecrs applications/fs
From: |
gnunet |
Subject: |
[GNUnet-SVN] r6401 - in GNUnet/src: applications/fs/ecrs applications/fs/gap include |
Date: |
Wed, 20 Feb 2008 23:51:29 -0700 (MST) |
Author: grothoff
Date: 2008-02-20 23:51:29 -0700 (Wed, 20 Feb 2008)
New Revision: 6401
Modified:
GNUnet/src/applications/fs/ecrs/search.c
GNUnet/src/applications/fs/gap/fs.c
GNUnet/src/applications/fs/gap/gap.c
GNUnet/src/applications/fs/gap/querymanager.c
GNUnet/src/applications/fs/gap/querymanager.h
GNUnet/src/applications/fs/gap/shared.h
GNUnet/src/include/fs.h
Log:
better async processing
Modified: GNUnet/src/applications/fs/ecrs/search.c
===================================================================
--- GNUnet/src/applications/fs/ecrs/search.c 2008-02-21 06:50:11 UTC (rev
6400)
+++ GNUnet/src/applications/fs/ecrs/search.c 2008-02-21 06:51:29 UTC (rev
6401)
@@ -330,7 +330,10 @@
GNUNET_EC_file_block_check_and_get_query (size,
(const DBlock *) &value[1],
GNUNET_YES, &query))
- return GNUNET_SYSERR;
+ {
+ GNUNET_GE_BREAK(NULL, 0);
+ return GNUNET_SYSERR;
+ }
if (!((0 == memcmp (&query,
(GNUNET_HashCode *) & ps[1], sizeof (GNUNET_HashCode)))
&& ((ps->type == type) || (ps->type == GNUNET_ECRS_BLOCKTYPE_ANY))
@@ -341,7 +344,9 @@
ps->keyCount,
(GNUNET_HashCode *) &
ps[1]))))
- return GNUNET_OK; /* not a match */
+ {
+ return GNUNET_OK; /* not a match */
+ }
switch (type)
{
@@ -355,7 +360,10 @@
int j;
if (size < sizeof (KBlock))
- return GNUNET_SYSERR;
+ {
+ GNUNET_GE_BREAK(NULL, 0);
+ return GNUNET_SYSERR;
+ }
kb = GNUNET_malloc (size);
memcpy (kb, &value[1], size);
#if DEBUG_SEARCH
Modified: GNUnet/src/applications/fs/gap/fs.c
===================================================================
--- GNUnet/src/applications/fs/gap/fs.c 2008-02-21 06:50:11 UTC (rev 6400)
+++ GNUnet/src/applications/fs/gap/fs.c 2008-02-21 06:51:29 UTC (rev 6401)
@@ -392,6 +392,8 @@
{
struct GNUNET_ClientHandle *sock;
struct ResponseList *seen;
+ unsigned int processed;
+ int have_more;
};
/**
@@ -413,7 +415,14 @@
GNUNET_DatastoreValue *enc;
const GNUNET_DatastoreValue *use;
unsigned int type;
+ int ret;
+ if (cls->processed > MAX_SYNC_PROCESSED)
+ {
+ cls->have_more = GNUNET_YES;
+ return GNUNET_SYSERR;
+ }
+ cls->processed++;
size = ntohl (value->size) - sizeof (GNUNET_DatastoreValue);
dblock = (const DBlock *) &value[1];
enc = NULL;
@@ -435,11 +444,12 @@
memcpy (&msg[1], dblock, size);
type = ntohl (dblock->type);
GNUNET_free_non_null (enc);
- coreAPI->cs_send_to_client (sock, &msg->header,
- type != GNUNET_ECRS_BLOCKTYPE_DATA ? GNUNET_NO :
GNUNET_YES);
+ ret = coreAPI->cs_send_to_client (sock, &msg->header,
+ type != GNUNET_ECRS_BLOCKTYPE_DATA ?
GNUNET_NO : GNUNET_YES);
GNUNET_free (msg);
- if (type == GNUNET_ECRS_BLOCKTYPE_DATA)
- return GNUNET_SYSERR; /* unique response */
+ if ( (type == GNUNET_ECRS_BLOCKTYPE_DATA) ||
+ (ret != GNUNET_OK) )
+ return GNUNET_SYSERR; /* unique response or client can take no more*/
rl = GNUNET_malloc (sizeof (struct ResponseList));
GNUNET_hash (dblock, size, &rl->hash);
rl->next = cls->seen;
@@ -485,6 +495,8 @@
#endif
fpp.sock = sock;
fpp.seen = NULL;
+ fpp.have_more = GNUNET_NO;
+ fpp.processed = 0;
if (type == GNUNET_ECRS_BLOCKTYPE_DATA)
{
if ((1 == datastore->get (&rs->query[0],
@@ -505,7 +517,8 @@
GNUNET_FS_QUERYMANAGER_start_query (&rs->query[0], keyCount, anonymityLevel,
type, sock,
have_target ? &rs->target : NULL,
- fpp.seen);
+ fpp.seen,
+ fpp.have_more);
CLEANUP:
while (fpp.seen != NULL)
{
Modified: GNUnet/src/applications/fs/gap/gap.c
===================================================================
--- GNUnet/src/applications/fs/gap/gap.c 2008-02-21 06:50:11 UTC (rev
6400)
+++ GNUnet/src/applications/fs/gap/gap.c 2008-02-21 06:51:29 UTC (rev
6401)
@@ -43,12 +43,6 @@
#define MAX_ENTRIES_PER_SLOT 2
/**
- * If, after finding local results, we abort a GET
- * iteration, we increment "have_more" by this value.
- */
-#define HAVE_MORE_INCREMENT 5
-
-/**
* How often do we check have_more?
*/
#define HAVE_MORE_FREQUENCY (100 * GNUNET_CRON_MILLISECONDS)
Modified: GNUnet/src/applications/fs/gap/querymanager.c
===================================================================
--- GNUnet/src/applications/fs/gap/querymanager.c 2008-02-21 06:50:11 UTC
(rev 6400)
+++ GNUnet/src/applications/fs/gap/querymanager.c 2008-02-21 06:51:29 UTC
(rev 6401)
@@ -64,17 +64,18 @@
};
-static GNUNET_CoreAPIForPlugins *coreAPI;
-
/**
* List of all clients, their active requests and other
* per-client information.
*/
static struct ClientDataList *clients;
+static GNUNET_CoreAPIForPlugins * coreAPI;
static GNUNET_Stats_ServiceAPI *stats;
+static GNUNET_Datastore_ServiceAPI *datastore;
+
static int stat_gap_client_query_received;
static int stat_gap_client_response_sent;
@@ -131,7 +132,8 @@
unsigned int type,
struct GNUNET_ClientHandle *client,
const GNUNET_PeerIdentity * target,
- const struct ResponseList *seen)
+ const struct ResponseList *seen,
+ int have_more)
{
struct ClientDataList *cl;
struct RequestList *request;
@@ -153,6 +155,8 @@
request->primary_target = GNUNET_FS_PT_intern (target);
request->response_client = client;
request->policy = GNUNET_FS_RoutingPolicy_ALL;
+ if (have_more != GNUNET_NO)
+ request->have_more = HAVE_MORE_INCREMENT;
memcpy (&request->queries[0], query, sizeof (GNUNET_HashCode) * key_count);
if (seen != NULL)
{
@@ -275,13 +279,13 @@
msg->expirationTime = GNUNET_htonll (expirationTime);
memcpy (&msg[1], data, size);
coreAPI->cs_send_to_client (client,
- &msg->header,
- (rl->type != GNUNET_ECRS_BLOCKTYPE_DATA)
- ? GNUNET_NO : GNUNET_YES);
+ &msg->header,
+ (rl->type != GNUNET_ECRS_BLOCKTYPE_DATA)
+ ? GNUNET_NO : GNUNET_YES);
if (stats != NULL)
stats->change (stat_gap_client_response_sent, 1);
GNUNET_free (msg);
-
+
/* update *value */
*value += 1 + rl->value;
GNUNET_FS_PLAN_success (sender, client, 0, rl);
@@ -431,13 +435,71 @@
GNUNET_mutex_unlock (GNUNET_FS_lock);
}
+
+struct HMClosure
+{
+ struct RequestList * request;
+ unsigned int processed;
+ int have_more;
+};
+
/**
+ * Any response that we get should be passed
+ * back to the client. If the response is unique,
+ * we should about the iteration (return GNUNET_SYSERR).
+ */
+static int
+have_more_processor (const GNUNET_HashCode * key,
+ const GNUNET_DatastoreValue *
+ value, void *closure, unsigned long long uid)
+{
+ struct HMClosure *cls = closure;
+ const DBlock *dblock;
+ GNUNET_HashCode hc;
+ CS_fs_reply_content_MESSAGE *msg;
+ unsigned int size;
+ int ret;
+
+ size = ntohl (value->size) - sizeof (GNUNET_DatastoreValue);
+ dblock = (const DBlock *) &value[1];
+ if (GNUNET_OK == GNUNET_FS_SHARED_test_valid_new_response (cls->request,
+ key,
+ size,
+ dblock, &hc))
+ {
+ msg = GNUNET_malloc (sizeof (CS_fs_reply_content_MESSAGE) + size);
+ msg->header.type = htons (GNUNET_CS_PROTO_GAP_RESULT);
+ msg->header.size = htons (sizeof (CS_fs_reply_content_MESSAGE) + size);
+ msg->anonymityLevel = value->anonymityLevel;
+ msg->expirationTime = value->expirationTime;
+ memcpy (&msg[1], dblock, size);
+ ret = coreAPI->cs_send_to_client (cls->request->response_client,
+ &msg->header,
+ GNUNET_YES);
+ GNUNET_free (msg);
+ if (ret != GNUNET_OK)
+ return GNUNET_SYSERR; /* client can take no more */
+ GNUNET_FS_SHARED_mark_response_seen(cls->request,
+ &hc);
+ }
+ cls->processed++;
+ if (cls->processed > MAX_ASYNC_PROCESSED)
+ {
+ cls->have_more = GNUNET_YES;
+ return GNUNET_SYSERR;
+ }
+ return GNUNET_OK;
+}
+
+
+/**
* Cron-job to periodically check if we should
* repeat requests.
*/
static void
repeat_requests_job (void *unused)
{
+ struct HMClosure hmc;
struct ClientDataList *client;
struct RequestList *request;
GNUNET_CronTime now;
@@ -450,27 +512,40 @@
request = client->requests;
while (request != NULL)
{
- if ((NULL == request->plan_entries) &&
- ((client->client != NULL) ||
- (request->expiration > now)) &&
- (request->last_ttl_used * GNUNET_CRON_SECONDS +
- request->last_request_time < now))
- {
- if ((GNUNET_OK ==
- GNUNET_FS_PLAN_request (client->client, 0, request))
- && (stats != NULL))
- stats->change (stat_gap_client_query_injected, 1);
- }
-
- if ((request->anonymityLevel == 0) &&
- (request->last_dht_get + request->dht_back_off < now))
- {
- if (request->dht_back_off * 2 > request->dht_back_off)
- request->dht_back_off *= 2;
- request->last_dht_get = now;
- GNUNET_FS_DHT_execute_query (request->type,
- &request->queries[0]);
- }
+ if (request->have_more > 0)
+ {
+ request->have_more--;
+ hmc.request = request;
+ hmc.processed = 0;
+ hmc.have_more = GNUNET_NO;
+ datastore->get (&request->queries[0], request->type,
&have_more_processor, &hmc);
+ if (hmc.have_more)
+ request->have_more += HAVE_MORE_INCREMENT;
+ }
+ else
+ {
+ if ((NULL == request->plan_entries) &&
+ ((client->client != NULL) ||
+ (request->expiration > now)) &&
+ (request->last_ttl_used * GNUNET_CRON_SECONDS +
+ request->last_request_time < now))
+ {
+ if ((GNUNET_OK ==
+ GNUNET_FS_PLAN_request (client->client, 0, request))
+ && (stats != NULL))
+ stats->change (stat_gap_client_query_injected, 1);
+ }
+
+ if ((request->anonymityLevel == 0) &&
+ (request->last_dht_get + request->dht_back_off < now))
+ {
+ if (request->dht_back_off * 2 > request->dht_back_off)
+ request->dht_back_off *= 2;
+ request->last_dht_get = now;
+ GNUNET_FS_DHT_execute_query (request->type,
+ &request->queries[0]);
+ }
+ }
request = request->next;
}
client = client->next;
@@ -485,10 +560,7 @@
GNUNET_GE_ASSERT (capi->ectx,
GNUNET_SYSERR !=
capi->cs_exit_handler_register (&handle_client_exit));
- GNUNET_cron_add_job (capi->cron,
- &repeat_requests_job,
- CHECK_REPEAT_FREQUENCY, CHECK_REPEAT_FREQUENCY, NULL);
-
+ datastore = capi->request_service ("datastore");
stats = capi->request_service ("stats");
if (stats != NULL)
{
@@ -504,6 +576,9 @@
stats->
create (gettext_noop ("# gap query bloomfilter resizing updates"));
}
+ GNUNET_cron_add_job (capi->cron,
+ &repeat_requests_job,
+ CHECK_REPEAT_FREQUENCY, CHECK_REPEAT_FREQUENCY, NULL);
return 0;
}
@@ -518,6 +593,8 @@
cs_exit_handler_unregister (&handle_client_exit));
while (clients != NULL)
handle_client_exit (clients->client);
+ coreAPI->release_service (datastore);
+ datastore = NULL;
if (stats != NULL)
{
coreAPI->release_service (stats);
Modified: GNUnet/src/applications/fs/gap/querymanager.h
===================================================================
--- GNUnet/src/applications/fs/gap/querymanager.h 2008-02-21 06:50:11 UTC
(rev 6400)
+++ GNUnet/src/applications/fs/gap/querymanager.h 2008-02-21 06:51:29 UTC
(rev 6401)
@@ -42,6 +42,7 @@
* client disconnects.
*
* @param target peer known to have the content, maybe NULL.
+ * @param have_more do we have more results in our local datastore?
*/
void
GNUNET_FS_QUERYMANAGER_start_query (const GNUNET_HashCode * query,
@@ -50,7 +51,8 @@
unsigned int type,
struct GNUNET_ClientHandle *client,
const GNUNET_PeerIdentity * target,
- const struct ResponseList *seen);
+ const struct ResponseList *seen,
+ int have_more);
/**
* Handle the given response (by forwarding it to
Modified: GNUnet/src/applications/fs/gap/shared.h
===================================================================
--- GNUnet/src/applications/fs/gap/shared.h 2008-02-21 06:50:11 UTC (rev
6400)
+++ GNUnet/src/applications/fs/gap/shared.h 2008-02-21 06:51:29 UTC (rev
6401)
@@ -286,6 +286,7 @@
*/
extern struct GNUNET_Mutex *GNUNET_FS_lock;
+
/**
* Free the request list, including the associated
* list of pending requests, its entries in the
Modified: GNUnet/src/include/fs.h
===================================================================
--- GNUnet/src/include/fs.h 2008-02-21 06:50:11 UTC (rev 6400)
+++ GNUnet/src/include/fs.h 2008-02-21 06:51:29 UTC (rev 6401)
@@ -440,8 +440,23 @@
*/
#define GNUNET_GAP_ESTIMATED_DATA_SIZE (32 * 1024)
+/**
+ * If, after finding local results, we abort a GET
+ * iteration, we increment "have_more" by this value.
+ */
+#define HAVE_MORE_INCREMENT 2
+/**
+ * What is the maximum number of local results
+ * that we are willing to return synchronously?
+ */
+#define MAX_SYNC_PROCESSED 8
+/**
+ * What is the maximum number of local results
+ * that we are willing to return synchronously?
+ */
+#define MAX_ASYNC_PROCESSED 32
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r6401 - in GNUnet/src: applications/fs/ecrs applications/fs/gap include,
gnunet <=