gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r14384 - gnunet/src/fs


From: gnunet
Subject: [GNUnet-SVN] r14384 - gnunet/src/fs
Date: Thu, 10 Feb 2011 13:59:38 +0100

Author: grothoff
Date: 2011-02-10 13:59:38 +0100 (Thu, 10 Feb 2011)
New Revision: 14384

Added:
   gnunet/src/fs/gnunet-service-fs_push.c
   gnunet/src/fs/gnunet-service-fs_put.c
Modified:
   gnunet/src/fs/gnunet-service-fs.h
   gnunet/src/fs/gnunet-service-fs_cp.c
   gnunet/src/fs/gnunet-service-fs_cp.h
   gnunet/src/fs/gnunet-service-fs_lc.c
   gnunet/src/fs/gnunet-service-fs_pr.c
   gnunet/src/fs/gnunet-service-fs_pr.h
Log:
stuff

Modified: gnunet/src/fs/gnunet-service-fs.h
===================================================================
--- gnunet/src/fs/gnunet-service-fs.h   2011-02-09 21:01:47 UTC (rev 14383)
+++ gnunet/src/fs/gnunet-service-fs.h   2011-02-10 12:59:38 UTC (rev 14384)
@@ -31,18 +31,44 @@
  */
 struct GSF_ConnectedPeer;
 
-
 /**
  * An active request.
  */
 struct GSF_PendingRequest;
 
-
 /**
  * A local client.
  */
 struct GSF_LocalClient;
 
 
+/**
+ * Our connection to the datastore.
+ */
+extern struct GNUNET_DATASTORE_Handle *GSF_dsh;
+
+/**
+ * Our configuration.
+ */
+extern const struct GNUNET_CONFIGURATION_Handle *GSF_cfg;
+
+/**
+ * Handle for reporting statistics.
+ */
+extern struct GNUNET_STATISTICS_Handle *GSF_stats;
+
+/**
+ * Pointer to handle to the core service (points to NULL until we've
+ * connected to it).
+ */
+extern struct GNUNET_CORE_Handle *GSF_core;
+
+/**
+ * Handle for DHT operations.
+ */
+static struct GNUNET_DHT_Handle *GSF_dht;
+
+
+
 #endif
 /* end of gnunet-service-fs.h */

Modified: gnunet/src/fs/gnunet-service-fs_cp.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_cp.c        2011-02-09 21:01:47 UTC (rev 
14383)
+++ gnunet/src/fs/gnunet-service-fs_cp.c        2011-02-10 12:59:38 UTC (rev 
14384)
@@ -141,11 +141,6 @@
   uint64_t inc_preference;
 
   /**
-   * Trust rating for this peer
-   */
-  uint32_t trust;
-
-  /**
    * Trust rating for this peer on disk.
    */
   uint32_t disk_trust;
@@ -249,6 +244,19 @@
 
 
 /**
+ * Return the performance data record for the given peer
+ * 
+ * @param cp peer to query
+ * @return performance data record for the peer
+ */
+struct GSF_PeerPerformanceData *
+GSF_get_peer_performance_data_ (struct GSF_ConnectedPeer *cp)
+{
+  return &cp->ppd;
+}
+
+
+/**
  * Core is ready to transmit to a peer, get the message.
  *
  * @param cls the 'struct GSF_PeerTransmitHandle' of the message
@@ -420,13 +428,95 @@
       GNUNET_break (0);
       return GNUNET_OK;
     }
-  cp->ppd.migration_blocked = GNUNET_TIME_relative_to_absolute 
(GNUNET_TIME_relative_ntoh (msm->duration));
+  cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute 
(GNUNET_TIME_relative_ntoh (msm->duration));
   update_atsi (cp, atsi);
   return GNUNET_OK;
 }
 
 
 /**
+ * Handle a reply to a pending request.  Also called if a request
+ * expires (then with data == NULL).  The handler may be called
+ * many times (depending on the request type), but will not be
+ * called during or after a call to GSF_pending_request_cancel 
+ * and will also not be called anymore after a call signalling
+ * expiration.
+ *
+ * @param cls user-specified closure
+ * @param pr handle to the original pending request
+ * @param data response data, NULL on request expiration
+ * @param data_len number of bytes in data
+ */
+static void
+handle_p2p_reply (void *cls,
+                 struct GSF_PendingRequest *pr,
+                 const void *data,
+                 size_t data_len)
+{
+#if SUPPORT_DELAYS  
+  struct GNUNET_TIME_Relative art_delay;
+#endif
+
+  /* FIXME: adapt code fragments below to new API! */
+
+
+  /* reply will go over the network, check for cover traffic */
+  if ( (prq->anonymity_level >  1) &&
+       (cover_content_count < prq->anonymity_level - 1) )
+    {
+      /* insufficient cover traffic, skip */
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# replies suppressed due to lack 
of cover traffic"),
+                               1,
+                               GNUNET_NO);
+      return GNUNET_YES;
+    }  
+  if (prq->anonymity_level >  1) 
+    cover_content_count -= prq->anonymity_level - 1;
+
+
+      cp = pr->cp;
+#if DEBUG_FS
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Transmitting result for query `%s' to other peer (PID=%u)\n",
+                 GNUNET_h2s (key),
+                 (unsigned int) cp->pid);
+#endif  
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# replies received for other 
peers"),
+                               1,
+                               GNUNET_NO);
+      msize = sizeof (struct PutMessage) + prq->size;
+      reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
+      reply->cont = &transmit_reply_continuation;
+      reply->cont_cls = pr;
+#if SUPPORT_DELAYS
+      art_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
+                                                GNUNET_CRYPTO_random_u32 
(GNUNET_CRYPTO_QUALITY_WEAK,
+                                                                          
TTL_DECREMENT));
+      reply->delay_until 
+       = GNUNET_TIME_relative_to_absolute (art_delay);
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("cummulative artificial delay 
introduced (ms)"),
+                               art_delay.abs_value,
+                               GNUNET_NO);
+#endif
+      reply->msize = msize;
+      reply->priority = UINT32_MAX; /* send replies first! */
+      pm = (struct PutMessage*) &reply[1];
+      pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
+      pm->header.size = htons (msize);
+      pm->type = htonl (prq->type);
+      pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
+      memcpy (&pm[1], prq->data, prq->size);
+      add_to_pending_messages_for_peer (cp, reply, pr);
+
+
+}
+
+
+
+/**
  * Handle P2P "QUERY" message.
  *
  * @param other the other peer involved (sender or receiver, NULL
@@ -438,9 +528,310 @@
 GSF_handle_p2p_query_ (const struct GNUNET_PeerIdentity *other,
                       const struct GNUNET_MessageHeader *message)
 {
+  /* FIXME: adapt old code to new API! */
+  struct PendingRequest *pr;
+  struct ConnectedPeer *cp;
+  struct ConnectedPeer *cps;
+  struct CheckDuplicateRequestClosure cdc;
+  struct GNUNET_TIME_Relative timeout;
+  uint16_t msize;
+  const struct GetMessage *gm;
+  unsigned int bits;
+  const GNUNET_HashCode *opt;
+  uint32_t bm;
+  size_t bfsize;
+  uint32_t ttl_decrement;
+  int32_t priority;
+  enum GNUNET_BLOCK_Type type;
+  int have_ns;
+
+  msize = ntohs(message->size);
+  if (msize < sizeof (struct GetMessage))
+    {
+      GNUNET_break_op (0);
+      return GNUNET_SYSERR;
+    }
+  gm = (const struct GetMessage*) message;
+#if DEBUG_FS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Received request for `%s'\n",
+             GNUNET_h2s (&gm->query));
+#endif
+  type = ntohl (gm->type);
+  bm = ntohl (gm->hash_bitmap);
+  bits = 0;
+  while (bm > 0)
+    {
+      if (1 == (bm & 1))
+       bits++;
+      bm >>= 1;
+    }
+  if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
+    {
+      GNUNET_break_op (0);
+      return GNUNET_SYSERR;
+    }  
+  opt = (const GNUNET_HashCode*) &gm[1];
+  bfsize = msize - sizeof (struct GetMessage) - bits * sizeof 
(GNUNET_HashCode);
+  /* bfsize must be power of 2, check! */
+  if (0 != ( (bfsize - 1) & bfsize))
+    {
+      GNUNET_break_op (0);
+      return GNUNET_SYSERR;
+    }
+  cover_query_count++;
+  bm = ntohl (gm->hash_bitmap);
+  bits = 0;
+  cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+                                          &other->hashPubKey);
+  if (NULL == cps)
+    {
+      /* peer must have just disconnected */
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# requests dropped due to 
initiator not being connected"),
+                               1,
+                               GNUNET_NO);
+      return GNUNET_SYSERR;
+    }
+  if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
+    cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
+                                           &opt[bits++]);
+  else
+    cp = cps;
+  if (cp == NULL)
+    {
+#if DEBUG_FS
+      if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
+       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                   "Failed to find RETURN-TO peer `%4s' in connection set. 
Dropping query.\n",
+                   GNUNET_i2s ((const struct GNUNET_PeerIdentity*) 
&opt[bits-1]));
+      
+      else
+       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                   "Failed to find peer `%4s' in connection set. Dropping 
query.\n",
+                   GNUNET_i2s (other));
+#endif
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# requests dropped due to 
missing reverse route"),
+                               1,
+                               GNUNET_NO);
+     /* FIXME: try connect? */
+      return GNUNET_OK;
+    }
+  /* note that we can really only check load here since otherwise
+     peers could find out that we are overloaded by not being
+     disconnected after sending us a malformed query... */
+  priority = bound_priority (ntohl (gm->priority), cps);
+  if (priority < 0)
+    {
+#if DEBUG_FS
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Dropping query from `%s', this peer is too busy.\n",
+                 GNUNET_i2s (other));
+#endif
+      return GNUNET_OK;
+    }
+#if DEBUG_FS 
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Received request for `%s' of type %u from peer `%4s' with flags 
%u\n",
+             GNUNET_h2s (&gm->query),
+             (unsigned int) type,
+             GNUNET_i2s (other),
+             (unsigned int) bm);
+#endif
+  have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
+  pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
+                     (have_ns ? sizeof(GNUNET_HashCode) : 0));
+  if (have_ns)
+    {
+      pr->namespace = (GNUNET_HashCode*) &pr[1];
+      memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
+    }
+  if ( (GNUNET_LOAD_get_load (cp->transmission_delay) > 3 * (1 + priority)) ||
+       (GNUNET_LOAD_get_average (cp->transmission_delay) > 
+       GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value * 2 + GNUNET_LOAD_get_average 
(rt_entry_lifetime)) )
+    {
+      /* don't have BW to send to peer, or would likely take longer than we 
have for it,
+        so at best indirect the query */
+      priority = 0;
+      pr->forward_only = GNUNET_YES;
+    }
+  pr->type = type;
+  pr->mingle = ntohl (gm->filter_mutator);
+  if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
+    pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) 
&opt[bits++]);
+  pr->anonymity_level = 1;
+  pr->priority = (uint32_t) priority;
+  pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
+  pr->query = gm->query;
+  /* decrement ttl (always) */
+  ttl_decrement = 2 * TTL_DECREMENT +
+    GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+                             TTL_DECREMENT);
+  if ( (pr->ttl < 0) &&
+       (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
+    {
+#if DEBUG_FS
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
+                 GNUNET_i2s (other),
+                 pr->ttl,
+                 ttl_decrement);
+#endif
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# requests dropped due TTL 
underflow"),
+                               1,
+                               GNUNET_NO);
+      /* integer underflow => drop (should be very rare)! */      
+      GNUNET_free (pr);
+      return GNUNET_OK;
+    } 
+  pr->ttl -= ttl_decrement;
+  pr->start_time = GNUNET_TIME_absolute_get ();
+
+  /* get bloom filter */
+  if (bfsize > 0)
+    {
+      pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
+                                                 bfsize,
+                                                 BLOOMFILTER_K);
+      pr->bf_size = bfsize;
+    }
+  cdc.have = NULL;
+  cdc.pr = pr;
+  GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
+                                             &gm->query,
+                                             &check_duplicate_request_peer,
+                                             &cdc);
+  if (cdc.have != NULL)
+    {
+      if (cdc.have->start_time.abs_value + cdc.have->ttl >=
+         pr->start_time.abs_value + pr->ttl)
+       {
+         /* existing request has higher TTL, drop new one! */
+         cdc.have->priority += pr->priority;
+         destroy_pending_request (pr);
+#if DEBUG_FS
+         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                     "Have existing request with higher TTL, dropping new 
request.\n",
+                     GNUNET_i2s (other));
+#endif
+         GNUNET_STATISTICS_update (stats,
+                                   gettext_noop ("# requests dropped due to 
higher-TTL request"),
+                                   1,
+                                   GNUNET_NO);
+         return GNUNET_OK;
+       }
+      else
+       {
+         /* existing request has lower TTL, drop old one! */
+         pr->priority += cdc.have->priority;
+         /* Possible optimization: if we have applicable pending
+            replies in 'cdc.have', we might want to move those over
+            (this is a really rare special-case, so it is not clear
+            that this would be worth it) */
+         destroy_pending_request (cdc.have);
+         /* keep processing 'pr'! */
+       }
+    }
+
+  pr->cp = cp;
+  GNUNET_break (GNUNET_OK ==
+               GNUNET_CONTAINER_multihashmap_put (query_request_map,
+                                                  &gm->query,
+                                                  pr,
+                                                  
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
+  GNUNET_break (GNUNET_OK ==
+               GNUNET_CONTAINER_multihashmap_put (peer_request_map,
+                                                  &other->hashPubKey,
+                                                  pr,
+                                                  
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
+  
+  pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
+                                           pr,
+                                           pr->start_time.abs_value + pr->ttl);
+
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# P2P searches received"),
+                           1,
+                           GNUNET_NO);
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# P2P searches active"),
+                           1,
+                           GNUNET_NO);
+
+  /* calculate change in traffic preference */
+  cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
+  /* process locally */
+  if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
+    type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
+  timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
+                                          (pr->priority + 1)); 
+  if (GNUNET_YES != pr->forward_only)
+    {
+#if DEBUG_FS
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Handing request for `%s' to datastore\n",
+                 GNUNET_h2s (&gm->query));
+#endif
+      pr->qe = GNUNET_DATASTORE_get (dsh,
+                                    &gm->query,
+                                    type,                             
+                                    pr->priority + 1,
+                                    MAX_DATASTORE_QUEUE,                       
         
+                                    timeout,
+                                    &process_local_reply,
+                                    pr);
+      if (NULL == pr->qe)
+       {
+         GNUNET_STATISTICS_update (stats,
+                                   gettext_noop ("# requests dropped by 
datastore (queue length limit)"),
+                                   1,
+                                   GNUNET_NO);
+       }
+    }
+  else
+    {
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# requests forwarded due to high 
load"),
+                               1,
+                               GNUNET_NO);
+    }
+
+  /* Are multiple results possible (and did we look locally)?  If so, start 
processing remotely now! */
+  switch (pr->type)
+    {
+    case GNUNET_BLOCK_TYPE_FS_DBLOCK:
+    case GNUNET_BLOCK_TYPE_FS_IBLOCK:
+      /* only one result, wait for datastore */
+      if (GNUNET_YES != pr->forward_only)
+       {
+         GNUNET_STATISTICS_update (stats,
+                                   gettext_noop ("# requests not instantly 
forwarded (waiting for datastore)"),
+                                   1,
+                                   GNUNET_NO);
+         break;
+       }
+    default:
+      if (pr->task == GNUNET_SCHEDULER_NO_TASK)
+       pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task,
+                                            pr);
+    }
+
+  /* make sure we don't track too many requests */
+  if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > 
max_pending_requests)
+    {
+      pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
+      GNUNET_assert (pr != NULL);
+      destroy_pending_request (pr);
+    }
+  return GNUNET_OK;
+
+
+
   // FIXME!
   // parse request
-  // setup pending request
+  // setup pending request (use 'handle_p2p_reply')
   // track pending request to cancel it on peer disconnect (!)
   // return it!
   // (actual planning & execution up to caller!)
@@ -815,6 +1206,41 @@
 
 
 /**
+ * Ask a peer to stop migrating data to us until the given point
+ * in time.
+ * 
+ * @param cp peer to ask
+ * @param block_time until when to block
+ */
+void
+GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
+                          struct GNUNET_TIME_Relative block_time)
+{
+  struct PendingMessage *pm;
+  struct MigrationStopMessage *msm;
+
+  if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).rel_value > 
block_time.rel_value)
+    return; /* already blocked */
+  cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
+
+  /* FIXME: adapt old code below to new API! */
+  pm = GNUNET_malloc (sizeof (struct PendingMessage) + 
+                     sizeof (struct MigrationStopMessage));
+  pm->msize = sizeof (struct MigrationStopMessage);
+  pm->priority = UINT32_MAX;
+  msm = (struct MigrationStopMessage*) &pm[1];
+  msm->header.size = htons (sizeof (struct MigrationStopMessage));
+  msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
+  msm->duration = GNUNET_TIME_relative_hton (block_time);
+  add_to_pending_messages_for_peer (cp,
+                                   pm,
+                                   NULL);
+}
+
+
+
+
+/**
  * Write host-trust information to a file - flush the buffer entry!
  *
  * @param cls closure, not used
@@ -965,9 +1391,9 @@
  * @return GNUNET_YES (we should continue to iterate)
  */
 static int 
-clean_peer (void *cls,
-           const GNUNET_HashCode * key,
-           void *value)
+clean_local_client (void *cls,
+                   const GNUNET_HashCode * key,
+                   void *value)
 {
   const struct GSF_LocalClient *lc = cls;
   struct GSF_ConnectedPeer *cp = value;

Modified: gnunet/src/fs/gnunet-service-fs_cp.h
===================================================================
--- gnunet/src/fs/gnunet-service-fs_cp.h        2011-02-09 21:01:47 UTC (rev 
14383)
+++ gnunet/src/fs/gnunet-service-fs_cp.h        2011-02-10 12:59:38 UTC (rev 
14384)
@@ -90,6 +90,11 @@
   double avg_priority;
 
   /**
+   * Trust rating for this peer
+   */
+  uint32_t trust;
+
+  /**
    * Number of pending queries (replies are not counted)
    */
   unsigned int pending_queries;
@@ -265,6 +270,28 @@
 
 
 /**
+ * Return the performance data record for the given peer
+ * 
+ * @param cp peer to query
+ * @return performance data record for the peer
+ */
+struct GSF_PeerPerformanceData *
+GSF_get_peer_performance_data_ (struct GSF_ConnectedPeer *cp);
+
+
+/**
+ * Ask a peer to stop migrating data to us until the given point
+ * in time.
+ * 
+ * @param cp peer to ask
+ * @param block_time until when to block
+ */
+void
+GSF_block_peer_migration_ (struct GSF_ConnectedPeer *cp,
+                          struct GNUNET_TIME_Relative block_time);
+
+
+/**
  * A peer disconnected from us.  Tear down the connected peer
  * record.
  *

Modified: gnunet/src/fs/gnunet-service-fs_lc.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_lc.c        2011-02-09 21:01:47 UTC (rev 
14383)
+++ gnunet/src/fs/gnunet-service-fs_lc.c        2011-02-10 12:59:38 UTC (rev 
14384)
@@ -179,6 +179,70 @@
 
 
 /**
+ * Handle a reply to a pending request.  Also called if a request
+ * expires (then with data == NULL).  The handler may be called
+ * many times (depending on the request type), but will not be
+ * called during or after a call to GSF_pending_request_cancel 
+ * and will also not be called anymore after a call signalling
+ * expiration.
+ *
+ * @param cls user-specified closure
+ * @param pr handle to the original pending request
+ * @param data response data, NULL on request expiration
+ * @param data_len number of bytes in data
+ */
+static void
+client_response_handler (void *cls,
+                        struct GSF_PendingRequest *pr,
+                        const void *data,
+                        size_t data_len)
+{
+  /* FIXME: adapt old code below to new API! */
+
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# replies received for local 
clients"),
+                               1,
+                               GNUNET_NO);
+      cl = pr->client_request_list->client_list;
+      msize = sizeof (struct PutMessage) + prq->size;
+      creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage));
+      creply->msize = msize;
+      creply->client_list = cl;
+      GNUNET_CONTAINER_DLL_insert_after (cl->res_head,
+                                        cl->res_tail,
+                                        cl->res_tail,
+                                        creply);      
+      pm = (struct PutMessage*) &creply[1];
+      pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
+      pm->header.size = htons (msize);
+      pm->type = htonl (prq->type);
+      pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
+      memcpy (&pm[1], prq->data, prq->size);      
+      if (NULL == cl->th)
+       {
+#if DEBUG_FS
+         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                     "Transmitting result for query `%s' to client\n",
+                     GNUNET_h2s (key));
+#endif  
+         cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
+                                                       msize,
+                                                       
GNUNET_TIME_UNIT_FOREVER_REL,
+                                                       &transmit_to_client,
+                                                       cl);
+       }
+      GNUNET_break (cl->th != NULL);
+      if (pr->do_remove)               
+       {
+         prq->finished = GNUNET_YES;
+         destroy_pending_request (pr);         
+       }
+
+}
+
+
+
+/**
  * Handle START_SEARCH-message (search request from local client).
  *
  * @param client identification of the client

Modified: gnunet/src/fs/gnunet-service-fs_pr.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_pr.c        2011-02-09 21:01:47 UTC (rev 
14383)
+++ gnunet/src/fs/gnunet-service-fs_pr.c        2011-02-10 12:59:38 UTC (rev 
14384)
@@ -37,16 +37,39 @@
    */ 
   struct GSF_PendingRequestData public_data;
 
+  /**
+   * Function to call if we encounter a reply.
+   */
   GSF_PendingRequestReplyHandler rh;
 
+  /**
+   * Closure for 'rh'
+   */
   void *rh_cls;
 
-  const GNUNET_HashCode *replies_seen;
+  /**
+   * Array of hash codes of replies we've already seen.
+   */
+  GNUNET_HashCode *replies_seen;
 
+  /**
+   * Bloomfilter masking replies we've already seen.
+   */
   struct GNUNET_CONTAINER_BloomFilter *bf;
 
+  /**
+   * Number of valid entries in the 'replies_seen' array.
+   */
   unsigned int replies_seen_count;
 
+  /**
+   * Length of the 'replies_seen' array.
+   */
+  unsigned int replies_seen_size;
+
+  /**
+   * Mingle value we currently use for the bf.
+   */
   int32_t mingle;
                            
 };
@@ -56,10 +79,102 @@
  * All pending requests, ordered by the query.  Entries
  * are of type 'struct GSF_PendingRequest*'.
  */
-static struct GNUNET_CONTAINER_MultiHashMap *requests;
+static struct GNUNET_CONTAINER_MultiHashMap *pr_map;
 
 
 /**
+ * Datastore 'PUT' load tracking.
+ */
+static struct GNUNET_LOAD_Value *datastore_put_load;
+
+
+/**
+ * Are we allowed to migrate content to this peer.
+ */
+static int active_to_migration;
+
+
+/**
+ * Heap with the request that will expire next at the top.  Contains
+ * pointers of type "struct PendingRequest*"; these will *also* be
+ * aliased from the "requests_by_peer" data structures and the
+ * "requests_by_query" table.  Note that requests from our clients
+ * don't expire and are thus NOT in the "requests_by_expiration"
+ * (or the "requests_by_peer" tables).
+ */
+static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
+
+
+/**
+ * How many bytes should a bloomfilter be if we have already seen
+ * entry_count responses?  Note that BLOOMFILTER_K gives us the number
+ * of bits set per entry.  Furthermore, we should not re-size the
+ * filter too often (to keep it cheap).
+ *
+ * Since other peers will also add entries but not resize the filter,
+ * we should generally pick a slightly larger size than what the
+ * strict math would suggest.
+ *
+ * @return must be a power of two and smaller or equal to 2^15.
+ */
+static size_t
+compute_bloomfilter_size (unsigned int entry_count)
+{
+  size_t size;
+  unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
+  uint16_t max = 1 << 15;
+
+  if (entry_count > max)
+    return max;
+  size = 8;
+  while ((size < max) && (size < ideal))
+    size *= 2;
+  if (size > max)
+    return max;
+  return size;
+}
+
+
+/**
+ * Recalculate our bloom filter for filtering replies.  This function
+ * will create a new bloom filter from scratch, so it should only be
+ * called if we have no bloomfilter at all (and hence can create a
+ * fresh one of minimal size without problems) OR if our peer is the
+ * initiator (in which case we may resize to larger than mimimum size).
+ *
+ * @param pr request for which the BF is to be recomputed
+ * @return GNUNET_YES if a refresh actually happened
+ */
+static int
+refresh_bloomfilter (struct GSF_PendingRequest *pr)
+{
+  unsigned int i;
+  size_t nsize;
+  GNUNET_HashCode mhash;
+
+  nsize = compute_bloomfilter_size (pr->replies_seen_off);
+  if ( (bf != NULL) &&
+       (nsize == GNUNET_CONTAINER_bloomfilter_get_size (pr->bf)) )
+    return GNUNET_NO; /* size not changed */
+  if (pr->bf != NULL)
+    GNUNET_CONTAINER_bloomfilter_free (pr->bf);
+  pr->mingle = (int32_t) GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 
+                                                  UINT32_MAX);
+  pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
+                                             nsize,
+                                             BLOOMFILTER_K);
+  for (i=0;i<pr->replies_seen_count;i++)
+    {
+      GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i],
+                               pr->mingle,
+                               &mhash);
+      GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
+    }
+  return GNUNET_YES;
+}
+
+
+/**
  * Create a new pending request.  
  *
  * @param options request options
@@ -92,7 +207,54 @@
                             GSF_PendingRequestReplyHandler rh,
                             void *rh_cls)
 {
-  return NULL; // FIXME
+  struct GSF_PendingRequest *pr;
+
+  
+  pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest));
+  pr->public_data.query = *query;
+  if (GNUNET_BLOCK_TYPE_SBLOCK == type)
+    {
+      GNUNET_assert (NULL != namespace);
+      pr->public_data.namespace = *namespace;
+    }
+  if (NULL != target)
+    {
+      pr->public_data.target = *target;
+      pr->has_target = GNUNET_YES;
+    }
+  pr->public_data.anonymity_level = anonymity_data;
+  pr->public_data.priority = priority;
+  pr->public_data.options = options;
+  pr->public_data.type = type;  
+  pr->rh = rh;
+  pr->rh_cls = rh_cls;
+  if (replies_seen_count > 0)
+    {
+      pr->replies_seen_size = replies_seen_count;
+      pr->replies_seen = GNUNET_malloc (sizeof (GNUNET_HashCode) * 
pr->replies_seen_size);
+      memcpy (pr->replies_seen,
+             replies_seen,
+             replies_seen_count * sizeof (struct GNUNET_HashCode));
+      pr->replies_seen_count = replies_seen_count;
+    }
+  if (NULL != bf)    
+    {
+      pr->bf = GNUNET_CONTAINER_bloomfilter_copy (bf);
+      pr->mingle = mingle;
+    }
+  else if ( (replies_seen_count > 0) &&
+           (0 != (options & GSF_PRO_BLOOMFILTER_FULL_REFRESH)) )
+    {
+      GNUNET_assert (GNUNET_YES == refresh_bloomfilter (pr));
+    }
+  GNUNET_CONTAINER_multihashmap_put (pr_map,
+                                    query,
+                                    pr,
+                                    
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+  // FIXME: if not a local query, we also need to track the
+  // total number of external queries we currently have and
+  // bound it => need an additional heap!
+  return pr;
 }
 
 
@@ -109,56 +271,162 @@
                             const GNUNET_HashCode *replies_seen,
                             unsigned int replies_seen_count)
 {
-  // FIXME
-}
+  unsigned int i;
+  GNUNET_HashCode mhash;
 
-
-
-/**
- * Get the query for a given pending request.
- *
- * @param pr the request
- * @return pointer to the query (only valid as long as pr is valid)
- */
-const GNUNET_HashCode *
-GSF_pending_request_get_query_ (const struct GSF_PendingRequest *pr)
-{
-  return NULL; // FIXME
+  if (replies_seen_count + pr->replies_seen_count < pr->replies_seen_count)
+    return; /* integer overflow */
+  if (0 != (options & GSF_PRO_BLOOMFILTER_FULL_REFRESH))
+    {
+      /* we're responsible for the BF, full refresh */
+      if (replies_seen_count + pr->replies_seen_count > pr->replies_seen_size)
+       GNUNET_array_grow (pr->replies_seen,
+                          pr->replies_seen_size,
+                          replies_seen_count + pr->replies_seen_count);
+      memcpy (&pr->replies_seen[pr->replies_seen_count],
+             replies_seen,
+             sizeof (GNUNET_HashCode) * replies_seen_count);
+      pr->replies_seen_count += replies_seen;
+      if (GNUNET_NO == refresh_bloomfilter (pr))
+       {
+         /* bf not recalculated, simply extend it with new bits */
+         for (i=0;i<pr->replies_seen_count;i++)
+           {
+             GNUNET_BLOCK_mingle_hash (&replies_seen[i],
+                                       pr->mingle,
+                                       &mhash);
+             GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
+           }
+       }
+    }
+  else
+    {
+      if (NULL == pr->bf)
+       {
+         /* we're not the initiator, but the initiator did not give us
+            any bloom-filter, so we need to create one on-the-fly */
+         pr->mingle = (int32_t) GNUNET_CRYPTO_random_u32 
(GNUNET_CRYPTO_QUALITY_WEAK, 
+                                                          UINT32_MAX);
+         pr->bf = GNUNET_CONTAINER_bloomfilter_init (compute_bloomfilter_size 
(replies_seen_count),
+                                                     pr->mingle,
+                                                     BLOOMFILTER_K);
+       }
+      for (i=0;i<pr->replies_seen_count;i++)
+       {
+         GNUNET_BLOCK_mingle_hash (&replies_seen[i],
+                                   pr->mingle,
+                                   &mhash);
+         GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
+       }
+    }
 }
 
 
 /**
- * Get the type of a given pending request.
- *
- * @param pr the request
- * @return query type
- */
-enum GNUNET_BLOCK_Type
-GSF_pending_request_get_type_ (const struct GSF_PendingRequest *pr)
-{
-  return 0; // FIXME
-}
-
-
-/**
  * Generate the message corresponding to the given pending request for
  * transmission to other peers (or at least determine its size).
  *
  * @param pr request to generate the message for
+ * @param do_route are we routing the reply
  * @param buf_size number of bytes available in buf
  * @param buf where to copy the message (can be NULL)
  * @return number of bytes needed (if > buf_size) or used
  */
 size_t
 GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
+                                 int do_route,
                                  size_t buf_size,
                                  void *buf)
 {
-  return 0; // FIXME
+  struct PendingMessage *pm;
+  char lbuf[GNUNET_SERVER_MAX_MESSAGE_SIZE];
+  struct GetMessage *gm;
+  GNUNET_HashCode *ext;
+  size_t msize;
+  unsigned int k;
+  int no_route;
+  uint32_t bm;
+  uint32_t prio;
+  size_t bf_size;
+
+  k = 0;
+  bm = 0;
+  if (GNUNET_YES != do_route)
+    {
+      bm |= GET_MESSAGE_BIT_RETURN_TO;
+      k++;      
+    }
+  if (GNUNET_BLOCK_TYPE_SBLOCK == pr->type)
+    {
+      bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
+      k++;
+    }
+  if (GNUNET_YES == pr->has_target)
+    {
+      bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
+      k++;
+    }
+  bf_size = GNUNET_CONTAINER_bloomfilter_get_size (pr->bf);
+  msize = sizeof (struct GetMessage) + bf_size + k * sizeof(GNUNET_HashCode);
+  GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
+  if (buf_size < msize)
+    return msize;  
+  gm = (struct GetMessage*) lbuf;
+  gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
+  gm->header.size = htons (msize);
+  gm->type = htonl (pr->type);
+  if (GNUNET_YES == do_route)
+    prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+                                    pr->public_data.priority + 1);
+  else
+    prio = 0;
+  pr->public_data.priority -= prio;
+  gm->priority = htonl (prio);
+  gm->ttl = htonl (pr->ttl);
+  gm->filter_mutator = htonl(pr->mingle); 
+  gm->hash_bitmap = htonl (bm);
+  gm->query = pr->query;
+  ext = (GNUNET_HashCode*) &gm[1];
+  k = 0;
+  if (GNUNET_YES != do_route)
+    GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
+  if (GNUNET_BLOCK_TYPE_SBLOCK == pr->type)
+    memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
+  if (GNUNET_YES == pr->has_target)
+    GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) 
&ext[k++]);
+  if (pr->bf != NULL)
+    GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
+                                              (char*) &ext[k],
+                                              bf_size);
+  memcpy (buf, gm, msize);
+  return msize;
 }
 
 
 /**
+ * Iterator to free pending requests.
+ *
+ * @param cls closure, unused
+ * @param key current key code
+ * @param value value in the hash map (pending request)
+ * @return GNUNET_YES (we should continue to iterate)
+ */
+static int 
+clean_request (void *cls,
+              const GNUNET_HashCode * key,
+              void *value)
+{
+  struct GSF_PendingRequest *pr = value;
+  
+  GNUNET_free_non_null (pr->replies_seen);
+  if (NULL != pr->bf)
+    GNUNET_CONTAINER_bloomfilter_free (pr->bf);
+  GNUNET_free (pr);
+  return GNUNET_YES;
+}
+
+
+/**
  * Explicitly cancel a pending request.
  *
  * @param pr request to cancel
@@ -166,6 +434,12 @@
 void
 GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr)
 {
+  GNUNET_assert (GNUNET_OK ==
+                GNUNET_CONTAINER_multihashmap_remove (pr_map,
+                                                      &pr->public_data.query,
+                                                      pr));
+  GNUNET_assert (GNUNET_YES ==
+                clean_request (NULL, &pr->public_data.query, pr));  
 }
 
 
@@ -176,31 +450,478 @@
  * @param cls closure for it
  */
 void
-GSF_iterate_pending_requests_ (GSF_PendingRequestIterator it,
-                              void *cls)
+GSF_iterate_pending_pr_map_ (GSF_PendingRequestIterator it,
+                            void *cls)
 {
-  // FIXME
+  GNUNET_CONTAINER_multihashmap_iterate (pr_map,
+                                        (GNUNET_CONTAINER_HashMapIterator) it,
+                                        cls);
 }
 
 
+
+
 /**
+ * Closure for "process_reply" function.
+ */
+struct ProcessReplyClosure
+{
+  /**
+   * The data for the reply.
+   */
+  const void *data;
+
+  /**
+   * Who gave us this reply? NULL for local host (or DHT)
+   */
+  struct ConnectedPeer *sender;
+
+  /**
+   * When the reply expires.
+   */
+  struct GNUNET_TIME_Absolute expiration;
+
+  /**
+   * Size of data.
+   */
+  size_t size;
+
+  /**
+   * Type of the block.
+   */
+  enum GNUNET_BLOCK_Type type;
+
+  /**
+   * How much was this reply worth to us?
+   */
+  uint32_t priority;
+
+  /**
+   * Anonymity requirements for this reply.
+   */
+  uint32_t anonymity_level;
+
+  /**
+   * Evaluation result (returned).
+   */
+  enum GNUNET_BLOCK_EvaluationResult eval;
+
+  /**
+   * Did we finish processing the associated request?
+   */ 
+  int finished;
+
+  /**
+   * Did we find a matching request?
+   */
+  int request_found;
+};
+
+
+/**
+ * Update the performance data for the sender (if any) since
+ * the sender successfully answered one of our queries.
+ *
+ * @param prq information about the sender
+ * @param pr request that was satisfied
+ */
+static void
+update_request_performance_data (struct ProcessReplyClosure *prq,
+                                struct GSF_PendingRequest *pr)
+{
+  unsigned int i;
+  struct GNUNET_TIME_Relative cur_delay;
+
+  if (prq->sender == NULL)
+    return;      
+  /* FIXME: adapt code to new API... */
+  for (i=0;i<pr->used_targets_off;i++)
+    if (pr->used_targets[i].pid == prq->sender->pid)
+      break;
+  if (i < pr->used_targets_off)
+    {
+      cur_delay = GNUNET_TIME_absolute_get_duration 
(pr->used_targets[i].last_request_time);      
+      prq->sender->avg_delay.rel_value
+       = (prq->sender->avg_delay.rel_value * 
+          (RUNAVG_DELAY_N - 1) + cur_delay.rel_value) / RUNAVG_DELAY_N; 
+      prq->sender->avg_priority
+       = (prq->sender->avg_priority * 
+          (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
+    }
+  if (pr->cp != NULL)
+    {
+      GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
+                            [prq->sender->last_p2p_replies_woff % 
P2P_SUCCESS_LIST_SIZE], 
+                            -1);
+      GNUNET_PEER_change_rc (pr->cp->pid, 1);
+      prq->sender->last_p2p_replies
+       [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
+       = pr->cp->pid;
+    }
+  else
+    {
+      if (NULL != prq->sender->last_client_replies
+         [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
+       GNUNET_SERVER_client_drop (prq->sender->last_client_replies
+                                  [(prq->sender->last_client_replies_woff) % 
CS2P_SUCCESS_LIST_SIZE]);
+      prq->sender->last_client_replies
+       [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
+       = pr->client_request_list->client_list->client;
+      GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
+    }
+}
+                               
+
+
+/**
+ * We have received a reply; handle it!
+ *
+ * @param cls response (struct ProcessReplyClosure)
+ * @param key our query
+ * @param value value in the hash map (info about the query)
+ * @return GNUNET_YES (we should continue to iterate)
+ */
+static int
+process_reply (void *cls,
+              const GNUNET_HashCode * key,
+              void *value)
+{
+  struct ProcessReplyClosure *prq = cls;
+  struct GSF_PendingRequest *pr = value;
+  struct PendingMessage *reply;
+  struct ClientResponseMessage *creply;
+  struct ClientList *cl;
+  struct PutMessage *pm;
+  struct ConnectedPeer *cp;
+  size_t msize;
+
+#if DEBUG_FS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Matched result (type %u) for query `%s' with pending request\n",
+             (unsigned int) prq->type,
+             GNUNET_h2s (key));
+#endif  
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# replies received and matched"),
+                           1,
+                           GNUNET_NO);
+  prq->eval = GNUNET_BLOCK_evaluate (block_ctx,
+                                    prq->type,
+                                    key,
+                                    &pr->bf,
+                                    pr->mingle,
+                                    pr->namespace, (pr->namespace != NULL) ? 
sizeof (GNUNET_HashCode) : 0,
+                                    prq->data,
+                                    prq->size);
+  switch (prq->eval)
+    {
+    case GNUNET_BLOCK_EVALUATION_OK_MORE:
+      update_request_performance_data (prq, pr);
+      break;
+    case GNUNET_BLOCK_EVALUATION_OK_LAST:
+      update_request_performance_data (prq, pr);
+      /* FIXME: adapt code to new API! */
+      while (NULL != pr->pending_head)
+       destroy_pending_message_list_entry (pr->pending_head);
+      if (pr->qe != NULL)
+       {
+         if (pr->client_request_list != NULL)
+           GNUNET_SERVER_receive_done 
(pr->client_request_list->client_list->client, 
+                                       GNUNET_YES);
+         GNUNET_DATASTORE_cancel (pr->qe);
+         pr->qe = NULL;
+       }
+      pr->do_remove = GNUNET_YES;
+      if (pr->task != GNUNET_SCHEDULER_NO_TASK)
+       {
+         GNUNET_SCHEDULER_cancel (pr->task);
+         pr->task = GNUNET_SCHEDULER_NO_TASK;
+       }
+      GNUNET_break (GNUNET_YES ==
+                   GNUNET_CONTAINER_multihashmap_remove (query_request_map,
+                                                         key,
+                                                         pr));
+      GNUNET_LOAD_update (rt_entry_lifetime,
+                         GNUNET_TIME_absolute_get_duration 
(pr->start_time).rel_value);
+      break;
+    case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# duplicate replies discarded 
(bloomfilter)"),
+                               1,
+                               GNUNET_NO);
+#if DEBUG_FS && 0
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Duplicate response `%s', discarding.\n",
+                 GNUNET_h2s (&mhash));
+#endif
+      return GNUNET_YES; /* duplicate */
+    case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
+      return GNUNET_YES; /* wrong namespace */ 
+    case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
+      GNUNET_break (0);
+      return GNUNET_YES;
+    case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
+      GNUNET_break (0);
+      return GNUNET_YES;
+    case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                 _("Unsupported block type %u\n"),
+                 prq->type);
+      return GNUNET_NO;
+    }
+  /* FIXME: adapt code to new API! */
+  if (pr->client_request_list != NULL)
+    {
+      if (pr->replies_seen_size == pr->replies_seen_off)
+       GNUNET_array_grow (pr->replies_seen,
+                          pr->replies_seen_size,
+                          pr->replies_seen_size * 2 + 4);      
+      GNUNET_CRYPTO_hash (prq->data,
+                         prq->size,
+                         &pr->replies_seen[pr->replies_seen_off++]);         
+      refresh_bloomfilter (pr);
+    }
+  if (NULL == prq->sender)
+    {
+#if DEBUG_FS
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Found result for query `%s' in local datastore\n",
+                 GNUNET_h2s (key));
+#endif
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# results found locally"),
+                               1,
+                               GNUNET_NO);      
+    }
+  prq->priority += pr->remaining_priority;
+  pr->remaining_priority = 0;
+  pr->results_found++;
+  prq->request_found = GNUNET_YES;
+  /* finally, pass on to other peers / local clients */
+  pr->rh (pr->rh_cls, pr, prq->data, prq->size);
+  return GNUNET_YES;
+}
+
+
+/**
+ * Continuation called to notify client about result of the
+ * operation.
+ *
+ * @param cls closure
+ * @param success GNUNET_SYSERR on failure
+ * @param msg NULL on success, otherwise an error message
+ */
+static void 
+put_migration_continuation (void *cls,
+                           int success,
+                           const char *msg)
+{
+  struct GNUNET_TIME_Absolute *start = cls;
+  struct GNUNET_TIME_Relative delay;
+  
+  delay = GNUNET_TIME_absolute_get_duration (*start);
+  GNUNET_free (start);
+  /* FIXME: should we really update the load value on failure? */
+  GNUNET_LOAD_update (datastore_put_load,
+                     delay.rel_value);
+  if (GNUNET_OK == success)
+    return;
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# datastore 'put' failures"),
+                           1,
+                           GNUNET_NO);
+}
+
+
+/**
+ * Test if the DATABASE (PUT) load on this peer is too high
+ * to even consider processing the query at
+ * all.  
+ * 
+ * @return GNUNET_YES if the load is too high to do anything (load high)
+ *         GNUNET_NO to process normally (load normal or low)
+ */
+static int
+test_put_load_too_high (uint32_t priority)
+{
+  double ld;
+
+  if (GNUNET_LOAD_get_average (datastore_put_load) < 50)
+    return GNUNET_NO; /* very fast */
+  ld = GNUNET_LOAD_get_load (datastore_put_load);
+  if (ld < 2.0 * (1 + priority))
+    return GNUNET_NO;
+  GNUNET_STATISTICS_update (stats,
+                           gettext_noop ("# storage requests dropped due to 
high load"),
+                           1,
+                           GNUNET_NO);
+  return GNUNET_YES;
+}
+
+
+/**
+ * Iterator called on each result obtained for a DHT
+ * operation that expects a reply
+ *
+ * @param cls closure
+ * @param exp when will this value expire
+ * @param key key of the result
+ * @param get_path NULL-terminated array of pointers
+ *                 to the peers on reverse GET path (or NULL if not recorded)
+ * @param put_path NULL-terminated array of pointers
+ *                 to the peers on the PUT path (or NULL if not recorded)
+ * @param type type of the result
+ * @param size number of bytes in data
+ * @param data pointer to the result data
+ */
+void
+GSF_handle_dht_reply_ (void *cls,
+                      struct GNUNET_TIME_Absolute exp,
+                      const GNUNET_HashCode * key,
+                      const struct GNUNET_PeerIdentity * const *get_path,
+                      const struct GNUNET_PeerIdentity * const *put_path,
+                      enum GNUNET_BLOCK_Type type,
+                      size_t size,
+                      const void *data)
+{
+  struct GSF_PendingRequest *pr = cls;
+  struct ProcessReplyClosure prq;
+
+  memset (&prq, 0, sizeof (prq));
+  prq.data = data;
+  prq.expiration = exp;
+  prq.size = size;  
+  prq.type = type;
+  process_reply (&prq, key, pr);
+  if ( (GNUNET_YES == active_to_migration) &&
+       (GNUNET_NO == test_put_load_too_high (prq.priority)) )
+    {      
+#if DEBUG_FS
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Replicating result for query `%s' with priority %u\n",
+                 GNUNET_h2s (&query),
+                 prq.priority);
+#endif
+      start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
+      *start = GNUNET_TIME_absolute_get ();
+      GNUNET_DATASTORE_put (dsh,
+                           0, &query, dsize, &put[1],
+                           type, prq.priority, 1 /* anonymity */, 
+                           expiration, 
+                           1 + prq.priority, MAX_DATASTORE_QUEUE,
+                           GNUNET_CONSTANTS_SERVICE_TIMEOUT,
+                           &put_migration_continuation, 
+                           start);
+    }
+}
+
+
+/**
  * Handle P2P "CONTENT" message.  Checks that the message is
  * well-formed and then checks if there are any pending requests for
  * this content and possibly passes it on (to local clients or other
  * peers).  Does NOT perform migration (content caching at this peer).
  *
- * @param other the other peer involved (sender or receiver, NULL
+ * @param cp the other peer involved (sender or receiver, NULL
  *        for loopback messages where we are both sender and receiver)
  * @param message the actual message
- * @return how valueable was the content to us (0 for not at all),
+ * @return GNUNET_OK if the message was well-formed,
  *         GNUNET_SYSERR if the message was malformed (close connection,
  *         do not cache under any circumstances)
  */
 int
-GSF_handle_p2p_content_ (const struct GNUNET_PeerIdentity *other,
+GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
                         const struct GNUNET_MessageHeader *message)
 {
-  return GNUNET_SYSERR; // FIXME
+  const struct PutMessage *put;
+  uint16_t msize;
+  size_t dsize;
+  enum GNUNET_BLOCK_Type type;
+  struct GNUNET_TIME_Absolute expiration;
+  GNUNET_HashCode query;
+  struct ProcessReplyClosure prq;
+  struct GNUNET_TIME_Relative block_time;  
+  double putl;
+  struct GNUNET_TIME_Absolute *start;
+
+  msize = ntohs (message->size);
+  if (msize < sizeof (struct PutMessage))
+    {
+      GNUNET_break_op(0);
+      return GNUNET_SYSERR;
+    }
+  put = (const struct PutMessage*) message;
+  dsize = msize - sizeof (struct PutMessage);
+  type = ntohl (put->type);
+  expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
+  if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
+    return GNUNET_SYSERR;
+  if (GNUNET_OK !=
+      GNUNET_BLOCK_get_key (block_ctx,
+                           type,
+                           &put[1],
+                           dsize,
+                           &query))
+    {
+      GNUNET_break_op (0);
+      return GNUNET_SYSERR;
+    }
+  /* now, lookup 'query' */
+  prq.data = (const void*) &put[1];
+  if (NULL != cp)
+    prq.sender = cp;
+  else
+    prq.sender = NULL;
+  prq.size = dsize;
+  prq.type = type;
+  prq.expiration = expiration;
+  prq.priority = 0;
+  prq.anonymity_level = 1;
+  prq.finished = GNUNET_NO;
+  prq.request_found = GNUNET_NO;
+  GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
+                                             &query,
+                                             &process_reply,
+                                             &prq);
+  if (NULL != cp)
+    {
+      GSF_connected_peer_change_preference (cp, CONTENT_BANDWIDTH_VALUE + 1000 
* prq.priority);
+      GSF_get_peer_performance_data (cp)->trust += prq.priority;
+    }
+  if ( (GNUNET_YES == active_to_migration) &&
+       (GNUNET_NO == test_put_load_too_high (prq.priority)) )
+    {      
+#if DEBUG_FS
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Replicating result for query `%s' with priority %u\n",
+                 GNUNET_h2s (&query),
+                 prq.priority);
+#endif
+      start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
+      *start = GNUNET_TIME_absolute_get ();
+      GNUNET_DATASTORE_put (dsh,
+                           0, &query, dsize, &put[1],
+                           type, prq.priority, 1 /* anonymity */, 
+                           expiration, 
+                           1 + prq.priority, MAX_DATASTORE_QUEUE,
+                           GNUNET_CONSTANTS_SERVICE_TIMEOUT,
+                           &put_migration_continuation, 
+                           start);
+    }
+  putl = GNUNET_LOAD_get_load (datastore_put_load);
+  if ( (NULL != (cp = prq.sender)) &&
+       (GNUNET_NO == prq.request_found) &&
+       ( (GNUNET_YES != active_to_migration) ||
+        (putl > 2.5 * (1 + prq.priority)) ) ) 
+    {
+      if (GNUNET_YES != active_to_migration) 
+       putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
+      block_time = GNUNET_TIME_relative_multiply 
(GNUNET_TIME_UNIT_MILLISECONDS,
+                                                 5000 + 
GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+                                                                               
   (unsigned int) (60000 * putl * putl)));
+      GSF_block_peer_migration (cp, block_time);
+    }
+  return GNUNET_OK;
 }
 
 
@@ -210,7 +931,7 @@
 void
 GSF_pending_request_init_ ()
 {
-  // FIXME
+  pr_map = GNUNET_CONTAINER_multihashmap_create (32 * 1024);
 }
 
 
@@ -220,7 +941,11 @@
 void
 GSF_pending_request_done_ ()
 {
-  // FIXME
+  GNUNET_CONTAINER_multihashmap_iterate (pr_map,
+                                        &clean_request,
+                                        NULL);
+  GNUNET_CONTAINER_multihashmap_destroy (pr_map);
+  pr_map = NULL;
 }
 
 

Modified: gnunet/src/fs/gnunet-service-fs_pr.h
===================================================================
--- gnunet/src/fs/gnunet-service-fs_pr.h        2011-02-09 21:01:47 UTC (rev 
14383)
+++ gnunet/src/fs/gnunet-service-fs_pr.h        2011-02-10 12:59:38 UTC (rev 
14384)
@@ -68,7 +68,9 @@
 
 
 /**
- * Public data associated with each pending request.
+ * Public data (in the sense of not encapsulated within
+ * 'gnunet-service-fs_pr', not in the sense of network-wide
+ * known) associated with each pending request.
  */
 struct GSF_PendingRequestData
 {
@@ -185,36 +187,18 @@
 
 
 /**
- * Get the query for a given pending request.
- *
- * @param pr the request
- * @return pointer to the query (only valid as long as pr is valid)
- */
-const GNUNET_HashCode *
-GSF_pending_request_get_query_ (const struct GSF_PendingRequest *pr);
-
-
-/**
- * Get the type of a given pending request.
- *
- * @param pr the request
- * @return query type
- */
-enum GNUNET_BLOCK_Type
-GSF_pending_request_get_type_ (const struct GSF_PendingRequest *pr);
-
-
-/**
  * Generate the message corresponding to the given pending request for
  * transmission to other peers (or at least determine its size).
  *
  * @param pr request to generate the message for
+ * @param do_route are we routing the reply
  * @param buf_size number of bytes available in buf
  * @param buf where to copy the message (can be NULL)
- * @return number of bytes needed (if > buf_size) or used
+ * @return number of bytes needed (if buf_size too small) or used
  */
 size_t
 GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
+                                 int do_route,
                                  size_t buf_size,
                                  void *buf);
 
@@ -230,10 +214,12 @@
 
 /**
  * Signature of function called on each request.
+ * (Note: 'subtype' of GNUNET_CONTAINER_HashMapIterator).
  *
  * @param cls closure
  * @param key query for the request
  * @param pr handle to the pending request
+ * @return GNUNET_YES to continue to iterate
  */
 typedef int (*GSF_PendingRequestIterator)(void *cls,
                                          const GNUNET_HashCode *key,
@@ -257,19 +243,45 @@
  * this content and possibly passes it on (to local clients or other
  * peers).  Does NOT perform migration (content caching at this peer).
  *
- * @param other the other peer involved (sender or receiver, NULL
+ * @param cp the other peer involved (sender or receiver, NULL
  *        for loopback messages where we are both sender and receiver)
  * @param message the actual message
- * @return how valueable was the content to us (0 for not at all),
+ * @return GNUNET_OK if the message was well-formed,
  *         GNUNET_SYSERR if the message was malformed (close connection,
  *         do not cache under any circumstances)
  */
 int
-GSF_handle_p2p_content_ (const struct GNUNET_PeerIdentity *other,
+GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
                         const struct GNUNET_MessageHeader *message);
 
 
 /**
+ * Iterator called on each result obtained for a DHT
+ * operation that expects a reply
+ *
+ * @param cls closure, the 'struct GSF_PendingRequest *'.
+ * @param exp when will this value expire
+ * @param key key of the result
+ * @param get_path NULL-terminated array of pointers
+ *                 to the peers on reverse GET path (or NULL if not recorded)
+ * @param put_path NULL-terminated array of pointers
+ *                 to the peers on the PUT path (or NULL if not recorded)
+ * @param type type of the result
+ * @param size number of bytes in data
+ * @param data pointer to the result data
+ */
+void
+GSF_handle_dht_reply_ (void *cls,
+                      struct GNUNET_TIME_Absolute exp,
+                      const GNUNET_HashCode * key,
+                      const struct GNUNET_PeerIdentity * const *get_path,
+                      const struct GNUNET_PeerIdentity * const *put_path,
+                      enum GNUNET_BLOCK_Type type,
+                      size_t size,
+                      const void *data);
+
+
+/**
  * Setup the subsystem.
  */
 void

Added: gnunet/src/fs/gnunet-service-fs_push.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_push.c                              (rev 0)
+++ gnunet/src/fs/gnunet-service-fs_push.c      2011-02-10 12:59:38 UTC (rev 
14384)
@@ -0,0 +1,475 @@
+/*
+     This file is part of GNUnet.
+     (C) 2011 Christian Grothoff (and other contributing authors)
+
+     GNUnet is free software; you can redistribute it and/or modify
+     it under the terms of the GNU General Public License as published
+     by the Free Software Foundation; either version 3, or (at your
+     option) any later version.
+
+     GNUnet is distributed in the hope that it will be useful, but
+     WITHOUT ANY WARRANTY; without even the implied warranty of
+     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+     General Public License for more details.
+
+     You should have received a copy of the GNU General Public License
+     along with GNUnet; see the file COPYING.  If not, write to the
+     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+     Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @file fs/gnunet-service-fs_push.c
+ * @brief API to push content from our datastore to other peers
+ *            ('anonymous'-content P2P migration)
+ * @author Christian Grothoff
+ */
+#include "platform.h"
+#include "gnunet-service-fs_push.h"
+
+
+/* FIXME: below are only old code fragments to use... */
+
+/**
+ * Block that is ready for migration to other peers.  Actual data is at the 
end of the block.
+ */
+struct MigrationReadyBlock
+{
+
+  /**
+   * This is a doubly-linked list.
+   */
+  struct MigrationReadyBlock *next;
+
+  /**
+   * This is a doubly-linked list.
+   */
+  struct MigrationReadyBlock *prev;
+
+  /**
+   * Query for the block.
+   */
+  GNUNET_HashCode query;
+
+  /**
+   * When does this block expire? 
+   */
+  struct GNUNET_TIME_Absolute expiration;
+
+  /**
+   * Peers we would consider forwarding this
+   * block to.  Zero for empty entries.
+   */
+  GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
+
+  /**
+   * Size of the block.
+   */
+  size_t size;
+
+  /**
+   *  Number of targets already used.
+   */
+  unsigned int used_targets;
+
+  /**
+   * Type of the block.
+   */
+  enum GNUNET_BLOCK_Type type;
+};
+
+
+/**
+ * Head of linked list of blocks that can be migrated.
+ */
+static struct MigrationReadyBlock *mig_head;
+
+/**
+ * Tail of linked list of blocks that can be migrated.
+ */
+static struct MigrationReadyBlock *mig_tail;
+
+/**
+ * Request to datastore for migration (or NULL).
+ */
+static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
+
+/**
+ * ID of task that collects blocks for migration.
+ */
+static GNUNET_SCHEDULER_TaskIdentifier mig_task;
+
+/**
+ * What is the maximum frequency at which we are allowed to
+ * poll the datastore for migration content?
+ */
+static struct GNUNET_TIME_Relative min_migration_delay;
+
+/**
+ * Are we allowed to push out content from this peer.
+ */
+static int active_from_migration;
+
+/**
+ * Size of the doubly-linked list of migration blocks.
+ */
+static unsigned int mig_size;
+
+
+/**
+ * Delete the given migration block.
+ *
+ * @param mb block to delete
+ */
+static void
+delete_migration_block (struct MigrationReadyBlock *mb)
+{
+  GNUNET_CONTAINER_DLL_remove (mig_head,
+                              mig_tail,
+                              mb);
+  GNUNET_PEER_decrement_rcs (mb->target_list,
+                            MIGRATION_LIST_SIZE);
+  mig_size--;
+  GNUNET_free (mb);
+}
+
+
+/**
+ * Compare the distance of two peers to a key.
+ *
+ * @param key key
+ * @param p1 first peer
+ * @param p2 second peer
+ * @return GNUNET_YES if P1 is closer to key than P2
+ */
+static int
+is_closer (const GNUNET_HashCode *key,
+          const struct GNUNET_PeerIdentity *p1,
+          const struct GNUNET_PeerIdentity *p2)
+{
+  return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey,
+                                   &p2->hashPubKey,
+                                   key);
+}
+
+
+/**
+ * Consider migrating content to a given peer.
+ *
+ * @param cls 'struct MigrationReadyBlock*' to select
+ *            targets for (or NULL for none)
+ * @param key ID of the peer 
+ * @param value 'struct ConnectedPeer' of the peer
+ * @return GNUNET_YES (always continue iteration)
+ */
+static int
+consider_migration (void *cls,
+                   const GNUNET_HashCode *key,
+                   void *value)
+{
+  struct MigrationReadyBlock *mb = cls;
+  struct ConnectedPeer *cp = value;
+  struct MigrationReadyBlock *pos;
+  struct GNUNET_PeerIdentity cppid;
+  struct GNUNET_PeerIdentity otherpid;
+  struct GNUNET_PeerIdentity worstpid;
+  size_t msize;
+  unsigned int i;
+  unsigned int repl;
+  
+  /* consider 'cp' as a migration target for mb */
+  if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).rel_value > 0)
+    return GNUNET_YES; /* peer has requested no migration! */
+  if (mb != NULL)
+    {
+      GNUNET_PEER_resolve (cp->pid,
+                          &cppid);
+      repl = MIGRATION_LIST_SIZE;
+      for (i=0;i<MIGRATION_LIST_SIZE;i++)
+       {
+         if (mb->target_list[i] == 0)
+           {
+             mb->target_list[i] = cp->pid;
+             GNUNET_PEER_change_rc (mb->target_list[i], 1);
+             repl = MIGRATION_LIST_SIZE;
+             break;
+           }
+         GNUNET_PEER_resolve (mb->target_list[i],
+                              &otherpid);
+         if ( (repl == MIGRATION_LIST_SIZE) &&
+              is_closer (&mb->query,
+                         &cppid,
+                         &otherpid)) 
+           {
+             repl = i;
+             worstpid = otherpid;
+           }
+         else if ( (repl != MIGRATION_LIST_SIZE) &&
+                   (is_closer (&mb->query,
+                               &worstpid,
+                               &otherpid) ) )
+           {
+             repl = i;
+             worstpid = otherpid;
+           }       
+       }
+      if (repl != MIGRATION_LIST_SIZE) 
+       {
+         GNUNET_PEER_change_rc (mb->target_list[repl], -1);
+         mb->target_list[repl] = cp->pid;
+         GNUNET_PEER_change_rc (mb->target_list[repl], 1);
+       }
+    }
+
+  /* consider scheduling transmission to cp for content migration */
+  if (cp->cth != NULL)        
+    return GNUNET_YES; 
+  msize = 0;
+  pos = mig_head;
+  while (pos != NULL)
+    {
+      for (i=0;i<MIGRATION_LIST_SIZE;i++)
+       {
+         if (cp->pid == pos->target_list[i])
+           {
+             if (msize == 0)
+               msize = pos->size;
+             else
+               msize = GNUNET_MIN (msize,
+                                   pos->size);
+             break;
+           }
+       }
+      pos = pos->next;
+    }
+  if (msize == 0)
+    return GNUNET_YES; /* no content available */
+#if DEBUG_FS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Trying to migrate at least %u bytes to peer `%s'\n",
+             msize,
+             GNUNET_h2s (key));
+#endif
+  if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
+    {
+      GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task);
+      cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
+    }
+  cp->cth 
+    = GNUNET_CORE_notify_transmit_ready (core,
+                                        0, GNUNET_TIME_UNIT_FOREVER_REL,
+                                        (const struct GNUNET_PeerIdentity*) 
key,
+                                        msize + sizeof (struct PutMessage),
+                                        &transmit_to_peer,
+                                        cp);
+  return GNUNET_YES;
+}
+
+
+/**
+ * Task that is run periodically to obtain blocks for content
+ * migration
+ * 
+ * @param cls unused
+ * @param tc scheduler context (also unused)
+ */
+static void
+gather_migration_blocks (void *cls,
+                        const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+
+
+/**
+ * If the migration task is not currently running, consider
+ * (re)scheduling it with the appropriate delay.
+ */
+static void
+consider_migration_gathering ()
+{
+  struct GNUNET_TIME_Relative delay;
+
+  if (dsh == NULL)
+    return;
+  if (mig_qe != NULL)
+    return;
+  if (mig_task != GNUNET_SCHEDULER_NO_TASK)
+    return;
+  delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
+                                        mig_size);
+  delay = GNUNET_TIME_relative_divide (delay,
+                                      MAX_MIGRATION_QUEUE);
+  delay = GNUNET_TIME_relative_max (delay,
+                                   min_migration_delay);
+  mig_task = GNUNET_SCHEDULER_add_delayed (delay,
+                                          &gather_migration_blocks,
+                                          NULL);
+}
+
+
+
+
+/**
+ * Process content offered for migration.
+ *
+ * @param cls closure
+ * @param key key for the content
+ * @param size number of bytes in data
+ * @param data content stored
+ * @param type type of the content
+ * @param priority priority of the content
+ * @param anonymity anonymity-level for the content
+ * @param expiration expiration time for the content
+ * @param uid unique identifier for the datum;
+ *        maybe 0 if no unique identifier is available
+ */
+static void
+process_migration_content (void *cls,
+                          const GNUNET_HashCode * key,
+                          size_t size,
+                          const void *data,
+                          enum GNUNET_BLOCK_Type type,
+                          uint32_t priority,
+                          uint32_t anonymity,
+                          struct GNUNET_TIME_Absolute
+                          expiration, uint64_t uid)
+{
+  struct MigrationReadyBlock *mb;
+  
+  if (key == NULL)
+    {
+      mig_qe = NULL;
+      if (mig_size < MAX_MIGRATION_QUEUE)  
+       consider_migration_gathering ();
+      return;
+    }
+  if (GNUNET_TIME_absolute_get_remaining (expiration).rel_value < 
+      MIN_MIGRATION_CONTENT_LIFETIME.rel_value)
+    {
+      /* content will expire soon, don't bother */
+      GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
+      return;
+    }
+  if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
+    {
+      if (GNUNET_OK !=
+         GNUNET_FS_handle_on_demand_block (key, size, data,
+                                           type, priority, anonymity,
+                                           expiration, uid, 
+                                           &process_migration_content,
+                                           NULL))
+       {
+         GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
+       }
+      return;
+    }
+#if DEBUG_FS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Retrieved block `%s' of type %u for migration\n",
+             GNUNET_h2s (key),
+             type);
+#endif
+  mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
+  mb->query = *key;
+  mb->expiration = expiration;
+  mb->size = size;
+  mb->type = type;
+  memcpy (&mb[1], data, size);
+  GNUNET_CONTAINER_DLL_insert_after (mig_head,
+                                    mig_tail,
+                                    mig_tail,
+                                    mb);
+  mig_size++;
+  GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
+                                        &consider_migration,
+                                        mb);
+  GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
+}
+
+
+
+/**
+ * Task that is run periodically to obtain blocks for content
+ * migration
+ * 
+ * @param cls unused
+ * @param tc scheduler context (also unused)
+ */
+static void
+gather_migration_blocks (void *cls,
+                        const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  mig_task = GNUNET_SCHEDULER_NO_TASK;
+  if (dsh != NULL)
+    {
+      mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, UINT_MAX,
+                                           GNUNET_TIME_UNIT_FOREVER_REL,
+                                           &process_migration_content, NULL);
+      GNUNET_assert (mig_qe != NULL);
+    }
+}
+
+
+
+size_t
+API_ (void *cls,
+      size_t size, void *buf)
+{
+    next = mig_head;
+      while (NULL != (mb = next))
+       {
+         next = mb->next;
+         for (i=0;i<MIGRATION_LIST_SIZE;i++)
+           {
+             if ( (cp->pid == mb->target_list[i]) &&
+                  (mb->size + sizeof (migm) <= size) )
+               {
+                 GNUNET_PEER_change_rc (mb->target_list[i], -1);
+                 mb->target_list[i] = 0;
+                 mb->used_targets++;
+                 memset (&migm, 0, sizeof (migm));
+                 migm.header.size = htons (sizeof (migm) + mb->size);
+                 migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
+                 migm.type = htonl (mb->type);
+                 migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
+                 memcpy (&cbuf[msize], &migm, sizeof (migm));
+                 msize += sizeof (migm);
+                 size -= sizeof (migm);
+                 memcpy (&cbuf[msize], &mb[1], mb->size);
+                 msize += mb->size;
+                 size -= mb->size;
+#if DEBUG_FS
+                 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                             "Pushing migration block `%s' (%u bytes) to 
`%s'\n",
+                             GNUNET_h2s (&mb->query),
+                             (unsigned int) mb->size,
+                             GNUNET_i2s (&pid));
+#endif   
+                 break;
+               }
+             else
+               {
+#if DEBUG_FS
+                 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                             "Migration block `%s' (%u bytes) is not on 
migration list for peer `%s'\n",
+                             GNUNET_h2s (&mb->query),
+                             (unsigned int) mb->size,
+                             GNUNET_i2s (&pid));
+#endif   
+               }
+           }
+         if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
+              (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size 
(connected_peers)) )
+           {
+             delete_migration_block (mb);
+             consider_migration_gathering ();
+           }
+       }
+      consider_migration (NULL, 
+                         &pid.hashPubKey,
+                         cp);
+
+}
+
+
+

Added: gnunet/src/fs/gnunet-service-fs_put.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_put.c                               (rev 0)
+++ gnunet/src/fs/gnunet-service-fs_put.c       2011-02-10 12:59:38 UTC (rev 
14384)
@@ -0,0 +1,197 @@
+/*
+     This file is part of GNUnet.
+     (C) 2011 Christian Grothoff (and other contributing authors)
+
+     GNUnet is free software; you can redistribute it and/or modify
+     it under the terms of the GNU General Public License as published
+     by the Free Software Foundation; either version 3, or (at your
+     option) any later version.
+
+     GNUnet is distributed in the hope that it will be useful, but
+     WITHOUT ANY WARRANTY; without even the implied warranty of
+     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+     General Public License for more details.
+
+     You should have received a copy of the GNU General Public License
+     along with GNUnet; see the file COPYING.  If not, write to the
+     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+     Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @file fs/gnunet-service-fs_put.c
+ * @brief API to PUT zero-anonymity index data from our datastore into the DHT
+ * @author Christian Grothoff
+ */
+#include "platform.h"
+#include "gnunet-service-fs_put.h"
+
+/* FIXME: below are only old code fragments to use... */
+
+
+/**
+ * Request to datastore for DHT PUTs (or NULL).
+ */
+static struct GNUNET_DATASTORE_QueueEntry *dht_qe;
+
+
+/**
+ * Type we will request for the next DHT PUT round from the datastore.
+ */
+static enum GNUNET_BLOCK_Type dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
+
+/**
+ * ID of task that collects blocks for DHT PUTs.
+ */
+static GNUNET_SCHEDULER_TaskIdentifier dht_task;
+
+/**
+ * How many entires with zero anonymity do we currently estimate
+ * to have in the database?
+ */
+static unsigned int zero_anonymity_count_estimate;
+
+
+
+
+
+/**
+ * Task that is run periodically to obtain blocks for DHT PUTs.
+ * 
+ * @param cls type of blocks to gather
+ * @param tc scheduler context (unused)
+ */
+static void
+gather_dht_put_blocks (void *cls,
+                      const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+
+/**
+ * If the DHT PUT gathering task is not currently running, consider
+ * (re)scheduling it with the appropriate delay.
+ */
+static void
+consider_dht_put_gathering (void *cls)
+{
+  struct GNUNET_TIME_Relative delay;
+
+  if (dsh == NULL)
+    return;
+  if (dht_qe != NULL)
+    return;
+  if (dht_task != GNUNET_SCHEDULER_NO_TASK)
+    return;
+  if (zero_anonymity_count_estimate > 0)
+    {
+      delay = GNUNET_TIME_relative_divide 
(GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY,
+                                          zero_anonymity_count_estimate);
+      delay = GNUNET_TIME_relative_min (delay,
+                                       MAX_DHT_PUT_FREQ);
+    }
+  else
+    {
+      /* if we have NO zero-anonymity content yet, wait 5 minutes for some to
+        (hopefully) appear */
+      delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5);
+    }
+  dht_task = GNUNET_SCHEDULER_add_delayed (delay,
+                                          &gather_dht_put_blocks,
+                                          cls);
+}
+
+
+
+/**
+ * Store content in DHT.
+ *
+ * @param cls closure
+ * @param key key for the content
+ * @param size number of bytes in data
+ * @param data content stored
+ * @param type type of the content
+ * @param priority priority of the content
+ * @param anonymity anonymity-level for the content
+ * @param expiration expiration time for the content
+ * @param uid unique identifier for the datum;
+ *        maybe 0 if no unique identifier is available
+ */
+static void
+process_dht_put_content (void *cls,
+                        const GNUNET_HashCode * key,
+                        size_t size,
+                        const void *data,
+                        enum GNUNET_BLOCK_Type type,
+                        uint32_t priority,
+                        uint32_t anonymity,
+                        struct GNUNET_TIME_Absolute
+                        expiration, uint64_t uid)
+{ 
+  static unsigned int counter;
+  static GNUNET_HashCode last_vhash;
+  static GNUNET_HashCode vhash;
+
+  if (key == NULL)
+    {
+      dht_qe = NULL;
+      consider_dht_put_gathering (cls);
+      return;
+    }
+  /* slightly funky code to estimate the total number of values with zero
+     anonymity from the maximum observed length of a monotonically increasing 
+     sequence of hashes over the contents */
+  GNUNET_CRYPTO_hash (data, size, &vhash);
+  if (GNUNET_CRYPTO_hash_cmp (&vhash, &last_vhash) <= 0)
+    {
+      if (zero_anonymity_count_estimate > 0)
+       zero_anonymity_count_estimate /= 2;
+      counter = 0;
+    }
+  last_vhash = vhash;
+  if (counter < 31)
+    counter++;
+  if (zero_anonymity_count_estimate < (1 << counter))
+    zero_anonymity_count_estimate = (1 << counter);
+#if DEBUG_FS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Retrieved block `%s' of type %u for DHT PUT\n",
+             GNUNET_h2s (key),
+             type);
+#endif
+  GNUNET_DHT_put (dht_handle,
+                 key,
+                 DEFAULT_PUT_REPLICATION,
+                 GNUNET_DHT_RO_NONE,
+                 type,
+                 size,
+                 data,
+                 expiration,
+                 GNUNET_TIME_UNIT_FOREVER_REL,
+                 &dht_put_continuation,
+                 cls);
+}
+
+
+
+/**
+ * Task that is run periodically to obtain blocks for DHT PUTs.
+ * 
+ * @param cls type of blocks to gather
+ * @param tc scheduler context (unused)
+ */
+static void
+gather_dht_put_blocks (void *cls,
+                      const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  dht_task = GNUNET_SCHEDULER_NO_TASK;
+  if (dsh != NULL)
+    {
+      if (dht_put_type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
+       dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
+      dht_qe = GNUNET_DATASTORE_get_zero_anonymity (dsh, 0, UINT_MAX,
+                                                   
GNUNET_TIME_UNIT_FOREVER_REL,
+                                                   dht_put_type++,
+                                                   &process_dht_put_content, 
NULL);
+      GNUNET_assert (dht_qe != NULL);
+    }
+}




reply via email to

[Prev in Thread] Current Thread [Next in Thread]