gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r14408 - gnunet/src/fs


From: gnunet
Subject: [GNUnet-SVN] r14408 - gnunet/src/fs
Date: Tue, 15 Feb 2011 14:07:14 +0100

Author: grothoff
Date: 2011-02-15 14:07:14 +0100 (Tue, 15 Feb 2011)
New Revision: 14408

Modified:
   gnunet/src/fs/gnunet-service-fs_cp.c
   gnunet/src/fs/gnunet-service-fs_cp.h
   gnunet/src/fs/gnunet-service-fs_lc.c
   gnunet/src/fs/gnunet-service-fs_pr.c
   gnunet/src/fs/gnunet-service-fs_pr.h
Log:
stuff

Modified: gnunet/src/fs/gnunet-service-fs_cp.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_cp.c        2011-02-15 12:19:20 UTC (rev 
14407)
+++ gnunet/src/fs/gnunet-service-fs_cp.c        2011-02-15 13:07:14 UTC (rev 
14408)
@@ -33,7 +33,6 @@
 #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply 
(GNUNET_TIME_UNIT_MINUTES, 5)
 
 
-
 /**
  * Handle to cancel a transmission request.
  */
@@ -124,15 +123,25 @@
   struct GSF_PeerTransmitHandle *pth_tail;
 
   /**
+   * Migration stop message in our queue, or NULL if we have none pending.
+   */
+  struct GSF_PeerTransmitHandle *migration_pth;
+
+  /**
    * Context of our GNUNET_CORE_peer_change_preference call (or NULL).
    * NULL if we have successfully reserved 32k, otherwise non-NULL.
    */
   struct GNUNET_CORE_InformationRequestContext *irc;
 
   /**
+   * Active requests from this neighbour.
+   */
+  struct GNUNET_CONTAINER_MulitHashMap *request_map;
+
+  /**
    * ID of delay task for scheduling transmission.
    */
-  GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task; // FIXME: 
unused!
+  GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task; // FIXME: 
used in 'push' (ugh!)
 
   /**
    * Increase in traffic preference still to be submitted
@@ -282,12 +291,12 @@
   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
                               cp->pth_tail,
                               pth);
-  if (pth->is_query)
+  if (GNUNET_YES == pth->is_query)
     {
       cp->ppd.last_request_times[(cp->last_request_times_off++) % 
MAX_QUEUE_PER_PEER] = GNUNET_TIME_absolute_get ();
       GNUNET_assert (0 < cp->ppd.pending_queries--);    
     }
-  else
+  else if (GNUNET_NO == pth->is_query)
     {
       GNUNET_assert (0 < cp->ppd.pending_replies--);
     }
@@ -389,6 +398,7 @@
       (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
     cp->disk_trust = cp->trust = ntohl (trust);
   GNUNET_free (fn);
+  cp->request_map = GNUNET_CONTAINER_multihashmap_create (128);
   GNUNET_break (GNUNET_OK ==
                GNUNET_CONTAINER_multihashmap_put (cp_map,
                                                   &peer->hashPubKey,
@@ -442,7 +452,8 @@
  * and will also not be called anymore after a call signalling
  * expiration.
  *
- * @param cls user-specified closure
+ * @param cls 'struct GSF_ConnectedPeer' of the peer that would
+ *            have liked an answer to the request
  * @param pr handle to the original pending request
  * @param data response data, NULL on request expiration
  * @param data_len number of bytes in data
@@ -453,13 +464,23 @@
                  const void *data,
                  size_t data_len)
 {
+  struct GSF_ConnectedPeer *cp = cls;
+
 #if SUPPORT_DELAYS  
   struct GNUNET_TIME_Relative art_delay;
 #endif
 
   /* FIXME: adapt code fragments below to new API! */
+  if (NULL == data)
+    {
+      /* FIXME: request expired! clean up! */
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# P2P searches active"),
+                               -1,
+                               GNUNET_NO);
+      return;
+    }
 
-
   /* reply will go over the network, check for cover traffic */
   if ( (prq->anonymity_level >  1) &&
        (cover_content_count < prq->anonymity_level - 1) )
@@ -515,9 +536,11 @@
 }
 
 
-
 /**
- * Handle P2P "QUERY" message.
+ * Handle P2P "QUERY" message.  Creates the pending request entry
+ * and sets up all of the data structures so that we will
+ * process replies properly.  Does not initiate forwarding or
+ * local database lookups.
  *
  * @param other the other peer involved (sender or receiver, NULL
  *        for loopback messages where we are both sender and receiver)
@@ -528,11 +551,13 @@
 GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
                       const struct GNUNET_MessageHeader *message)
 {
-  /* FIXME: adapt old code to new API! */
-  struct PendingRequest *pr;
-  struct ConnectedPeer *cp;
-  struct ConnectedPeer *cps;
-  struct CheckDuplicateRequestClosure cdc;
+  struct GSF_PendingRequest *pr;
+  struct GSF_PendingRequestData *prd;
+  struct GSF_ConnectedPeer *cp;
+  struct GSF_ConnectedPeer *cps;
+  GNUNET_HashCode *namespace;
+  struct GNUNET_PeerIdentity *target;
+  enum GSF_PendingRequestOptions options;                           
   struct GNUNET_TIME_Relative timeout;
   uint16_t msize;
   const struct GetMessage *gm;
@@ -542,9 +567,10 @@
   size_t bfsize;
   uint32_t ttl_decrement;
   int32_t priority;
+  int32_t ttl;
   enum GNUNET_BLOCK_Type type;
-  int have_ns;
 
+
   msize = ntohs(message->size);
   if (msize < sizeof (struct GetMessage))
     {
@@ -615,7 +641,6 @@
                                gettext_noop ("# requests dropped due to 
missing reverse route"),
                                1,
                                GNUNET_NO);
-     /* FIXME: try connect? */
       return GNUNET_OK;
     }
   /* note that we can really only check load here since otherwise
@@ -639,14 +664,9 @@
              GNUNET_i2s (other),
              (unsigned int) bm);
 #endif
-  have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
-  pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
-                     (have_ns ? sizeof(GNUNET_HashCode) : 0));
-  if (have_ns)
-    {
-      pr->namespace = (GNUNET_HashCode*) &pr[1];
-      memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
-    }
+  namespace = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) ? &opt[bits++] : 
NULL;
+  target = (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) ? ((const struct 
GNUNET_PeerIdentity*) &opt[bits++]) : NULL;
+  options = 0;
   if ( (GNUNET_LOAD_get_load (cp->transmission_delay) > 3 * (1 + priority)) ||
        (GNUNET_LOAD_get_average (cp->transmission_delay) > 
        GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value * 2 + GNUNET_LOAD_get_average 
(rt_entry_lifetime)) )
@@ -654,28 +674,21 @@
       /* don't have BW to send to peer, or would likely take longer than we 
have for it,
         so at best indirect the query */
       priority = 0;
-      pr->forward_only = GNUNET_YES;
+      options |= GSF_PRO_FORWARD_ONLY;
     }
-  pr->type = type;
-  pr->mingle = ntohl (gm->filter_mutator);
-  if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
-    pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) 
&opt[bits++]);
-  pr->anonymity_level = 1;
-  pr->priority = (uint32_t) priority;
-  pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
-  pr->query = gm->query;
+  ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
   /* decrement ttl (always) */
   ttl_decrement = 2 * TTL_DECREMENT +
     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
                              TTL_DECREMENT);
-  if ( (pr->ttl < 0) &&
-       (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
+  if ( (ttl < 0) &&
+       (((int32_t)(ttl - ttl_decrement)) > 0) )
     {
 #if DEBUG_FS
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                  "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
                  GNUNET_i2s (other),
-                 pr->ttl,
+                 ttl,
                  ttl_decrement);
 #endif
       GNUNET_STATISTICS_update (stats,
@@ -683,74 +696,66 @@
                                1,
                                GNUNET_NO);
       /* integer underflow => drop (should be very rare)! */      
-      GNUNET_free (pr);
       return GNUNET_OK;
     } 
-  pr->ttl -= ttl_decrement;
-  pr->start_time = GNUNET_TIME_absolute_get ();
+  ttl -= ttl_decrement;
 
-  /* get bloom filter */
-  if (bfsize > 0)
-    {
-      pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
-                                                 bfsize,
-                                                 BLOOMFILTER_K);
-      pr->bf_size = bfsize;
-    }
-  cdc.have = NULL;
-  cdc.pr = pr;
-  GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
-                                             &gm->query,
-                                             &check_duplicate_request_peer,
-                                             &cdc);
-  if (cdc.have != NULL)
-    {
-      if (cdc.have->start_time.abs_value + cdc.have->ttl >=
-         pr->start_time.abs_value + pr->ttl)
+  /* test if the request already exists */
+  pr = GNUNET_CONTAINER_multihashmap_get (cp->request_map,
+                                         &gm->query);
+  if (pr != NULL) 
+    {      
+      prd = GSF_pending_request_get_data_ (pr);
+      if ( (prd->type == type) &&
+          ( (type != GNUNET_BLOCK_TYPE_SBLOCK) ||
+            (0 == memcmp (prd->namespace,
+                          namespace,
+                          sizeof (GNUNET_HashCode))) ) )
        {
-         /* existing request has higher TTL, drop new one! */
-         cdc.have->priority += pr->priority;
-         destroy_pending_request (pr);
+         if (prd->ttl.abs_value >= GNUNET_TIME_absolute_get().abs_value + ttl)
+           {
+             /* existing request has higher TTL, drop new one! */
+             prd->priority += priority;
 #if DEBUG_FS
-         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                     "Have existing request with higher TTL, dropping new 
request.\n",
-                     GNUNET_i2s (other));
+             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                         "Have existing request with higher TTL, dropping new 
request.\n",
+                         GNUNET_i2s (other));
 #endif
-         GNUNET_STATISTICS_update (stats,
-                                   gettext_noop ("# requests dropped due to 
higher-TTL request"),
-                                   1,
-                                   GNUNET_NO);
-         return GNUNET_OK;
-       }
-      else
-       {
+             GNUNET_STATISTICS_update (stats,
+                                       gettext_noop ("# requests dropped due 
to higher-TTL request"),
+                                       1,
+                                       GNUNET_NO);
+             return GNUNET_OK;
+           }
          /* existing request has lower TTL, drop old one! */
-         pr->priority += cdc.have->priority;
-         /* Possible optimization: if we have applicable pending
-            replies in 'cdc.have', we might want to move those over
-            (this is a really rare special-case, so it is not clear
-            that this would be worth it) */
-         destroy_pending_request (cdc.have);
-         /* keep processing 'pr'! */
+         pr->priority += prd->priority;
+         GSF_pending_request_cancel_ (pr);
+         GNUNET_assert (GNUNET_YES ==
+                        GNUNET_CONTAINER_multihashmap_remove (cp->request_map,
+                                                              &gm->query,
+                                                              pr));
        }
     }
-
-  pr->cp = cp;
+  
+  pr = GSF_pending_request_create (options,
+                                  type,
+                                  &gm->query,
+                                  namespace,
+                                  target,
+                                  (bf_size > 0) ? (const char*)&opt[bits] : 
NULL,
+                                  bf_size,
+                                  ntohl (gm->filter_mutator),
+                                  1 /* anonymity */
+                                  (uint32_t) priority,
+                                  ttl,
+                                  NULL, 0, /* replies_seen */
+                                  &handle_p2p_reply,
+                                  cp);
   GNUNET_break (GNUNET_OK ==
-               GNUNET_CONTAINER_multihashmap_put (query_request_map,
+               GNUNET_CONTAINER_multihashmap_put (cp->request_map,
                                                   &gm->query,
                                                   pr,
                                                   
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
-  GNUNET_break (GNUNET_OK ==
-               GNUNET_CONTAINER_multihashmap_put (peer_request_map,
-                                                  &other->hashPubKey,
-                                                  pr,
-                                                  
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
-  
-  pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
-                                           pr,
-                                           pr->start_time.abs_value + pr->ttl);
-
   GNUNET_STATISTICS_update (stats,
                            gettext_noop ("# P2P searches received"),
                            1,
@@ -759,83 +764,7 @@
                            gettext_noop ("# P2P searches active"),
                            1,
                            GNUNET_NO);
-
-  /* calculate change in traffic preference */
-  cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
-  /* process locally */
-  if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
-    type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
-  timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
-                                          (pr->priority + 1)); 
-  if (GNUNET_YES != pr->forward_only)
-    {
-#if DEBUG_FS
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Handing request for `%s' to datastore\n",
-                 GNUNET_h2s (&gm->query));
-#endif
-      pr->qe = GNUNET_DATASTORE_get (dsh,
-                                    &gm->query,
-                                    type,                             
-                                    pr->priority + 1,
-                                    MAX_DATASTORE_QUEUE,                       
         
-                                    timeout,
-                                    &process_local_reply,
-                                    pr);
-      if (NULL == pr->qe)
-       {
-         GNUNET_STATISTICS_update (stats,
-                                   gettext_noop ("# requests dropped by 
datastore (queue length limit)"),
-                                   1,
-                                   GNUNET_NO);
-       }
-    }
-  else
-    {
-      GNUNET_STATISTICS_update (stats,
-                               gettext_noop ("# requests forwarded due to high 
load"),
-                               1,
-                               GNUNET_NO);
-    }
-
-  /* Are multiple results possible (and did we look locally)?  If so, start 
processing remotely now! */
-  switch (pr->type)
-    {
-    case GNUNET_BLOCK_TYPE_FS_DBLOCK:
-    case GNUNET_BLOCK_TYPE_FS_IBLOCK:
-      /* only one result, wait for datastore */
-      if (GNUNET_YES != pr->forward_only)
-       {
-         GNUNET_STATISTICS_update (stats,
-                                   gettext_noop ("# requests not instantly 
forwarded (waiting for datastore)"),
-                                   1,
-                                   GNUNET_NO);
-         break;
-       }
-    default:
-      if (pr->task == GNUNET_SCHEDULER_NO_TASK)
-       pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task,
-                                            pr);
-    }
-
-  /* make sure we don't track too many requests */
-  if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > 
max_pending_requests)
-    {
-      pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
-      GNUNET_assert (pr != NULL);
-      destroy_pending_request (pr);
-    }
-  return GNUNET_OK;
-
-
-
-  // FIXME!
-  // parse request
-  // setup pending request (use 'handle_p2p_reply')
-  // track pending request to cancel it on peer disconnect (!)
-  // return it!
-  // (actual planning & execution up to caller!)
-  return NULL;
+  return pr;
 }
 
 
@@ -858,9 +787,9 @@
   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
                               cp->pth_tail,
                               pth);
-  if (pth->is_query)
+  if (GNUNET_YES == pth->is_query)
     GNUNET_assert (0 < cp->ppd.pending_queries--);    
-  else
+  else if (GNUNET_NO == pth->is_query)
     GNUNET_assert (0 < cp->ppd.pending_replies--);
   GNUNET_LOAD_update (cp->ppd.transmission_delay,
                      UINT64_MAX);
@@ -876,7 +805,7 @@
  * the callback is invoked with a 'NULL' buffer.
  *
  * @param peer target peer
- * @param is_query is this a query (GNUNET_YES) or content (GNUNET_NO)
+ * @param is_query is this a query (GNUNET_YES) or content (GNUNET_NO) or 
neither (GNUNET_SYSERR)
  * @param priority how important is this request?
  * @param timeout when does this request timeout (call gmc with error)
  * @param size number of bytes we would like to send to the peer
@@ -933,9 +862,10 @@
                                       pth);
   GNUNET_PEER_resolve (cp->pid,
                       &target);
-  if (is_query)
+  if (GNUNET_YES == is_query)
     {
       /* query, need reservation */
+      cp->ppd.pending_queries++;
       if (NULL == cp->irc)
        {
          /* reservation already done! */
@@ -957,11 +887,17 @@
          is_ready = GNUNET_NO;
        }
     }
-  else
+  else if (GNUNET_NO == is_query)
     {
       /* no reservation needed for content */
+      cp->ppd.pending_replies++;
       is_ready = GNUNET_YES;
     }
+  else
+    {
+      /* not a query or content, no reservation needed */
+      is_ready = GNUNET_YES;
+    }
   if (is_ready)
     {
       pth->cth = GNUNET_CORE_notify_transmit_ready (core,
@@ -1011,9 +947,9 @@
   GNUNET_CONTAINER_DLL_remove (cp->pth_head,
                               cp->pth_tail,
                               pth);
-  if (pth->is_query)
+  if (GNUNET_YES == pth->is_query)
     GNUNET_assert (0 < cp->ppd.pending_queries--);    
-  else
+  else if (GNUNET_NO == pth->is_query)
     GNUNET_assert (0 < cp->ppd.pending_replies--);
   GNUNET_free (pth);
 }
@@ -1085,6 +1021,26 @@
 
 
 /**
+ * Iterator that cancels one pending request associated with the peer.
+ *
+ * @param cls closure, unused
+ * @param query hash code of the request, unused
+ * @param value the 'struct GSF_PendingRequest' to cancel
+ * @return GNUNET_YES (continue to iterate)
+ */
+static int
+cancel_pending_request (void *cls,
+                       const GNUNET_HashCode *query,
+                       void *value)
+{
+  struct GSF_PendingRequest *pr = value;
+
+  GSF_pending_request_cancel_ (pr);
+  return GNUNET_YES; /* matches the documented iterator result */
+}
+
+
+/**
  * A peer disconnected from us.  Tear down the connected peer
  * record.
  *
@@ -1104,11 +1060,21 @@
   GNUNET_CONTAINER_multihashmap_remove (cp_map,
                                        &peer->hashPubKey,
                                        cp);
+  if (NULL != cp->migration_pth)
+    {
+      GSF_peer_transmit_cancel_ (cp->migration_pth);
+      cp->migration_pth = NULL;
+    }
   if (NULL != cp->irc)
     {
       GNUNET_CORE_peer_change_preference_cancel (cp->irc);
       cp->irc = NULL;
     }
+  GNUNET_CONTAINER_multihashmap_iterate (cp->request_map,
+                                        &cancel_pending_request,
+                                        cp);
+  GNUNET_CONTAINER_multihashmap_destroy (cp->request_map);
+  cp->request_map = NULL;
   GSF_plan_notify_peer_disconnect_ (cp);
   GNUNET_LOAD_value_free (cp->ppd.transmission_delay);
   GNUNET_PEER_decrement_rcs (cp->ppd.last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
@@ -1206,6 +1172,34 @@
 
 
 /**
+ * Assemble a migration stop message for transmission.
+ *
+ * @param cls the 'struct GSF_ConnectedPeer' to use
+ * @param size number of bytes we're allowed to write to buf
+ * @param buf where to copy the message, may be NULL
+ * @return number of bytes copied to buf, 0 if buf was NULL
+ */
+static size_t
+create_migration_stop_message (void *cls,
+                              size_t size,
+                              void *buf)
+{
+  struct GSF_ConnectedPeer *cp = cls;
+  struct MigrationStopMessage msm;
+
+  cp->migration_pth = NULL; /* cleared first, also on the NULL-buffer path */
+  if (NULL == buf)
+    return 0;
+  GNUNET_assert (size >= sizeof (struct MigrationStopMessage)); /* exact fit is OK */
+  msm.header.size = htons (sizeof (struct MigrationStopMessage));
+  msm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
+  msm.duration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (cp->last_migration_block));
+  memcpy (buf, &msm, sizeof (struct MigrationStopMessage));
+  return sizeof (struct MigrationStopMessage);
+}
+
+
+/**
  * Ask a peer to stop migrating data to us until the given point
  * in time.
  * 
@@ -1216,30 +1210,22 @@
 GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
                           struct GNUNET_TIME_Relative block_time)
 {
-  struct PendingMessage *pm;
-  struct MigrationStopMessage *msm;
-
   if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).rel_value > 
block_time.rel_value)
     return; /* already blocked */
   cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
-
-  /* FIXME: adapt old code below to new API! */
-  pm = GNUNET_malloc (sizeof (struct PendingMessage) + 
-                     sizeof (struct MigrationStopMessage));
-  pm->msize = sizeof (struct MigrationStopMessage);
-  pm->priority = UINT32_MAX;
-  msm = (struct MigrationStopMessage*) &pm[1];
-  msm->header.size = htons (sizeof (struct MigrationStopMessage));
-  msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
-  msm->duration = GNUNET_TIME_relative_hton (block_time);
-  add_to_pending_messages_for_peer (cp,
-                                   pm,
-                                   NULL);
+  if (cp->migration_pth != NULL)
+    GSF_peer_transmit_cancel_ (cp->migration_pth);
+  cp->migration_pth 
+    = GSF_peer_transmit_ (cp,
+                         GNUNET_SYSERR,
+                         UINT32_MAX,
+                         GNUNET_TIME_UNIT_FOREVER_REL,
+                         sizeof (struct MigrationStopMessage),
+                         &create_migration_stop_message,
+                         cp);
 }
 
 
-
-
 /**
  * Write host-trust information to a file - flush the buffer entry!
  *

Modified: gnunet/src/fs/gnunet-service-fs_cp.h
===================================================================
--- gnunet/src/fs/gnunet-service-fs_cp.h        2011-02-15 12:19:20 UTC (rev 
14407)
+++ gnunet/src/fs/gnunet-service-fs_cp.h        2011-02-15 13:07:14 UTC (rev 
14408)
@@ -257,7 +257,7 @@
  * Handle P2P "QUERY" message.  Only responsible for creating the
  * request entry itself and setting up reply callback and cancellation
  * on peer disconnect.  Does NOT execute the actual request strategy
- * (planning).
+ * (planning) or local database operations.
  *
  * @param other the other peer involved (sender or receiver, NULL
  *        for loopback messages where we are both sender and receiver)

Modified: gnunet/src/fs/gnunet-service-fs_lc.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_lc.c        2011-02-15 12:19:20 UTC (rev 
14407)
+++ gnunet/src/fs/gnunet-service-fs_lc.c        2011-02-15 13:07:14 UTC (rev 
14408)
@@ -341,7 +341,7 @@
                                                      sizeof (GNUNET_HashCode)))
                                        ? &sm->target,
                                        : NULL,
-                                       NULL /* bf */, 0 /* mingle */,
+                                       NULL, 0, 0 /* bf */, 
                                        ntohl (sm->anonymity_level),
                                        0 /* priority */,
                                        &sm[1], sc,

Modified: gnunet/src/fs/gnunet-service-fs_pr.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_pr.c        2011-02-15 12:19:20 UTC (rev 
14407)
+++ gnunet/src/fs/gnunet-service-fs_pr.c        2011-02-15 13:07:14 UTC (rev 
14408)
@@ -182,10 +182,12 @@
  * @param query key for the lookup
  * @param namespace namespace to lookup, NULL for no namespace
  * @param target preferred target for the request, NULL for none
- * @param bf bloom filter for known replies, can be NULL
+ * @param bf_data raw data for bloom filter for known replies, can be NULL
+ * @param bf_size number of bytes in bf_data
  * @param mingle mingle value for bf
  * @param anonymity_level desired anonymity level
  * @param priority maximum outgoing cummulative request priority to use
+ * @param ttl current time-to-live for the request
  * @param replies_seen hash codes of known local replies
  * @param replies_seen_count size of the 'replies_seen' array
  * @param rh handle to call when we get a reply
@@ -198,10 +200,12 @@
                             const GNUNET_HashCode *query,
                             const GNUNET_HashCode *namespace,
                             const struct GNUNET_PeerIdentity *target,
-                            const struct GNUNET_CONTAINER_BloomFilter *bf,
+                            const char *bf_data,
+                            size_t bf_size,
                             int32_t mingle,
                             uint32_t anonymity_level,
                             uint32_t priority,
+                            int32_t ttl,
                             const GNUNET_HashCode *replies_seen,
                             unsigned int replies_seen_count,
                             GSF_PendingRequestReplyHandler rh,
@@ -226,8 +230,16 @@
   pr->public_data.priority = priority;
   pr->public_data.options = options;
   pr->public_data.type = type;  
+  pr->public_data.start_time = GNUNET_TIME_absolute_get ();
   pr->rh = rh;
   pr->rh_cls = rh_cls;
+  if (ttl >= 0)
+    pr->ttl = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply 
(GNUNET_TIME_UNIT_SECONDS,
+                                                                              
(uint32_t) ttl));
+  else
+    pr->ttl = GNUNET_TIME_absolute_subtract (pr->public_data.start_time,
+                                            GNUNET_TIME_relative_multiply 
(GNUNET_TIME_UNIT_SECONDS,
+                                                                           
(uint32_t) (- ttl)));
   if (replies_seen_count > 0)
     {
       pr->replies_seen_size = replies_seen_count;
@@ -237,9 +249,11 @@
              replies_seen_count * sizeof (struct GNUNET_HashCode));
       pr->replies_seen_count = replies_seen_count;
     }
-  if (NULL != bf)    
+  if (NULL != bf_data)    
     {
-      pr->bf = GNUNET_CONTAINER_bloomfilter_copy (bf);
+      pr->bf = GNUNET_CONTAINER_bloomfilter_init (bf_data,
+                                                 bf_size,
+                                                 BLOOMFILTER_K);
       pr->mingle = mingle;
     }
   else if ( (replies_seen_count > 0) &&
@@ -254,11 +268,40 @@
   // FIXME: if not a local query, we also need to track the
   // total number of external queries we currently have and
   // bound it => need an additional heap!
+
+  pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
+                                           pr,
+                                           pr->start_time.abs_value + pr->ttl);
+
+
+
+  /* make sure we don't track too many requests */
+  if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > 
max_pending_requests)
+    {
+      pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
+      GNUNET_assert (pr != NULL);
+      destroy_pending_request (pr);
+    }
+
+
   return pr;
 }
 
 
 /**
+ * Obtain the public data associated with a pending request.
+ *
+ * @param pr pending request (dereferenced without a NULL check)
+ * @return pointer to the 'public_data' member embedded in 'pr'
+ */
+struct GSF_PendingRequestData *
+GSF_pending_request_get_data_ (struct GSF_PendingRequest *pr)
+{
+  return &pr->public_data;
+}
+
+
+/**
  * Update a given pending request with additional replies
  * that have been seen.
  *

Modified: gnunet/src/fs/gnunet-service-fs_pr.h
===================================================================
--- gnunet/src/fs/gnunet-service-fs_pr.h        2011-02-15 12:19:20 UTC (rev 
14407)
+++ gnunet/src/fs/gnunet-service-fs_pr.h        2011-02-15 13:07:14 UTC (rev 
14408)
@@ -92,6 +92,16 @@
   struct GNUNET_PeerIdentity target;
 
   /**
+   * Current TTL for the request.
+   */
+  struct GNUNET_TIME_Absolute ttl;
+
+  /**
+   * When did we start with the request.
+   */
+  struct GNUNET_TIME_Absolute start_time;
+
+  /**
    * Desired anonymity level.
    */
   uint32_t anonymity_level;
@@ -146,10 +156,12 @@
  * @param query key for the lookup
  * @param namespace namespace to lookup, NULL for no namespace
  * @param target preferred target for the request, NULL for none
- * @param bf bloom filter for known replies, can be NULL
+ * @param bf_data raw data for bloom filter for known replies, can be NULL
+ * @param bf_size number of bytes in bf_data
  * @param mingle mingle value for bf
  * @param anonymity_level desired anonymity level
  * @param priority maximum outgoing cummulative request priority to use
+ * @param ttl current time-to-live for the request
  * @param replies_seen hash codes of known local replies
  * @param replies_seen_count size of the 'replies_seen' array
  * @param rh handle to call when we get a reply
@@ -162,10 +174,12 @@
                             const GNUNET_HashCode *query,
                             const GNUNET_HashCode *namespace,
                             const struct GNUNET_PeerIdentity *target,
-                            const struct GNUNET_CONTAINER_BloomFilter *bf,
+                            const char *bf_data,
+                            size_t bf_size,
                             int32_t mingle,
                             uint32_t anonymity_level,
                             uint32_t priority,
+                            int32_t ttl,
                             const GNUNET_HashCode *replies_seen,
                             unsigned int replies_seen_count,
                             GSF_PendingRequestReplyHandler rh,
@@ -187,6 +201,16 @@
 
 
 /**
+ * Obtain the public data associated with a pending request.
+ *
+ * @param pr pending request to inspect
+ * @return pointer to the public data of 'pr'
+ */
+struct GSF_PendingRequestData *
+GSF_pending_request_get_data_ (struct GSF_PendingRequest *pr);
+
+
+/**
  * Generate the message corresponding to the given pending request for
  * transmission to other peers (or at least determine its size).
  *




reply via email to

[Prev in Thread] Current Thread [Next in Thread]