[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r14408 - gnunet/src/fs
From: gnunet
Subject: [GNUnet-SVN] r14408 - gnunet/src/fs
Date: Tue, 15 Feb 2011 14:07:14 +0100
Author: grothoff
Date: 2011-02-15 14:07:14 +0100 (Tue, 15 Feb 2011)
New Revision: 14408
Modified:
gnunet/src/fs/gnunet-service-fs_cp.c
gnunet/src/fs/gnunet-service-fs_cp.h
gnunet/src/fs/gnunet-service-fs_lc.c
gnunet/src/fs/gnunet-service-fs_pr.c
gnunet/src/fs/gnunet-service-fs_pr.h
Log:
stuff
Modified: gnunet/src/fs/gnunet-service-fs_cp.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_cp.c 2011-02-15 12:19:20 UTC (rev 14407)
+++ gnunet/src/fs/gnunet-service-fs_cp.c 2011-02-15 13:07:14 UTC (rev 14408)
@@ -33,7 +33,6 @@
#define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
-
/**
* Handle to cancel a transmission request.
*/
@@ -124,15 +123,25 @@
struct GSF_PeerTransmitHandle *pth_tail;
/**
+ * Migration stop message in our queue, or NULL if we have none pending.
+ */
+ struct GSF_PeerTransmitHandle *migration_pth;
+
+ /**
* Context of our GNUNET_CORE_peer_change_preference call (or NULL).
* NULL if we have successfully reserved 32k, otherwise non-NULL.
*/
struct GNUNET_CORE_InformationRequestContext *irc;
/**
+ * Active requests from this neighbour.
+ */
+ struct GNUNET_CONTAINER_MulitHashMap *request_map;
+
+ /**
* ID of delay task for scheduling transmission.
*/
- GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task; // FIXME: unused!
+ GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task; // FIXME: used in 'push' (ugh!)
/**
* Increase in traffic preference still to be submitted
@@ -282,12 +291,12 @@
GNUNET_CONTAINER_DLL_remove (cp->pth_head,
cp->pth_tail,
pth);
- if (pth->is_query)
+ if (GNUNET_YES == pth->is_query)
{
cp->ppd.last_request_times[(cp->last_request_times_off++) % MAX_QUEUE_PER_PEER] = GNUNET_TIME_absolute_get ();
GNUNET_assert (0 < cp->ppd.pending_queries--);
}
- else
+ else if (GNUNET_NO == pth->is_query)
{
GNUNET_assert (0 < cp->ppd.pending_replies--);
}
@@ -389,6 +398,7 @@
(sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
cp->disk_trust = cp->trust = ntohl (trust);
GNUNET_free (fn);
+ cp->request_map = GNUNET_CONTAINER_multihashmap_create (128);
GNUNET_break (GNUNET_OK ==
GNUNET_CONTAINER_multihashmap_put (cp_map,
&peer->hashPubKey,
@@ -442,7 +452,8 @@
* and will also not be called anymore after a call signalling
* expiration.
*
- * @param cls user-specified closure
+ * @param cls 'struct GSF_ConnectedPeer' of the peer that would
+ * have liked an answer to the request
* @param pr handle to the original pending request
* @param data response data, NULL on request expiration
* @param data_len number of bytes in data
@@ -453,13 +464,23 @@
const void *data,
size_t data_len)
{
+ struct GSF_ConnectedPeer *cp = cls;
+
#if SUPPORT_DELAYS
struct GNUNET_TIME_Relative art_delay;
#endif
/* FIXME: adapt code fragments below to new API! */
+ if (NULL == data)
+ {
+ /* FIXME: request expired! clean up! */
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# P2P searches active"),
+ -1,
+ GNUNET_NO);
+ return;
+ }
-
/* reply will go over the network, check for cover traffic */
if ( (prq->anonymity_level > 1) &&
(cover_content_count < prq->anonymity_level - 1) )
@@ -515,9 +536,11 @@
}
-
/**
- * Handle P2P "QUERY" message.
+ * Handle P2P "QUERY" message. Creates the pending request entry
+ * and sets up all of the data structures so that we will
+ * process replies properly. Does not initiate forwarding or
+ * local database lookups.
*
* @param other the other peer involved (sender or receiver, NULL
* for loopback messages where we are both sender and receiver)
@@ -528,11 +551,13 @@
GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
const struct GNUNET_MessageHeader *message)
{
- /* FIXME: adapt old code to new API! */
- struct PendingRequest *pr;
- struct ConnectedPeer *cp;
- struct ConnectedPeer *cps;
- struct CheckDuplicateRequestClosure cdc;
+ struct GSF_PendingRequest *pr;
+ struct GSF_PendingRequestData *prd;
+ struct GSF_ConnectedPeer *cp;
+ struct GSF_ConnectedPeer *cps;
+ GNUNET_HashCode *namespace;
+ struct GNUNET_PeerIdentity *target;
+ enum GSF_PendingRequestOptions options;
struct GNUNET_TIME_Relative timeout;
uint16_t msize;
const struct GetMessage *gm;
@@ -542,9 +567,10 @@
size_t bfsize;
uint32_t ttl_decrement;
int32_t priority;
+ int32_t ttl;
enum GNUNET_BLOCK_Type type;
- int have_ns;
+
msize = ntohs(message->size);
if (msize < sizeof (struct GetMessage))
{
@@ -615,7 +641,6 @@
gettext_noop ("# requests dropped due to missing reverse route"),
1,
GNUNET_NO);
- /* FIXME: try connect? */
return GNUNET_OK;
}
/* note that we can really only check load here since otherwise
@@ -639,14 +664,9 @@
GNUNET_i2s (other),
(unsigned int) bm);
#endif
- have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
- pr = GNUNET_malloc (sizeof (struct PendingRequest) +
- (have_ns ? sizeof(GNUNET_HashCode) : 0));
- if (have_ns)
- {
- pr->namespace = (GNUNET_HashCode*) &pr[1];
- memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
- }
+ namespace = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) ? &opt[bits++] : NULL;
+ target = (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) ? ((const struct GNUNET_PeerIdentity*) &opt[bits++]) : NULL;
+ options = 0;
if ( (GNUNET_LOAD_get_load (cp->transmission_delay) > 3 * (1 + priority)) ||
(GNUNET_LOAD_get_average (cp->transmission_delay) >
GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value * 2 + GNUNET_LOAD_get_average (rt_entry_lifetime)) )
@@ -654,28 +674,21 @@
/* don't have BW to send to peer, or would likely take longer than we have for it,
so at best indirect the query */
priority = 0;
- pr->forward_only = GNUNET_YES;
+ options |= GSF_PRO_FORWARD_ONLY;
}
- pr->type = type;
- pr->mingle = ntohl (gm->filter_mutator);
- if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
- pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
- pr->anonymity_level = 1;
- pr->priority = (uint32_t) priority;
- pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
- pr->query = gm->query;
+ ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
/* decrement ttl (always) */
ttl_decrement = 2 * TTL_DECREMENT +
GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
TTL_DECREMENT);
- if ( (pr->ttl < 0) &&
- (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
+ if ( (ttl < 0) &&
+ (((int32_t)(ttl - ttl_decrement)) > 0) )
{
#if DEBUG_FS
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Dropping query from `%s' due to TTL underflow (%d - %u).\n",
GNUNET_i2s (other),
- pr->ttl,
+ ttl,
ttl_decrement);
#endif
GNUNET_STATISTICS_update (stats,
@@ -683,74 +696,66 @@
1,
GNUNET_NO);
/* integer underflow => drop (should be very rare)! */
- GNUNET_free (pr);
return GNUNET_OK;
}
- pr->ttl -= ttl_decrement;
- pr->start_time = GNUNET_TIME_absolute_get ();
+ ttl -= ttl_decrement;
- /* get bloom filter */
- if (bfsize > 0)
- {
- pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
- bfsize,
- BLOOMFILTER_K);
- pr->bf_size = bfsize;
- }
- cdc.have = NULL;
- cdc.pr = pr;
- GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
- &gm->query,
- &check_duplicate_request_peer,
- &cdc);
- if (cdc.have != NULL)
- {
- if (cdc.have->start_time.abs_value + cdc.have->ttl >=
- pr->start_time.abs_value + pr->ttl)
+ /* test if the request already exists */
+ pr = GNUNET_CONTAINER_multihashmap_get (cp->request_map,
+ &gm->query);
+ if (pr != NULL)
+ {
+ prd = GSF_pending_request_get_data_ (pr);
+ if ( (prd->type == type) &&
+ ( (type != GNUNET_BLOCK_TYPE_SBLOCK) ||
+ (0 == memcmp (prd->namespace,
+ namespace,
+ sizeof (GNUNET_HashCode))) ) )
{
- /* existing request has higher TTL, drop new one! */
- cdc.have->priority += pr->priority;
- destroy_pending_request (pr);
+ if (prd->ttl.abs_value >= GNUNET_TIME_absolute_get().abs_value + ttl)
+ {
+ /* existing request has higher TTL, drop new one! */
+ prd->priority += priority;
#if DEBUG_FS
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Have existing request with higher TTL, dropping new request.\n",
- GNUNET_i2s (other));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Have existing request with higher TTL, dropping new request.\n",
+ GNUNET_i2s (other));
#endif
- GNUNET_STATISTICS_update (stats,
- gettext_noop ("# requests dropped due to higher-TTL request"),
- 1,
- GNUNET_NO);
- return GNUNET_OK;
- }
- else
- {
+ GNUNET_STATISTICS_update (stats,
+ gettext_noop ("# requests dropped due to higher-TTL request"),
+ 1,
+ GNUNET_NO);
+ return GNUNET_OK;
+ }
/* existing request has lower TTL, drop old one! */
- pr->priority += cdc.have->priority;
- /* Possible optimization: if we have applicable pending
- replies in 'cdc.have', we might want to move those over
- (this is a really rare special-case, so it is not clear
- that this would be worth it) */
- destroy_pending_request (cdc.have);
- /* keep processing 'pr'! */
+ pr->priority += prd->priority;
+ GSF_pending_request_cancel_ (pr);
+ GNUNET_assert (GNUNET_YES ==
+ GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
+ &gm->query,
+ pr));
}
}
-
- pr->cp = cp;
+
+ pr = GSF_pending_request_create (options,
+ type,
+ &gm->query,
+ namespace,
+ target,
+ (bf_size > 0) ? (const char*)&opt[bits] : NULL,
+ bf_size,
+ ntohl (gm->filter_mutator),
+ 1 /* anonymity */
+ (uint32_t) priority,
+ ttl,
+ NULL, 0, /* replies_seen */
+ &handle_p2p_reply,
+ cp);
GNUNET_break (GNUNET_OK ==
- GNUNET_CONTAINER_multihashmap_put (query_request_map,
+ GNUNET_CONTAINER_multihashmap_put (cp->request_map,
&gm->query,
pr,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
- GNUNET_break (GNUNET_OK ==
- GNUNET_CONTAINER_multihashmap_put (peer_request_map,
- &other->hashPubKey,
- pr,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
-
- pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
- pr,
- pr->start_time.abs_value + pr->ttl);
-
GNUNET_STATISTICS_update (stats,
gettext_noop ("# P2P searches received"),
1,
@@ -759,83 +764,7 @@
gettext_noop ("# P2P searches active"),
1,
GNUNET_NO);
-
- /* calculate change in traffic preference */
- cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
- /* process locally */
- if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
- type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
- timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
- (pr->priority + 1));
- if (GNUNET_YES != pr->forward_only)
- {
-#if DEBUG_FS
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Handing request for `%s' to datastore\n",
- GNUNET_h2s (&gm->query));
-#endif
- pr->qe = GNUNET_DATASTORE_get (dsh,
- &gm->query,
- type,
- pr->priority + 1,
- MAX_DATASTORE_QUEUE,
- timeout,
- &process_local_reply,
- pr);
- if (NULL == pr->qe)
- {
- GNUNET_STATISTICS_update (stats,
- gettext_noop ("# requests dropped by datastore (queue length limit)"),
- 1,
- GNUNET_NO);
- }
- }
- else
- {
- GNUNET_STATISTICS_update (stats,
- gettext_noop ("# requests forwarded due to high load"),
- 1,
- GNUNET_NO);
- }
-
- /* Are multiple results possible (and did we look locally)? If so, start processing remotely now! */
- switch (pr->type)
- {
- case GNUNET_BLOCK_TYPE_FS_DBLOCK:
- case GNUNET_BLOCK_TYPE_FS_IBLOCK:
- /* only one result, wait for datastore */
- if (GNUNET_YES != pr->forward_only)
- {
- GNUNET_STATISTICS_update (stats,
- gettext_noop ("# requests not instantly forwarded (waiting for datastore)"),
- 1,
- GNUNET_NO);
- break;
- }
- default:
- if (pr->task == GNUNET_SCHEDULER_NO_TASK)
- pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task,
- pr);
- }
-
- /* make sure we don't track too many requests */
- if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
- {
- pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
- GNUNET_assert (pr != NULL);
- destroy_pending_request (pr);
- }
- return GNUNET_OK;
-
-
-
- // FIXME!
- // parse request
- // setup pending request (use 'handle_p2p_reply')
- // track pending request to cancel it on peer disconnect (!)
- // return it!
- // (actual planning & execution up to caller!)
- return NULL;
+ return pr;
}
@@ -858,9 +787,9 @@
GNUNET_CONTAINER_DLL_remove (cp->pth_head,
cp->pth_tail,
pth);
- if (pth->is_query)
+ if (GNUNET_YES == pth->is_query)
GNUNET_assert (0 < cp->ppd.pending_queries--);
- else
+ else if (GNUNET_NO == pth->is_query)
GNUNET_assert (0 < cp->ppd.pending_replies--);
GNUNET_LOAD_update (cp->ppd.transmission_delay,
UINT64_MAX);
@@ -876,7 +805,7 @@
* the callback is invoked with a 'NULL' buffer.
*
* @param peer target peer
- * @param is_query is this a query (GNUNET_YES) or content (GNUNET_NO)
+ * @param is_query is this a query (GNUNET_YES) or content (GNUNET_NO) or neither (GNUNET_SYSERR)
* @param priority how important is this request?
* @param timeout when does this request timeout (call gmc with error)
* @param size number of bytes we would like to send to the peer
@@ -933,9 +862,10 @@
pth);
GNUNET_PEER_resolve (cp->pid,
&target);
- if (is_query)
+ if (GNUNET_YES == is_query)
{
/* query, need reservation */
+ cp->ppd.pending_queries++;
if (NULL == cp->irc)
{
/* reservation already done! */
@@ -957,11 +887,17 @@
is_ready = GNUNET_NO;
}
}
- else
+ else if (GNUNET_NO == is_query)
{
/* no reservation needed for content */
+ cp->ppd.pending_replies++;
is_ready = GNUNET_YES;
}
+ else
+ {
+ /* not a query or content, no reservation needed */
+ is_ready = GNUNET_YES;
+ }
if (is_ready)
{
pth->cth = GNUNET_CORE_notify_transmit_ready (core,
@@ -1011,9 +947,9 @@
GNUNET_CONTAINER_DLL_remove (cp->pth_head,
cp->pth_tail,
pth);
- if (pth->is_query)
+ if (GNUNET_YES == pth->is_query)
GNUNET_assert (0 < cp->ppd.pending_queries--);
- else
+ else if (GNUNET_NO == pth->is_query)
GNUNET_assert (0 < cp->ppd.pending_replies--);
GNUNET_free (pth);
}
@@ -1085,6 +1021,26 @@
/**
+ * Cancel all requests associated with the peer.
+ *
+ * @param cls unused
+ * @param query hash code of the request
+ * @param value the 'struct GSF_PendingRequest'
+ * @return GNUNET_YES (continue to iterate)
+ */
+static int
+cancel_pending_request (void *cls,
+ const GNUNET_HashCode *query,
+ void *value)
+{
+ struct GSF_PendingRequest *pr = value;
+
+ GSF_pending_request_cancel_ (pr);
+ return GNUNET_OK;
+}
+
+
+/**
* A peer disconnected from us. Tear down the connected peer
* record.
*
@@ -1104,11 +1060,21 @@
GNUNET_CONTAINER_multihashmap_remove (cp_map,
&peer->hashPubKey,
cp);
+ if (NULL != cp->migration_pth)
+ {
+ GSF_peer_transmit_cancel_ (cp->migration_pth);
+ cp->migration_pth = NULL;
+ }
if (NULL != cp->irc)
{
GNUNET_CORE_peer_change_preference_cancel (cp->irc);
cp->irc = NULL;
}
+ GNUNET_CONTAINER_multihashmap_iterate (cp->request_map,
+ &cancel_pending_request,
+ cp);
+ GNUNET_CONTAINER_multihashmap_destroy (cp->request_map);
+ cp->request_map = NULL;
GSF_plan_notify_peer_disconnect_ (cp);
GNUNET_LOAD_value_free (cp->ppd.transmission_delay);
GNUNET_PEER_decrement_rcs (cp->ppd.last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
@@ -1206,6 +1172,34 @@
/**
+ * Assemble a migration stop message for transmission.
+ *
+ * @param cls the 'struct GSF_ConnectedPeer' to use
+ * @param size number of bytes we're allowed to write to buf
+ * @param buf where to copy the message
+ * @return number of bytes copied to buf
+ */
+static size_t
+create_migration_stop_message (void *cls,
+ size_t size,
+ void *buf)
+{
+ struct GSF_ConnectedPeer *cp = cls;
+ struct MigrationStopMessage msm;
+
+ cp->migration_pth = NULL;
+ if (NULL == buf)
+ return 0;
+ GNUNET_assert (size > sizeof (struct MigrationStopMessage));
+ msm.header.size = htons (sizeof (struct MigrationStopMessage));
+ msm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
+ msm.duration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (cp->last_migration_block));
+ memcpy (buf, &msm, sizeof (struct MigrationStopMessage));
+ return sizeof (struct MigrationStopMessage);
+}
+
+
+/**
* Ask a peer to stop migrating data to us until the given point
* in time.
*
@@ -1216,30 +1210,22 @@
GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
struct GNUNET_TIME_Relative block_time)
{
- struct PendingMessage *pm;
- struct MigrationStopMessage *msm;
-
if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).rel_value >
block_time.rel_value)
return; /* already blocked */
cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
-
- /* FIXME: adapt old code below to new API! */
- pm = GNUNET_malloc (sizeof (struct PendingMessage) +
- sizeof (struct MigrationStopMessage));
- pm->msize = sizeof (struct MigrationStopMessage);
- pm->priority = UINT32_MAX;
- msm = (struct MigrationStopMessage*) &pm[1];
- msm->header.size = htons (sizeof (struct MigrationStopMessage));
- msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
- msm->duration = GNUNET_TIME_relative_hton (block_time);
- add_to_pending_messages_for_peer (cp,
- pm,
- NULL);
+ if (cp->migration_pth != NULL)
+ GSF_peer_transmit_cancel_ (cp->migration_pth);
+ cp->migration_pth
+ = GSF_peer_transmit_ (cp,
+ GNUNET_SYSERR,
+ UINT32_MAX,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ sizeof (struct MigrationStopMessage),
+ &create_migration_stop_message,
+ cp);
}
-
-
/**
* Write host-trust information to a file - flush the buffer entry!
*
Modified: gnunet/src/fs/gnunet-service-fs_cp.h
===================================================================
--- gnunet/src/fs/gnunet-service-fs_cp.h 2011-02-15 12:19:20 UTC (rev 14407)
+++ gnunet/src/fs/gnunet-service-fs_cp.h 2011-02-15 13:07:14 UTC (rev 14408)
@@ -257,7 +257,7 @@
* Handle P2P "QUERY" message. Only responsible for creating the
* request entry itself and setting up reply callback and cancellation
* on peer disconnect. Does NOT execute the actual request strategy
- * (planning).
+ * (planning) or local database operations.
*
* @param other the other peer involved (sender or receiver, NULL
* for loopback messages where we are both sender and receiver)
Modified: gnunet/src/fs/gnunet-service-fs_lc.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_lc.c 2011-02-15 12:19:20 UTC (rev 14407)
+++ gnunet/src/fs/gnunet-service-fs_lc.c 2011-02-15 13:07:14 UTC (rev 14408)
@@ -341,7 +341,7 @@
sizeof (GNUNET_HashCode)))
? &sm->target,
: NULL,
- NULL /* bf */, 0 /* mingle */,
+ NULL, 0, 0 /* bf */,
ntohl (sm->anonymity_level),
0 /* priority */,
&sm[1], sc,
Modified: gnunet/src/fs/gnunet-service-fs_pr.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_pr.c 2011-02-15 12:19:20 UTC (rev 14407)
+++ gnunet/src/fs/gnunet-service-fs_pr.c 2011-02-15 13:07:14 UTC (rev 14408)
@@ -182,10 +182,12 @@
* @param query key for the lookup
* @param namespace namespace to lookup, NULL for no namespace
* @param target preferred target for the request, NULL for none
- * @param bf bloom filter for known replies, can be NULL
+ * @param bf_data raw data for bloom filter for known replies, can be NULL
+ * @param bf_size number of bytes in bf_data
* @param mingle mingle value for bf
* @param anonymity_level desired anonymity level
* @param priority maximum outgoing cummulative request priority to use
+ * @param ttl current time-to-live for the request
* @param replies_seen hash codes of known local replies
* @param replies_seen_count size of the 'replies_seen' array
* @param rh handle to call when we get a reply
@@ -198,10 +200,12 @@
const GNUNET_HashCode *query,
const GNUNET_HashCode *namespace,
const struct GNUNET_PeerIdentity *target,
- const struct GNUNET_CONTAINER_BloomFilter *bf,
+ const char *bf_data,
+ size_t bf_size,
int32_t mingle,
uint32_t anonymity_level,
uint32_t priority,
+ int32_t ttl,
const GNUNET_HashCode *replies_seen,
unsigned int replies_seen_count,
GSF_PendingRequestReplyHandler rh,
@@ -226,8 +230,16 @@
pr->public_data.priority = priority;
pr->public_data.options = options;
pr->public_data.type = type;
+ pr->public_data.start_time = GNUNET_TIME_absolute_get ();
pr->rh = rh;
pr->rh_cls = rh_cls;
+ if (ttl >= 0)
+ pr->ttl = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
+ (uint32_t) ttl));
+ else
+ pr->ttl = GNUNET_TIME_absolute_subtract (pr->public_data.start_time,
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
+ (uint32_t) (- ttl)));
if (replies_seen_count > 0)
{
pr->replies_seen_size = replies_seen_count;
@@ -237,9 +249,11 @@
replies_seen_count * sizeof (struct GNUNET_HashCode));
pr->replies_seen_count = replies_seen_count;
}
- if (NULL != bf)
+ if (NULL != bf_data)
{
- pr->bf = GNUNET_CONTAINER_bloomfilter_copy (bf);
+ pr->bf = GNUNET_CONTAINER_bloomfilter_init (bf_data,
+ bf_size,
+ BLOOMFILTER_K);
pr->mingle = mingle;
}
else if ( (replies_seen_count > 0) &&
@@ -254,11 +268,40 @@
// FIXME: if not a local query, we also need to track the
// total number of external queries we currently have and
// bound it => need an additional heap!
+
+ pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
+ pr,
+ pr->start_time.abs_value + pr->ttl);
+
+
+
+ /* make sure we don't track too many requests */
+ if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
+ {
+ pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
+ GNUNET_assert (pr != NULL);
+ destroy_pending_request (pr);
+ }
+
+
return pr;
}
/**
+ * Obtain the public data associated with a pending request
+ *
+ * @param pr pending request
+ * @return associated public data
+ */
+struct GSF_PendingRequestData *
+GSF_pending_request_get_data_ (struct GSF_PendingRequest *pr)
+{
+ return &pr->public_data;
+}
+
+
+/**
* Update a given pending request with additional replies
* that have been seen.
*
Modified: gnunet/src/fs/gnunet-service-fs_pr.h
===================================================================
--- gnunet/src/fs/gnunet-service-fs_pr.h 2011-02-15 12:19:20 UTC (rev 14407)
+++ gnunet/src/fs/gnunet-service-fs_pr.h 2011-02-15 13:07:14 UTC (rev 14408)
@@ -92,6 +92,16 @@
struct GNUNET_PeerIdentity target;
/**
+ * Current TTL for the request.
+ */
+ struct GNUNET_TIME_Absolute ttl;
+
+ /**
+ * When did we start with the request.
+ */
+ struct GNUNET_TIME_Absolute start_time;
+
+ /**
* Desired anonymity level.
*/
uint32_t anonymity_level;
@@ -146,10 +156,12 @@
* @param query key for the lookup
* @param namespace namespace to lookup, NULL for no namespace
* @param target preferred target for the request, NULL for none
- * @param bf bloom filter for known replies, can be NULL
+ * @param bf_data raw data for bloom filter for known replies, can be NULL
+ * @param bf_size number of bytes in bf_data
* @param mingle mingle value for bf
* @param anonymity_level desired anonymity level
* @param priority maximum outgoing cummulative request priority to use
+ * @param ttl current time-to-live for the request
* @param replies_seen hash codes of known local replies
* @param replies_seen_count size of the 'replies_seen' array
* @param rh handle to call when we get a reply
@@ -162,10 +174,12 @@
const GNUNET_HashCode *query,
const GNUNET_HashCode *namespace,
const struct GNUNET_PeerIdentity *target,
- const struct GNUNET_CONTAINER_BloomFilter *bf,
+ const char *bf_data,
+ size_t bf_size,
int32_t mingle,
uint32_t anonymity_level,
uint32_t priority,
+ int32_t ttl,
const GNUNET_HashCode *replies_seen,
unsigned int replies_seen_count,
GSF_PendingRequestReplyHandler rh,
@@ -187,6 +201,16 @@
/**
+ * Obtain the public data associated with a pending request
+ *
+ * @param pr pending request
+ * @return associated public data
+ */
+struct GSF_PendingRequestData *
+GSF_pending_request_get_data_ (struct GSF_PendingRequest *pr);
+
+
+/**
* Generate the message corresponding to the given pending request for
* transmission to other peers (or at least determine its size).
*
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r14408 - gnunet/src/fs,
gnunet <=