gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r17010 - gnunet/src/dht


From: gnunet
Subject: [GNUnet-SVN] r17010 - gnunet/src/dht
Date: Mon, 26 Sep 2011 00:16:21 +0200

Author: grothoff
Date: 2011-09-26 00:16:20 +0200 (Mon, 26 Sep 2011)
New Revision: 17010

Added:
   gnunet/src/dht/gnunet-service-dht.h
   gnunet/src/dht/gnunet-service-dht_datacache.c
   gnunet/src/dht/gnunet-service-dht_datacache.h
Modified:
   gnunet/src/dht/gnunet-service-dht-new.c
   gnunet/src/dht/gnunet-service-dht_clients.c
   gnunet/src/dht/gnunet-service-dht_clients.h
   gnunet/src/dht/gnunet-service-dht_neighbours.c
   gnunet/src/dht/gnunet-service-dht_neighbours.h
Log:
wild hxing

Modified: gnunet/src/dht/gnunet-service-dht-new.c
===================================================================
--- gnunet/src/dht/gnunet-service-dht-new.c     2011-09-24 07:16:15 UTC (rev 
17009)
+++ gnunet/src/dht/gnunet-service-dht-new.c     2011-09-25 22:16:20 UTC (rev 
17010)
@@ -391,11 +391,6 @@
 static unsigned int newly_found_peers;
 
 /**
- * Handle to the datacache service (for inserting/retrieving data)
- */
-static struct GNUNET_DATACACHE_Handle *datacache;
-
-/**
  * Handle for the statistics service.
  */
 struct GNUNET_STATISTICS_Handle *stats;
@@ -1260,132 +1255,8 @@
 }
 
 
-/**
- * Iterator for local get request results,
- *
- * @param cls closure for iterator, a DatacacheGetContext
- * @param exp when does this value expire?
- * @param key the key this data is stored under
- * @param size the size of the data identified by key
- * @param data the actual data
- * @param type the type of the data
- *
- * @return GNUNET_OK to continue iteration, anything else
- * to stop iteration.
- */
-static int
-datacache_get_iterator (void *cls, struct GNUNET_TIME_Absolute exp,
-                        const GNUNET_HashCode * key, size_t size,
-                        const char *data, enum GNUNET_BLOCK_Type type)
-{
-  struct DHT_MessageContext *msg_ctx = cls;
-  struct DHT_MessageContext new_msg_ctx;
-  struct GNUNET_DHT_GetResultMessage *get_result;
-  enum GNUNET_BLOCK_EvaluationResult eval;
-  const struct DHTPutEntry *put_entry;
-  int get_size;
-  char *path_offset;
 
-#if DEBUG_DHT
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "`%s:%s': Received `%s' response from datacache\n", my_short_id,
-              "DHT", "GET");
-#endif
 
-  put_entry = (const struct DHTPutEntry *) data;
-
-  if (size !=
-      sizeof (struct DHTPutEntry) + put_entry->data_size +
-      (put_entry->path_length * sizeof (struct GNUNET_PeerIdentity)))
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                "Path + data size doesn't add up for data inserted into 
datacache!\nData size %d, path length %d, expected %d, got %d\n",
-                put_entry->data_size, put_entry->path_length,
-                sizeof (struct DHTPutEntry) + put_entry->data_size +
-                (put_entry->path_length * sizeof (struct GNUNET_PeerIdentity)),
-                size);
-    msg_ctx->do_forward = GNUNET_NO;
-    return GNUNET_OK;
-  }
-
-  eval =
-      GNUNET_BLOCK_evaluate (block_context, type, key, &msg_ctx->reply_bf,
-                             msg_ctx->reply_bf_mutator, msg_ctx->xquery,
-                             msg_ctx->xquery_size, &put_entry[1],
-                             put_entry->data_size);
-
-  switch (eval)
-  {
-  case GNUNET_BLOCK_EVALUATION_OK_LAST:
-    msg_ctx->do_forward = GNUNET_NO;
-  case GNUNET_BLOCK_EVALUATION_OK_MORE:
-    memcpy (&new_msg_ctx, msg_ctx, sizeof (struct DHT_MessageContext));
-    if (GNUNET_DHT_RO_RECORD_ROUTE ==
-        (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE))
-    {
-      new_msg_ctx.msg_options = GNUNET_DHT_RO_RECORD_ROUTE;
-    }
-
-    get_size =
-        sizeof (struct GNUNET_DHT_GetResultMessage) + put_entry->data_size +
-        (put_entry->path_length * sizeof (struct GNUNET_PeerIdentity));
-    get_result = GNUNET_malloc (get_size);
-    get_result->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET_RESULT);
-    get_result->header.size = htons (get_size);
-    get_result->expiration = GNUNET_TIME_absolute_hton (exp);
-    get_result->type = htons (type);
-    get_result->put_path_length = htons (put_entry->path_length);
-    path_offset = (char *) &put_entry[1];
-    path_offset += put_entry->data_size;
-    /* Copy the actual data and the path_history to the end of the get result 
*/
-    memcpy (&get_result[1], &put_entry[1],
-            put_entry->data_size +
-            (put_entry->path_length * sizeof (struct GNUNET_PeerIdentity)));
-    new_msg_ctx.peer = my_identity;
-    new_msg_ctx.bloom = NULL;
-    new_msg_ctx.hop_count = 0;
-    new_msg_ctx.importance = DHT_DEFAULT_P2P_IMPORTANCE + 2;   /* Make result 
routing a higher priority */
-    new_msg_ctx.timeout = DHT_DEFAULT_P2P_TIMEOUT;
-    increment_stats (STAT_GET_RESPONSE_START);
-    route_result_message (&get_result->header, &new_msg_ctx);
-    GNUNET_free (get_result);
-    break;
-  case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
-#if DEBUG_DHT
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s:%s': Duplicate block error\n",
-                my_short_id, "DHT");
-#endif
-    break;
-  case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
-#if DEBUG_DHT
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "`%s:%s': Invalid request error\n",
-                my_short_id, "DHT");
-#endif
-    break;
-  case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
-#if DEBUG_DHT
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "`%s:%s': Valid request, no results.\n", my_short_id, "DHT");
-#endif
-    GNUNET_break (0);
-    break;
-  case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
-    GNUNET_break_op (0);
-    msg_ctx->do_forward = GNUNET_NO;
-    break;
-  case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
-#if DEBUG_DHT
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                "`%s:%s': Unsupported block type (%u) in response!\n",
-                my_short_id, "DHT", type);
-#endif
-    /* msg_ctx->do_forward = GNUNET_NO;  // not sure... */
-    break;
-  }
-  return GNUNET_OK;
-}
-
-
 /**
  * Main function that handles whether or not to route a message to other
  * peers.
@@ -1464,10 +1335,6 @@
   increment_stats (STAT_GETS);
   results = 0;
   msg_ctx->do_forward = GNUNET_YES;
-  if (datacache != NULL)
-    results =
-        GNUNET_DATACACHE_get (datacache, &msg_ctx->key, type,
-                              &datacache_get_iterator, msg_ctx);
 #if DEBUG_DHT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "`%s:%s': Found %d results for `%s' request uid %llu\n",
@@ -1826,37 +1693,7 @@
 #endif
 
   increment_stats (STAT_PUTS_INSERTED);
-  if (datacache != NULL)
-  {
-    /* Put size is actual data size plus struct overhead plus path length (if 
any) */
-    put_size =
-        data_size + sizeof (struct DHTPutEntry) +
-        (msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity));
-    put_entry = GNUNET_malloc (put_size);
-    put_entry->data_size = data_size;
-    put_entry->path_length = msg_ctx->path_history_len;
-    /* Copy data to end of put entry */
-    memcpy (&put_entry[1], &put_msg[1], data_size);
-    if (msg_ctx->path_history_len > 0)
-    {
-      /* Copy path after data */
-      path_offset = (char *) &put_entry[1];
-      path_offset += data_size;
-      memcpy (path_offset, msg_ctx->path_history,
-              msg_ctx->path_history_len * sizeof (struct GNUNET_PeerIdentity));
-    }
 
-    ret =
-        GNUNET_DATACACHE_put (datacache, &msg_ctx->key, put_size,
-                              (const char *) put_entry, put_type,
-                              GNUNET_TIME_absolute_ntoh (put_msg->expiration));
-    GNUNET_free (put_entry);
-  }
-  else
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "`%s:%s': %s request received, but have no datacache!\n",
-                my_short_id, "DHT", "PUT");
-
   route_message (msg, msg_ctx);
 }
 
@@ -2366,6 +2203,7 @@
     transport_handle = NULL;
   }
   GDS_NEIGHBOURS_done ();
+  GDS_DATACACHE_done ();
   GDS_NSE_done ();
   for (bucket_count = lowest_bucket; bucket_count < MAX_BUCKETS; 
bucket_count++)
   {
@@ -2380,15 +2218,6 @@
       delete_peer (pos, bucket_count);
     }
   }
-  if (datacache != NULL)
-  {
-#if DEBUG_DHT
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%s:%s Destroying datacache!\n",
-                my_short_id, "DHT");
-#endif
-    GNUNET_DATACACHE_destroy (datacache);
-    datacache = NULL;
-  }
   if (stats != NULL)
   {
     GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
@@ -2418,7 +2247,7 @@
   unsigned long long temp_config_num;
 
   cfg = c;
-  datacache = GNUNET_DATACACHE_create (cfg, "dhtcache");
+  GDS_DATACACHE_init ();
   coreAPI = GNUNET_CORE_connect (cfg,   /* Main configuration */
                                  DEFAULT_CORE_QUEUE_SIZE,       /* queue size 
*/
                                  NULL,  /* Closure passed to DHT functions */

Added: gnunet/src/dht/gnunet-service-dht.h
===================================================================
--- gnunet/src/dht/gnunet-service-dht.h                         (rev 0)
+++ gnunet/src/dht/gnunet-service-dht.h 2011-09-25 22:16:20 UTC (rev 17010)
@@ -0,0 +1,43 @@
+/*
+     This file is part of GNUnet.
+     (C) 2011 Christian Grothoff (and other contributing authors)
+
+     GNUnet is free software; you can redistribute it and/or modify
+     it under the terms of the GNU General Public License as published
+     by the Free Software Foundation; either version 3, or (at your
+     option) any later version.
+
+     GNUnet is distributed in the hope that it will be useful, but
+     WITHOUT ANY WARRANTY; without even the implied warranty of
+     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+     General Public License for more details.
+
+     You should have received a copy of the GNU General Public License
+     along with GNUnet; see the file COPYING.  If not, write to the
+     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+     Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @file dht/gnunet-service-dht.h
+ * @brief GNUnet DHT globals
+ * @author Christian Grothoff
+ */
+#ifndef GNUNET_SERVICE_DHT_H
+#define GNUNET_SERVICE_DHT_H
+
+#include "gnunet_util_lib.h"
+
+/**
+ * Configuration we use.  Declared with GNUnet's configuration handle
+ * type so that it can be handed to GNUNET_DATACACHE_create().
+ */
+extern const struct GNUNET_CONFIGURATION_Handle *GDS_cfg;
+
+
+/**
+ * Our handle to the BLOCK library (used to evaluate cached blocks
+ * against GET requests).
+ */
+extern struct GNUNET_BLOCK_Context *GDS_block_context;
+
+
+#endif

Modified: gnunet/src/dht/gnunet-service-dht_clients.c
===================================================================
--- gnunet/src/dht/gnunet-service-dht_clients.c 2011-09-24 07:16:15 UTC (rev 
17009)
+++ gnunet/src/dht/gnunet-service-dht_clients.c 2011-09-25 22:16:20 UTC (rev 
17010)
@@ -39,6 +39,7 @@
 #include "dht_new.h"
 #include <fenv.h>
 #include "gnunet-service-dht_clients.h"
+#include "gnunet-service-dht_datacache.h"
 #include "gnunet-service-dht_neighbours.h"
 
 
@@ -403,6 +404,22 @@
       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
     }
   dht_msg = (const struct GNUNET_DHT_ClientPutMessage *) message;
+  /* give to local clients */
+  GDS_CLIENT_handle_reply (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
+                          &dht_msg->key,
+                          0, NULL,
+                          0, NULL,
+                          ntohl (dht_msg->type),
+                          size - sizeof (struct GNUNET_DHT_ClientPutMessage),
+                          &dht_msg[1]);
+  /* store locally (no put path is recorded for this call) */
+  GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
+                           &dht_msg->key,
+                           0, NULL,
+                           ntohl (dht_msg->type),
+                           size - sizeof (struct GNUNET_DHT_ClientPutMessage),
+                           &dht_msg[1]);
+  /* route to other peers */
   GST_NEIGHBOURS_handle_put (ntohl (dht_msg->type),
                             ntohl (dht_msg->options),
                             ntohl (dht_msg->desired_replication_level),
@@ -446,6 +463,7 @@
   get = (const struct GNUNET_DHT_ClientGetMessage *) message;
   xquery = (const char*) &get[1];
 
+  
   cqr = GNUNET_malloc (sizeof (struct ClientQueryRecord) + xquery_size);
   cqr->key = get->key;
   cqr->client = find_active_client (client);
@@ -458,12 +476,19 @@
   cqr->xquery_size = xquery_size;
   cqr->replication = ntohl (get->desired_replication_level);
   cqr->msg_options = ntohl (get->options);
-  cqr->msg_type = ntohl (get->type);
+  cqr->msg_type = ntohl (get->type);  
   GNUNET_CONTAINER_multihashmap_put (forward_map, KEY, cqr,
                                     
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+  /* start remote requests */
   if (GNUNET_SCHEDULER_NO_TASK != retry_task)
     GNUNET_SCHEDULER_cancel (retry_task);
   retry_task = GNUNET_SCHEDULER_add_now (&transmit_next_request_task, NULL);
+  /* perform local lookup */
+  GDS_DATACACHE_handle_get (&get->key,
+                           cqr->msg_type,
+                           cqr->xquery,
+                           xquery_size,
+                           NULL, 0);
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
 

Modified: gnunet/src/dht/gnunet-service-dht_clients.h
===================================================================
--- gnunet/src/dht/gnunet-service-dht_clients.h 2011-09-24 07:16:15 UTC (rev 
17009)
+++ gnunet/src/dht/gnunet-service-dht_clients.h 2011-09-25 22:16:20 UTC (rev 
17010)
@@ -27,6 +27,7 @@
 #ifndef GNUNET_SERVICE_DHT_CLIENTS_H
 #define GNUNET_SERVICE_DHT_CLIENTS_H
 
+
 /**
  * Handle a reply we've received from another peer.  If the reply
  * matches any of our pending queries, forward it to the respective

Added: gnunet/src/dht/gnunet-service-dht_datacache.c
===================================================================
--- gnunet/src/dht/gnunet-service-dht_datacache.c                               
(rev 0)
+++ gnunet/src/dht/gnunet-service-dht_datacache.c       2011-09-25 22:16:20 UTC 
(rev 17010)
@@ -0,0 +1,315 @@
+/*
+     This file is part of GNUnet.
+     (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
+
+     GNUnet is free software; you can redistribute it and/or modify
+     it under the terms of the GNU General Public License as published
+     by the Free Software Foundation; either version 3, or (at your
+     option) any later version.
+
+     GNUnet is distributed in the hope that it will be useful, but
+     WITHOUT ANY WARRANTY; without even the implied warranty of
+     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+     General Public License for more details.
+
+     You should have received a copy of the GNU General Public License
+     along with GNUnet; see the file COPYING.  If not, write to the
+     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+     Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @file dht/gnunet-service-dht_datacache.c
+ * @brief GNUnet DHT service's datacache integration
+ * @author Christian Grothoff
+ * @author Nathan Evans
+ */
+#include "gnunet-service-dht.h"
+#include "gnunet-service-dht_datacache.h"
+
+
+/**
+ * Handle to the datacache service (for inserting/retrieving data)
+ */
+static struct GNUNET_DATACACHE_Handle *datacache;
+
+
+/**
+ * Header of each entry the DHT stores in the datacache.  The put path
+ * and then the payload follow this header directly in the same buffer.
+ */
+struct DHTPutEntry
+{
+  /**
+   * Size of the payload in bytes (stored in network byte order).
+   */
+  uint16_t data_size;
+
+  /**
+   * Number of peers in the recorded put path (stored in network byte order).
+   */
+  uint16_t path_length;
+
+  /* 'path_length' entries of type 'struct GNUNET_PeerIdentity' follow */
+
+  /* then 'data_size' bytes of PUT data */
+
+};
+
+
+/**
+ * Handle a datum we've received from another peer.  Cache if
+ * possible.
+ *
+ * The datum is stored as a 'struct DHTPutEntry' header followed by
+ * the put path and then the payload.  Nothing is stored if there is
+ * no datacache or if the resulting entry would be too large.
+ *
+ * @param expiration when will the reply expire
+ * @param key the query this reply is for
+ * @param put_path_length number of peers in 'put_path'
+ * @param put_path path the reply took on put (may be NULL if
+ *        'put_path_length' is 0)
+ * @param type type of the reply
+ * @param data_size number of bytes in 'data'
+ * @param data application payload data
+ */
+void
+GDS_DATACACHE_handle_put (struct GNUNET_TIME_Absolute expiration,
+                         const GNUNET_HashCode *key,
+                         unsigned int put_path_length,
+                         const struct GNUNET_PeerIdentity *put_path,
+                         uint32_t type,
+                         size_t data_size,
+                         const void *data)
+{
+  size_t plen;
+
+  if (datacache == NULL)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                 "%s request received, but have no datacache!\n",
+                 "PUT");
+      return;
+    }
+  /* Bound both inputs individually before adding them up, so that the
+     sum below cannot wrap around.  NOTE(review): assuming the message
+     size limit is at most 64k, this also guarantees that both lengths
+     fit into the 16-bit header fields -- confirm. */
+  if ( (data_size >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
+       (put_path_length >=
+        GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) )
+    {
+      GNUNET_break (0);
+      return;
+    }
+  /* Put size is actual data size plus struct overhead plus path length (if any) */
+  plen = sizeof (struct DHTPutEntry) +
+    put_path_length * sizeof (struct GNUNET_PeerIdentity) + data_size;
+  if (plen >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
+    {
+      GNUNET_break (0);
+      return;
+    }
+  {
+    /* the stack buffer is only sized once 'plen' has been validated */
+    char buf[plen];
+    struct DHTPutEntry *pe;
+    struct GNUNET_PeerIdentity *pp;
+
+    pe = (struct DHTPutEntry *) buf;
+    pe->data_size = htons ((uint16_t) data_size);
+    pe->path_length = htons ((uint16_t) put_path_length);
+    pp = (struct GNUNET_PeerIdentity *) &pe[1];
+    /* callers pass a NULL path together with length 0; skip the copy then */
+    if (put_path_length > 0)
+      memcpy (pp, put_path,
+              put_path_length * sizeof (struct GNUNET_PeerIdentity));
+    memcpy (&pp[put_path_length], data, data_size);
+    (void) GNUNET_DATACACHE_put (datacache, key,
+                                plen, (const char *) pe, type,
+                                expiration);
+  }
+}
+
+
+/**
+ * Context containing information about a GET request.  Filled in by
+ * GDS_DATACACHE_handle_get() and passed as the closure to
+ * datacache_get_iterator() for the duration of one datacache lookup.
+ */
+struct GetRequestContext
+{
+  /**
+   * extended query (see gnunet_block_lib.h).
+   */
+  const void *xquery;
+
+  /**
+   * Bloomfilter to filter out duplicate replies (updated); the
+   * pointer itself may be NULL if the caller has no bloomfilter.
+   */
+  struct GNUNET_CONTAINER_BloomFilter **reply_bf;
+
+  /**
+   * The key this request was about
+   */
+  GNUNET_HashCode key;
+
+  /**
+   * Number of bytes in xquery.
+   */
+  size_t xquery_size;
+
+  /**
+   * Mutator value for the reply_bf, see gnunet_block_lib.h
+   */
+  uint32_t reply_bf_mutator;
+
+};
+
+
+/**
+ * Iterator for local get request results.  Unpacks the stored
+ * 'struct DHTPutEntry', evaluates the payload against the request and
+ * hands matching results to the local clients and to the neighbours
+ * subsystem.
+ *
+ * @param cls closure for iterator, a 'struct GetRequestContext'
+ * @param exp when does this value expire?
+ * @param key the key this data is stored under
+ * @param size the size of the data identified by key
+ * @param data the actual data (header, put path and payload)
+ * @param type the type of the data
+ *
+ * @return GNUNET_OK to continue iteration, anything else
+ * to stop iteration.
+ */
+static int
+datacache_get_iterator (void *cls, struct GNUNET_TIME_Absolute exp,
+                        const GNUNET_HashCode * key, size_t size,
+                        const char *data, enum GNUNET_BLOCK_Type type)
+{
+  struct GetRequestContext *ctx = cls;
+  const struct DHTPutEntry *pe;
+  const struct GNUNET_PeerIdentity *pp;
+  const char *rdata;
+  size_t data_size;
+  uint16_t put_path_length;
+  enum GNUNET_BLOCK_EvaluationResult eval;
+
+  /* the header must be fully present before its fields may be read */
+  if (size < sizeof (struct DHTPutEntry))
+  {
+    GNUNET_break (0);
+    return GNUNET_OK;
+  }
+  pe = (const struct DHTPutEntry *) data;
+  put_path_length = ntohs (pe->path_length);
+  data_size = ntohs (pe->data_size);
+
+  if (size !=
+      sizeof (struct DHTPutEntry) + data_size +
+      (put_path_length * sizeof (struct GNUNET_PeerIdentity)))
+  {
+    /* malformed entry: skip it, but keep iterating over the others */
+    GNUNET_break (0);
+    return GNUNET_OK;
+  }
+  /* layout: header, then the put path, then the payload */
+  pp = (const struct GNUNET_PeerIdentity *) &pe[1];
+  rdata = (const char *) &pp[put_path_length];
+  eval =
+      GNUNET_BLOCK_evaluate (GDS_block_context, type, key,
+                             ctx->reply_bf,
+                             ctx->reply_bf_mutator,
+                             ctx->xquery,
+                             ctx->xquery_size,
+                             rdata,
+                             data_size);
+  switch (eval)
+  {
+  case GNUNET_BLOCK_EVALUATION_OK_LAST:
+  case GNUNET_BLOCK_EVALUATION_OK_MORE:
+    /* forward to local clients */
+    GDS_CLIENT_handle_reply (exp,
+                             key,
+                             0, NULL,
+                             put_path_length, pp,
+                             type, data_size, rdata);
+    /* forward to other peers */
+    GDS_NEIGHBOURS_handle_reply (type, exp,
+                                 key, put_path_length, pp,
+                                 0, NULL, rdata, data_size);
+    break;
+  case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
+    break;
+  case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
+    break;
+  case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
+    GNUNET_break (0);
+    break;
+  case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
+    /* the request itself is bad, no point in looking at more entries */
+    GNUNET_break_op (0);
+    return GNUNET_SYSERR;
+  case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                "Unsupported block type (%u) in local response!\n",
+                type);
+    break;
+  }
+  return GNUNET_OK;
+}
+
+
+/**
+ * Handle a GET request: look up 'key' in the local datacache and run
+ * 'datacache_get_iterator' on every stored entry.  Does nothing if
+ * there is no datacache.
+ *
+ * @param key the query 
+ * @param type requested data type
+ * @param xquery extended query
+ * @param xquery_size number of bytes in xquery
+ * @param reply_bf where the reply bf is (to be) stored, possibly updated,
+ *        can be NULL
+ * @param reply_bf_mutator mutation value for reply_bf
+ */
+void
+GDS_DATACACHE_handle_get (const GNUNET_HashCode *key,
+                         uint32_t type,
+                         const void *xquery,
+                         size_t xquery_size,
+                         struct GNUNET_CONTAINER_BloomFilter **reply_bf,
+                         uint32_t reply_bf_mutator)
+{
+  /* 'struct GetRequestContext' is defined once, ahead of the iterator */
+  struct GetRequestContext ctx;
+
+  if (datacache == NULL)
+    return;
+  ctx.key = *key;
+  ctx.xquery = xquery;
+  ctx.xquery_size = xquery_size;
+  ctx.reply_bf = reply_bf;
+  ctx.reply_bf_mutator = reply_bf_mutator;
+  (void) GNUNET_DATACACHE_get (datacache, key, type,
+                              &datacache_get_iterator, &ctx);
+}
+
+
+/**
+ * Initialize datacache subsystem: creates the "dhtcache" datacache
+ * from the service-wide configuration.
+ */
+void
+GDS_DATACACHE_init (void)
+{
+  datacache = GNUNET_DATACACHE_create (GDS_cfg, "dhtcache");
+}
+
+
+/**
+ * Shutdown datacache subsystem: destroys the datacache handle, if
+ * there is one, and resets it to NULL.
+ */
+void
+GDS_DATACACHE_done (void)
+{
+  if (datacache == NULL)
+    return;
+  GNUNET_DATACACHE_destroy (datacache);
+  datacache = NULL;
+}
+
+
+/* end of gnunet-service-dht_datacache.c */

Added: gnunet/src/dht/gnunet-service-dht_datacache.h
===================================================================
--- gnunet/src/dht/gnunet-service-dht_datacache.h                               
(rev 0)
+++ gnunet/src/dht/gnunet-service-dht_datacache.h       2011-09-25 22:16:20 UTC 
(rev 17010)
@@ -0,0 +1,84 @@
+/*
+     This file is part of GNUnet.
+     (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
+
+     GNUnet is free software; you can redistribute it and/or modify
+     it under the terms of the GNU General Public License as published
+     by the Free Software Foundation; either version 3, or (at your
+     option) any later version.
+
+     GNUnet is distributed in the hope that it will be useful, but
+     WITHOUT ANY WARRANTY; without even the implied warranty of
+     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+     General Public License for more details.
+
+     You should have received a copy of the GNU General Public License
+     along with GNUnet; see the file COPYING.  If not, write to the
+     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+     Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @file dht/gnunet-service-dht_datacache.h
+ * @brief GNUnet DHT service's datacache integration
+ * @author Christian Grothoff
+ * @author Nathan Evans
+ */
+#ifndef GNUNET_SERVICE_DHT_DATACACHE_H
+#define GNUNET_SERVICE_DHT_DATACACHE_H
+
+/**
+ * Handle a datum we've received from another peer.  Cache if
+ * possible.  Only logs a warning if no datacache is available.
+ *
+ * @param expiration when will the reply expire
+ * @param key the query this reply is for
+ * @param put_path_length number of peers in 'put_path'
+ * @param put_path path the reply took on put
+ * @param type type of the reply
+ * @param data_size number of bytes in 'data', must be smaller than
+ *        GNUNET_SERVER_MAX_MESSAGE_SIZE
+ * @param data application payload data
+ */
+void
+GDS_DATACACHE_handle_put (struct GNUNET_TIME_Absolute expiration,
+                         const GNUNET_HashCode *key,
+                         unsigned int put_path_length,
+                         const struct GNUNET_PeerIdentity *put_path,
+                         uint32_t type,
+                         size_t data_size,
+                         const void *data);
+
+
+/**
+ * Handle a GET request (from another peer or from a local client) by
+ * looking for matching entries in the local datacache.
+ *
+ * @param key the query 
+ * @param type requested data type
+ * @param xquery extended query
+ * @param xquery_size number of bytes in xquery
+ * @param reply_bf where the reply bf is (to be) stored, possibly updated!,
+ *        can be NULL
+ * @param reply_bf_mutator mutation value for reply_bf
+ */
+void
+GDS_DATACACHE_handle_get (const GNUNET_HashCode *key,
+                         uint32_t type,
+                         const void *xquery,
+                         size_t xquery_size,
+                         struct GNUNET_CONTAINER_BloomFilter **reply_bf,
+                         uint32_t reply_bf_mutator);
+
+
+/**
+ * Initialize datacache subsystem (creates the datacache handle used
+ * by the PUT and GET handlers).
+ */
+void 
+GDS_DATACACHE_init (void);
+
+
+/**
+ * Shutdown datacache subsystem (destroys the datacache handle if one
+ * was created).
+ */
+void
+GDS_DATACACHE_done (void);
+
+#endif

Modified: gnunet/src/dht/gnunet-service-dht_neighbours.c
===================================================================
--- gnunet/src/dht/gnunet-service-dht_neighbours.c      2011-09-24 07:16:15 UTC 
(rev 17009)
+++ gnunet/src/dht/gnunet-service-dht_neighbours.c      2011-09-25 22:16:20 UTC 
(rev 17010)
@@ -37,6 +37,7 @@
 #include "gnunet_dht_service.h"
 #include "gnunet_statistics_service.h"
 #include "dht.h"
+#include "gnunet-service-dht_datacache.h"
 #include <fenv.h>
 
 /**
@@ -86,13 +87,16 @@
   uint32_t desired_replication_level GNUNET_PACKED;
 
   /**
-   * Generic route path length for a message in the
-   * DHT that arrived at a peer and generated
-   * a reply. Copied to the end of this message.
+   * Length of the PUT path that follows (if tracked).
    */
-  uint32_t outgoing_path_length GNUNET_PACKED;
+  uint32_t put_path_length GNUNET_PACKED;
 
   /**
+   * When does the content expire?
+   */
+  struct GNUNET_TIME_AbsoluteNBO expiration_time;
+
+  /**
    * Bloomfilter (for peer identities) to stop circular routes
    */
   char bloomfilter[DHT_BLOOM_SIZE];
@@ -110,12 +114,51 @@
 
 
 /**
+ * P2P Result message: fixed header; put path, get path and payload follow.
+ */
+struct PeerResultMessage
+{
+  /**
+   * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Content type.
+   */
+  uint32_t type GNUNET_PACKED;
+
+  /**
+   * Length of the PUT path that follows (if tracked); unit TODO confirm.
+   */
+  uint32_t put_path_length GNUNET_PACKED;
+
+  /**
+   * Length of the GET path that follows (if tracked); unit TODO confirm.
+   */
+  uint32_t get_path_length GNUNET_PACKED;
+
+  /**
+   * The key of the corresponding GET request.
+   */
+  GNUNET_HashCode key;
+
+  /* put path (if tracked), directly after this struct */
+
+  /* get path (if tracked), after the put path */
+
+  /* Payload, after both paths */
+
+};
+
+
+/**
  * P2P GET message
  */
 struct PeerGetMessage
 {
   /**
-   * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_PUT
+   * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_GET
    */
   struct GNUNET_MessageHeader header;
 
@@ -182,21 +225,16 @@
   struct P2PPendingMessage *prev;
 
   /**
-   * Message importance level.
+   * Message importance level.  FIXME: used? useful?
    */
   unsigned int importance;
 
   /**
-   * Time when this request was scheduled to be sent.
+   * When does this message time out?
    */
-  struct GNUNET_TIME_Absolute scheduled;
+  struct GNUNET_TIME_Absolute timeout;
 
   /**
-   * How long to wait before sending message.
-   */
-  struct GNUNET_TIME_Relative timeout;
-
-  /**
    * Actual message to be sent, allocated at the end of the struct:
    * // msg = (cast) &pm[1]; 
    * // memcpy (&pm[1], data, len);
@@ -222,7 +260,8 @@
   struct PeerInfo *prev;
 
   /**
-   * Count of outstanding messages for peer.
+   * Count of outstanding messages for peer.  FIXME: NEEDED?
+   * FIXME: bound queue size!?
    */
   unsigned int pending_count;
 
@@ -467,13 +506,340 @@
 
 
 /**
- * Perform a PUT operation.  // FIXME: document if this is only
- * routing or also storage and/or even local client notification!
+ * Called when core is ready to send a message we asked for
+ * out to the destination.  Copies as many queued messages as fit
+ * into the buffer (in queue order) and, if messages remain queued,
+ * asks core for another transmission slot.
  *
+ * @param cls the 'struct PeerInfo' of the target peer
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message,
+ *        NULL if core could not provide a buffer
+ * @return number of bytes written to buf
+ */
+static size_t
+core_transmit_notify (void *cls, size_t size, void *buf)
+{
+  struct PeerInfo *peer = cls;
+  char *cbuf = buf;
+  struct P2PPendingMessage *pending;
+  size_t off;
+  size_t msize;
+
+  /* the transmission handle is used up once this callback runs */
+  peer->th = NULL;
+  if (NULL == buf)
+  {
+    /* no buffer available (NOTE(review): presumably a timeout or the
+       peer went away -- confirm); queued messages are left in place */
+    return 0;
+  }
+  off = 0;
+  msize = 0;
+  while (NULL != (pending = peer->head))
+  {
+    msize = ntohs (pending->msg->size);
+    if (size - off < msize)
+      break;                    /* next message does not fit anymore */
+    memcpy (&cbuf[off], pending->msg, msize);
+    off += msize;
+    peer->pending_count--;
+    GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, pending);
+    GNUNET_free (pending);
+  }
+  if (NULL != pending)
+  {
+    /* 'pending' is the new head of the queue and 'msize' its size.
+       Core takes a relative delay, the queue stores an absolute
+       deadline, so convert instead of passing the deadline as-is. */
+    peer->th
+      = GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES,
+                                           pending->importance,
+                                           GNUNET_TIME_absolute_get_remaining (pending->timeout),
+                                           &peer->id, msize,
+                                           &core_transmit_notify, peer);
+  }
+  return off;
+}
+
+
+/**
+ * Ask core for a transmission slot for the first message in the
+ * peer's queue.  Does nothing if the queue is empty or if a
+ * transmission request for this peer is already pending; the
+ * remaining messages are sent from 'core_transmit_notify'.
+ *
+ * @param peer message queue to process
+ */
+static void
+process_peer_queue (struct PeerInfo *peer)
+{
+  struct P2PPendingMessage *pending;
+
+  /* nothing queued: must bail out here, 'pending' is dereferenced below */
+  if (NULL == (pending = peer->head))
+    return;
+  /* a request is already outstanding with core */
+  if (NULL != peer->th)
+    return;
+  /* core expects a relative delay; the queue stores an absolute deadline */
+  peer->th
+    = GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES,
+                                         pending->importance,
+                                         GNUNET_TIME_absolute_get_remaining (pending->timeout),
+                                         &peer->id,
+                                         ntohs (pending->msg->size),
+                                         &core_transmit_notify, peer);
+}
+
+
+/**
+ * To how many peers should we (on average) forward the request to
+ * obtain the desired target_replication count (on average).
+ *
+ * The result is the floor of the computed target value, plus one
+ * with a probability equal to its fractional part.
+ *
+ * FIXME: double-check that this is fine
+ *
+ * @param hop_count number of hops the message has traversed
+ * @param target_replication the number of total paths desired
+ *        (capped at 16; values below 2 always yield 1)
+ * @return Some number of peers to forward the message to
+ */
+static unsigned int
+get_forward_count (uint32_t hop_count,
+                   uint32_t target_replication)
+{
+  uint32_t random_value;
+  uint32_t forward_count;
+  float target_value;
+  double extra;
+  double denominator;
+
+  /* bound by system-wide maximum */
+  target_replication = GNUNET_MIN (16 /* FIXME: use named constant */,
+                                   target_replication);
+  if (hop_count > log_of_network_size_estimate * 2.0)
+  {
+    /* Once we have reached our ideal number of hops, only forward to 1 peer */
+    return 1;
+  }
+  if (target_replication <= 1)
+  {
+    /* nothing to replicate beyond the single path; this also keeps a
+       replication level of 0 from producing a negative numerator */
+    return 1;
+  }
+  extra = target_replication - 1.0;
+  denominator = log_of_network_size_estimate + ((float) extra * hop_count);
+  if (denominator <= 0.0)
+  {
+    /* dividing would yield inf/NaN, and converting that to an integer
+       is undefined; fall back to the (capped) requested replication */
+    return target_replication;
+  }
+  target_value = 1 + extra / denominator;
+  /* Set forward count to floor of target_value */
+  forward_count = (uint32_t) target_value;
+  /* Subtract forward_count (floor) from target_value (yields value between 0 and 1) */
+  target_value = target_value - forward_count;
+  random_value =
+    GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_STRONG, UINT32_MAX);
+  if (random_value < (target_value * UINT32_MAX))
+    forward_count++;
+  return forward_count;
+}
+
+
+/**
+ * Check whether my identity is closer than any known peers.  If a
+ * non-null bloomfilter is given, check if this is the closest peer
+ * that hasn't already been routed to.
+ * FIXME: needed?
+ *
+ * Only the bucket that 'find_current_bucket' returns for the key is
+ * inspected (at most 'bucket_size' entries of it).  Closeness is
+ * compared via GNUNET_CRYPTO_hash_matching_bits; the first unfiltered
+ * peer that matches exactly as many bits as we do ends the search
+ * with GNUNET_YES.
+ *
+ * @param key hash code to check closeness to
+ * @param bloom bloomfilter, exclude these entries from the decision
+ *        (may be NULL for "exclude nothing")
+ * @return GNUNET_YES if node location is closest,
+ *         GNUNET_NO otherwise.
+ */
+static int
+am_closest_peer (const GNUNET_HashCode *key,
+                 const struct GNUNET_CONTAINER_BloomFilter *bloom)
+{
+  int bits;
+  int other_bits;
+  int bucket_num;
+  int count;
+  struct PeerInfo *pos;
+
+  if (0 == memcmp (&my_identity.hashPubKey, key, sizeof (GNUNET_HashCode)))
+    return GNUNET_YES;
+  bucket_num = find_current_bucket (key);
+  bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, key);
+  count = 0;
+  for (pos = k_buckets[bucket_num].head;
+       (NULL != pos) && (count < bucket_size);
+       pos = pos->next)
+  {
+    /* count every visited entry so the bucket_size bound takes effect */
+    count++;
+    if ((NULL != bloom) &&
+        (GNUNET_YES ==
+         GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey)))
+      continue;                 /* Skip already checked entries */
+    other_bits = GNUNET_CRYPTO_hash_matching_bits (&pos->id.hashPubKey, key);
+    if (other_bits > bits)
+      return GNUNET_NO;
+    if (other_bits == bits)        /* We match the same number of bits */
+      return GNUNET_YES;
+  }
+  /* No peers closer, we are the closest! */
+  return GNUNET_YES;
+}
+
+
+/**
+ * Select a peer from the routing table that would be a good routing
+ * destination for sending a message for "key".  The resulting peer
+ * must not be in the set of blocked peers.<p>
+ *
+ * Note that we should not ALWAYS select the closest peer to the
+ * target, peers further away from the target should be chosen with
+ * exponentially declining probability.
+ *
+ * Once 'hops' has reached the network size estimate the choice is
+ * greedy (largest 'inverse_distance' to the key); before that a peer
+ * is drawn uniformly at random.  In both modes at most 'bucket_size'
+ * entries per bucket are looked at and peers found in 'bloom' are
+ * never returned.
+ *
+ * FIXME: double-check that this is fine
+ *
+ * NOTE(review): 'bloom' is handed to GNUNET_CONTAINER_bloomfilter_test
+ * unchecked, whereas am_closest_peer tolerates a NULL filter -- confirm
+ * that callers never pass NULL here.
+ *
+ * @param key the key we are selecting a peer to route to
+ * @param bloom a bloomfilter containing entries this request has seen already
+ * @param hops how many hops has this message traversed thus far
+ * @return Peer to route to, or NULL on error
+ */
+static struct PeerInfo *
+select_peer (const GNUNET_HashCode *key,
+             const struct GNUNET_CONTAINER_BloomFilter *bloom,
+             uint32_t hops)
+{
+  unsigned int bc;
+  unsigned int count;
+  unsigned int eligible;
+  unsigned int selected;
+  unsigned int dist;
+  unsigned int largest_distance;
+  struct PeerInfo *pos;
+  struct PeerInfo *chosen;
+
+  if (hops >= log_of_network_size_estimate)
+  {
+    /* greedy selection (closest peer that is not in bloomfilter) */
+    largest_distance = 0;
+    chosen = NULL;
+    for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
+    {
+      count = 0;
+      for (pos = k_buckets[bc].head;
+           (NULL != pos) && (count < bucket_size);
+           pos = pos->next)
+      {
+        count++;
+        /* If we are doing strict Kademlia routing, then checking the bloomfilter is basically cheating! */
+        if (GNUNET_NO !=
+            GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
+          continue;
+        dist = inverse_distance (key, &pos->id.hashPubKey);
+        if (dist > largest_distance)
+        {
+          chosen = pos;
+          largest_distance = dist;
+        }
+      }
+    }
+    return chosen;
+  }
+
+  /* select "random" peer */
+  /* first pass: count peers that are available and not filtered;
+     'count' bounds the walk within one bucket, 'eligible' is the
+     tally across all buckets */
+  eligible = 0;
+  for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
+  {
+    count = 0;
+    for (pos = k_buckets[bc].head;
+         (NULL != pos) && (count < bucket_size);
+         pos = pos->next)
+    {
+      count++;
+      if (GNUNET_YES ==
+          GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
+        continue;               /* Ignore bloomfiltered peers */
+      eligible++;
+    }
+  }
+  if (0 == eligible)            /* No peers to select from! */
+    return NULL;
+  /* second pass: identical walk, stop at the randomly drawn index */
+  selected = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, eligible);
+  for (bc = lowest_bucket; bc < MAX_BUCKETS; bc++)
+  {
+    count = 0;
+    for (pos = k_buckets[bc].head;
+         (NULL != pos) && (count < bucket_size);
+         pos = pos->next)
+    {
+      count++;
+      if (GNUNET_YES ==
+          GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))
+        continue;               /* Ignore bloomfiltered peers */
+      if (0 == selected--)
+        return pos;
+    }
+  }
+  /* both passes visit the same peers, so we cannot get here */
+  GNUNET_break (0);
+  return NULL;
+}
+
+
+/**
+ * Determine which peers a request should be forwarded to.  The
+ * number of peers to pick comes from 'get_forward_count'; the peers
+ * themselves are picked one at a time by 'select_peer'.
+ *
+ * @param key routing key
+ * @param bloom bloom filter excluding peers as targets, all selected
+ *        peers will be added to the bloom filter
+ * @param hop_count number of hops the request has traversed so far
+ * @param target_replication desired number of replicas
+ * @param targets where to store an array of target peers (to be
+ *         free'd by the caller); set to NULL if no peer was selected
+ * @return number of peers returned in 'targets'.
+ */
+static unsigned int
+get_target_peers (const GNUNET_HashCode *key,
+                  struct GNUNET_CONTAINER_BloomFilter *bloom,
+                  uint32_t hop_count,
+                  uint32_t target_replication,
+                  struct PeerInfo ***targets)
+{
+  unsigned int wanted;
+  unsigned int found;
+  struct PeerInfo **selection;
+  struct PeerInfo *candidate;
+
+  *targets = NULL;
+  wanted = get_forward_count (hop_count, target_replication);
+  if (0 == wanted)
+    return 0;
+  selection = GNUNET_malloc (sizeof (struct PeerInfo*) * wanted);
+  for (found = 0; found < wanted; found++)
+  {
+    candidate = select_peer (key, bloom, hop_count);
+    if (NULL == candidate)
+      break;                    /* routing table has nothing more to offer */
+    selection[found] = candidate;
+    /* record the choice in the filter before selecting the next peer */
+    GNUNET_CONTAINER_bloomfilter_add (bloom, &candidate->id.hashPubKey);
+  }
+  if (0 == found)
+  {
+    GNUNET_free (selection);
+    return 0;
+  }
+  *targets = selection;
+  return found;
+}
+
+
+/**
+ * Perform a PUT operation.   Forwards the given request to other
+ * peers.   Does not store the data locally.  Does not give the
+ * data to local clients.  May do nothing if this is the only
+ * peer in the network (or if we are the closest peer in the
+ * network).
+ *
  * @param type type of the block
  * @param options routing options
  * @param desired_replication_level desired replication count
  * @param expiration_time when does the content expire
+ * @param hop_count how many hops has this message traversed so far
+ * @param bf Bloom filter of peers this PUT has already traversed
  * @param key key for the content
  * @param put_path_length number of entries in put_path
  * @param put_path peers this request has traversed so far (if tracked)
@@ -481,27 +847,87 @@
  * @param data_size number of bytes in data
  */
 void
-GST_NEIGHBOURS_handle_put (uint32_t type,
+GDS_NEIGHBOURS_handle_put (uint32_t type,
                            uint32_t options,
                            uint32_t desired_replication_level,
                            GNUNET_TIME_Absolute expiration_time,
+                          uint32_t hop_count,
+                          struct GNUNET_CONTAINER_BloomFilter *bf,
                            const GNUNET_HashCode *key,
                            unsigned int put_path_length,
                            struct GNUNET_PeerIdentity *put_path,
                            const void *data,
                            size_t data_size)
 {
-  // FIXME
+  unsigned int target_count;
+  unsigned int i;
+  struct PeerInfo **targets;
+  struct PeerInfo *target;
+  struct P2PPendingMessage *pending;
+  size_t msize;
+  struct PeerPutMessage *ppm;
+  struct GNUNET_PeerIdentity *pp;
+
+  /* Size the message before selecting targets, so that rejecting an
+     oversized request neither leaks the target array nor marks
+     peers in 'bf' that never receive the message. */
+  msize = put_path_length * sizeof (struct GNUNET_PeerIdentity) + data_size + sizeof (struct PeerPutMessage);
+  if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
+  {
+    /* too big with the path: drop the (optional) PUT path and retry */
+    put_path_length = 0;
+    msize = data_size + sizeof (struct PeerPutMessage);
+  }
+  if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
+  {
+    GNUNET_break (0);
+    return;
+  }
+  target_count = get_target_peers (key, bf, hop_count,
+                                  desired_replication_level,
+                                  &targets);
+  if (0 == target_count)
+    return;
+  for (i=0;i<target_count;i++)
+  {
+    target = targets[i];
+    /* queue entry and message live in one allocation */
+    pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
+    pending->importance = 0; /* FIXME */
+    pending->timeout = expiration_time;
+    ppm = (struct PeerPutMessage*) &pending[1];
+    pending->msg = &ppm->header;
+    ppm->header.size = htons (msize);
+    ppm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_PUT);
+    ppm->options = htonl (options);
+    ppm->type = htonl (type);
+    ppm->hop_count = htonl (hop_count + 1);
+    ppm->desired_replication_level = htonl (desired_replication_level);
+    ppm->put_path_length = htonl (put_path_length);
+    ppm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time);
+    GNUNET_assert (GNUNET_OK ==
+                  GNUNET_CONTAINER_bloomfilter_get_raw_data (bf,
+                                                             ppm->bloomfilter,
+                                                             DHT_BLOOM_SIZE));
+    ppm->key = *key;
+    /* the PUT path follows the fixed header, the payload follows the
+       path; the destination is written to, so it must not be const */
+    pp = (struct GNUNET_PeerIdentity*) &ppm[1];
+    memcpy (pp, put_path, sizeof (struct GNUNET_PeerIdentity) * put_path_length);
+    memcpy (&pp[put_path_length], data, data_size);
+    GNUNET_CONTAINER_DLL_insert (target->head,
+                                target->tail,
+                                pending);
+    target->pending_count++;
+    process_peer_queue (target);
+  }
+  GNUNET_free (targets);
 }
 
 
 /**
- * Perform a GET operation.  // FIXME: document if this is only
- * routing or also state-tracking and/or even local lookup!
+ * Perform a GET operation.  Forwards the given request to other
+ * peers.  Does not lookup the key locally.  May do nothing if this is
+ * the only peer in the network (or if we are the closest peer in the
+ * network).
  *
  * @param type type of the block
  * @param options routing options
  * @param desired_replication_level desired replication count
+ * @param hop_count how many hops did this request traverse so far?
  * @param key key for the content
  * @param xquery extended query
  * @param xquery_size number of bytes in xquery
@@ -510,9 +936,10 @@
  * @param peer_bf filter for peers not to select (again)
  */
 void
-GST_NEIGHBOURS_handle_get (uint32_t type,
+GDS_NEIGHBOURS_handle_get (uint32_t type,
                           uint32_t options,
                           uint32_t desired_replication_level,
+                          uint32_t hop_count,
                           const GNUNET_HashCode *key,
                           const void *xquery,
                           size_t xquery_size,
@@ -520,13 +947,69 @@
                           uint32_t reply_bf_mutator,
                           const struct GNUNET_CONTAINER_BloomFilter *peer_bf)
 {
-  // FIXME
+  unsigned int target_count;
+  unsigned int i;
+  struct PeerInfo **targets;
+  struct PeerInfo *target;
+  struct P2PPendingMessage *pending;
+  size_t msize;
+  struct PeerGetMessage *pgm;
+  char *xq;
+  size_t reply_bf_size;
+
+  /* Size the message before selecting targets, so that rejecting an
+     oversized request does not leak the target array. */
+  reply_bf_size = GNUNET_CONTAINER_bloomfilter_get_size (reply_bf);
+  msize = xquery_size + sizeof (struct PeerGetMessage) + reply_bf_size;
+  if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
+  {
+    GNUNET_break (0);
+    return;
+  }
+  target_count = get_target_peers (key, peer_bf, hop_count,
+                                  desired_replication_level,
+                                  &targets);
+  if (0 == target_count)
+    return;
+  for (i=0;i<target_count;i++)
+  {
+    target = targets[i];
+    /* queue entry and message live in one allocation */
+    pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
+    pending->importance = 0; /* FIXME */
+    pending->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_HOURS); /* FIXME */
+    pgm = (struct PeerGetMessage*) &pending[1];
+    pending->msg = &pgm->header;
+    pgm->header.size = htons (msize);
+    pgm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_GET);
+    pgm->options = htonl (options);
+    pgm->type = htonl (type);
+    pgm->hop_count = htonl (hop_count + 1);
+    pgm->desired_replication_level = htonl (desired_replication_level);
+    pgm->xquery_size = htonl (xquery_size);
+    /* NOTE(review): stored without byte-order conversion, unlike the
+       fields above -- confirm this is intended */
+    pgm->bf_mutator = reply_bf_mutator;
+    if (NULL != peer_bf)
+      GNUNET_assert (GNUNET_OK ==
+                    GNUNET_CONTAINER_bloomfilter_get_raw_data (peer_bf,
+                                                               pgm->bloomfilter,
+                                                               DHT_BLOOM_SIZE));
+    else
+      memset (pgm->bloomfilter, 0, DHT_BLOOM_SIZE); /* no peers excluded */
+    pgm->key = *key;
+    /* the extended query follows the fixed header, the reply
+       bloomfilter follows the extended query */
+    xq = (char *) &pgm[1];
+    memcpy (xq, xquery, xquery_size);
+    GNUNET_assert (GNUNET_OK ==
+                  GNUNET_CONTAINER_bloomfilter_get_raw_data (reply_bf,
+                                                             &xq[xquery_size],
+                                                             reply_bf_size));
+    GNUNET_CONTAINER_DLL_insert (target->head,
+                                target->tail,
+                                pending);
+    target->pending_count++;
+    process_peer_queue (target);
+  }
+  GNUNET_free (targets);
 }
 
 
 /**
- * Handle a reply (route to origin).  FIXME: should this be here?
- * (reply-routing table might be better done elsewhere).
+ * Handle a reply (route to origin).  Only forwards the reply back to
+ * other peers waiting for it.  Does not do local caching or
+ * forwarding to local clients.
  *
  * @param type type of the block
  * @param options routing options
@@ -540,7 +1023,7 @@
  * @param data_size number of bytes in data
  */
 void
-GST_NEIGHBOURS_handle_reply (uint32_t type,
+GDS_NEIGHBOURS_handle_reply (uint32_t type,
                             uint32_t options,
                             GNUNET_TIME_Absolute expiration_time,
                             const GNUNET_HashCode *key,
@@ -556,10 +1039,27 @@
 
 
 /**
+ * Closure for 'add_known_to_bloom': bundles the Bloom filter that
+ * is being filled with the mutator applied to every key before it
+ * is inserted.
+ */
+struct BloomConstructorContext
+{
+  /**
+   * Bloom filter under construction.
+   */
+  struct GNUNET_CONTAINER_BloomFilter *bloom;
+
+  /**
+   * Mutator to use; handed to GNUNET_BLOCK_mingle_hash together
+   * with each key.
+   */
+  uint32_t bf_mutator;
+};
+
+
+/**
  * Add each of the peers we already know to the bloom filter of
  * the request so that we don't get duplicate HELLOs.
  *
- * @param cls the 'struct GNUNET_CONTAINER_BloomFilter' we're building
+ * @param cls the 'struct BloomConstructorContext'.
  * @param key peer identity to add to the bloom filter
  * @param value value the peer information (unused)
  * @return GNUNET_YES (we should continue to iterate)
@@ -567,9 +1067,11 @@
 static int
 add_known_to_bloom (void *cls, const GNUNET_HashCode * key, void *value)
 {
-  struct GNUNET_CONTAINER_BloomFilter *bloom = cls;
+  struct BloomConstructorContext *bcc = cls;
+  GNUNET_HashCode mingled;
 
-  GNUNET_CONTAINER_bloomfilter_add (bloom, key);
+  /* the filter stores the mutated hash, not the peer's plain hash */
+  GNUNET_BLOCK_mingle_hash (key, bcc->bf_mutator, &mingled);
+  GNUNET_CONTAINER_bloomfilter_add (bcc->bloom, &mingled);
   return GNUNET_YES;
 }
 
@@ -589,7 +1091,7 @@
   struct GNUNET_DHT_FindPeerMessage *find_peer_msg;
   struct DHT_MessageContext msg_ctx;
   struct GNUNET_TIME_Relative next_send_time;
-  struct GNUNET_CONTAINER_BloomFilter *temp_bloom;
+  struct BloomConstructorContext bcc;
 
   find_peer_task = GNUNET_SCHEDULER_NO_TASK;
   if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
@@ -602,38 +1104,21 @@
     newly_found_peers = 0;
     return;
   }
-
-  // FIXME: build message...
-  find_peer_msg = GNUNET_malloc (sizeof (struct GNUNET_DHT_FindPeerMessage));
-  find_peer_msg->header.size =
-      htons (sizeof (struct GNUNET_DHT_FindPeerMessage));
-  find_peer_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
-  temp_bloom =
-      GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
-  GNUNET_CONTAINER_multihashmap_iterate (all_known_peers, &add_known_to_bloom,
-                                         temp_bloom);
-  GNUNET_assert (GNUNET_OK ==
-                 GNUNET_CONTAINER_bloomfilter_get_raw_data (temp_bloom,
-                                                            find_peer_msg->
-                                                            bloomfilter,
-                                                            DHT_BLOOM_SIZE));
-  GNUNET_CONTAINER_bloomfilter_free (temp_bloom);
-
-  memset (&msg_ctx, 0, sizeof (struct DHT_MessageContext));
-  memcpy (&msg_ctx.key, &my_identity.hashPubKey, sizeof (GNUNET_HashCode));
-  msg_ctx.unique_id =
-      GNUNET_ntohll (GNUNET_CRYPTO_random_u64
-                     (GNUNET_CRYPTO_QUALITY_STRONG, UINT64_MAX));
-  msg_ctx.replication = DHT_DEFAULT_FIND_PEER_REPLICATION;
-  msg_ctx.msg_options = GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE;
-  msg_ctx.network_size = log_of_network_size_estimate;
-  msg_ctx.peer = my_identity;
-  msg_ctx.importance = DHT_DEFAULT_FIND_PEER_IMPORTANCE;
-  msg_ctx.timeout = DHT_DEFAULT_FIND_PEER_TIMEOUT;
-  // FIXME: transmit message...
-  demultiplex_message (&find_peer_msg->header, &msg_ctx);
-  GNUNET_free (find_peer_msg);
-
+  bcc.bf_mutator = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 
UINT32_MAX);
+  bcc.bloom =
+    GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, DHT_BLOOM_K);
+  GNUNET_CONTAINER_multihashmap_iterate (all_known_peers, 
+                                        &add_known_to_bloom,
+                                         &bcc);
+  // FIXME: pass priority!?
+  GDS_NEIGHBOURS_handle_get (GNUNET_BLOCK_TYPE_DHT_HELLO,
+                            GNUNET_DHT_RO_FIND_PEER,
+                            16 /* FIXME: replication level? */,
+                            0,
+                            &my_identity.hashPubKey,
+                            NULL, 0,
+                            bcc.bloom, bcc.bf_mutator, NULL);
+  GNUNET_CONTAINER_bloomfilter_free (bcc.bloom);
   /* schedule next round */
   newly_found_peers = 0;
   next_send_time.rel_value =
@@ -674,9 +1159,10 @@
 
 
 /**
- * Core handler for p2p get requests.
+ * Core handler for p2p put requests.
  *
  * @param cls closure
+ * @param peer sender of the request
  * @param message message
  * @param peer peer identity this notification is about
  * @param atsi performance data
@@ -684,84 +1170,105 @@
  *         GNUNET_SYSERR to close it (signal serious error)
  */
 static int
-handle_dht_p2p_get (void *cls, const struct GNUNET_PeerIdentity *peer,
+handle_dht_p2p_put (void *cls,
+                   const struct GNUNET_PeerIdentity *peer,
                     const struct GNUNET_MessageHeader *message,
                     const struct GNUNET_TRANSPORT_ATS_Information
                     *atsi)
 {
-  struct GNUNET_DHT_P2PRouteMessage *incoming =
-      (struct GNUNET_DHT_P2PRouteMessage *) message;
-  struct GNUNET_MessageHeader *enc_msg =
-      (struct GNUNET_MessageHeader *) &incoming[1];
-  struct DHT_MessageContext *msg_ctx;
-  char *route_path;
-  int path_size;
-
-  if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1)
+  const struct PeerPutMessage *put;
+  const struct GNUNET_PeerIdentity *put_path;
+  const void *payload;
+  uint32_t putlen;
+  uint16_t msize;
+  size_t payload_size;
+  struct GNUNET_CONTAINER_BloomFilter *bf;
+  GNUNET_HashCode test_key;
+
+  /* the fixed-size part of the message must be complete */
+  msize = ntohs (message->size);
+  if (msize < sizeof (struct PeerPutMessage))
   {
     GNUNET_break_op (0);
     return GNUNET_YES;
   }
-
-  if (get_max_send_delay ().rel_value > MAX_REQUEST_TIME.rel_value)
+  put = (const struct PeerPutMessage*) message;
+  putlen = ntohl (put->put_path_length);
+  /* bound the path length first so the multiplication cannot wrap,
+     then make sure the claimed path fits into the message */
+  if ( (putlen > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) ||
+       (msize < sizeof (struct PeerPutMessage) + putlen * sizeof (struct GNUNET_PeerIdentity)) )
+    {
+      GNUNET_break_op (0);
+      return GNUNET_YES;
+    }
+  /* layout: fixed header, put path, payload */
+  put_path = (const struct GNUNET_PeerIdentity*) &put[1];
+  payload = &put_path[putlen];
+  payload_size = msize - (sizeof (struct PeerPutMessage) +
+                         putlen * sizeof (struct GNUNET_PeerIdentity));
+  /* if the block type lets us derive the key from the payload, the
+     derived key must match the key claimed in the message */
+  switch (GNUNET_BLOCK_get_key (block_context,
+                               ntohl (put->type),
+                               payload, payload_size,
+                               &test_key))
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Sending of previous replies took too long, backing off!\n");
-    increment_stats ("# route requests dropped due to high load");
-    decrease_max_send_delay (get_max_send_delay ());
-    return GNUNET_YES;
-  }
-  msg_ctx = GNUNET_malloc (sizeof (struct DHT_MessageContext));
-  msg_ctx->bloom =
-      GNUNET_CONTAINER_bloomfilter_init (incoming->bloomfilter, DHT_BLOOM_SIZE,
-                                         DHT_BLOOM_K);
-  GNUNET_assert (msg_ctx->bloom != NULL);
-  msg_ctx->hop_count = ntohl (incoming->hop_count);
-  memcpy (&msg_ctx->key, &incoming->key, sizeof (GNUNET_HashCode));
-  msg_ctx->replication = ntohl (incoming->desired_replication_level);
-  msg_ctx->msg_options = ntohl (incoming->options);
-  if (GNUNET_DHT_RO_RECORD_ROUTE ==
-      (msg_ctx->msg_options & GNUNET_DHT_RO_RECORD_ROUTE))
-  {
-    path_size =
-        ntohl (incoming->outgoing_path_length) *
-        sizeof (struct GNUNET_PeerIdentity);
-    if (ntohs (message->size) !=
-        (sizeof (struct GNUNET_DHT_P2PRouteMessage) + ntohs (enc_msg->size) +
-         path_size))
+  case GNUNET_YES:
+    if (0 != memcmp (&test_key, &put->key, sizeof (GNUNET_HashCode)))
     {
       GNUNET_break_op (0);
-      GNUNET_free (msg_ctx);
       return GNUNET_YES;
     }
-    route_path = (char *) &incoming[1];
-    route_path = route_path + ntohs (enc_msg->size);
-    msg_ctx->path_history =
-        GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity) + path_size);
-    memcpy (msg_ctx->path_history, route_path, path_size);
-    memcpy (&msg_ctx->path_history[path_size], &my_identity,
-            sizeof (struct GNUNET_PeerIdentity));
-    msg_ctx->path_history_len = ntohl (incoming->outgoing_path_length) + 1;
+    break;
+  case GNUNET_NO:
+    GNUNET_break_op (0);
+    return GNUNET_YES;
+  case GNUNET_SYSERR:
+    /* cannot verify, good luck */
+    break;
   }
-  msg_ctx->network_size = ntohl (incoming->network_size);
-  msg_ctx->peer = *peer;
-  msg_ctx->importance = DHT_DEFAULT_P2P_IMPORTANCE;
-  msg_ctx->timeout = DHT_DEFAULT_P2P_TIMEOUT;
-  demultiplex_message (enc_msg, msg_ctx);
-  if (msg_ctx->bloom != NULL)
+  bf = GNUNET_CONTAINER_bloomfilter_init (put->bloomfilter,
+                                         DHT_BLOOM_SIZE,
+                                         DHT_BLOOM_K);
   {
-    GNUNET_CONTAINER_bloomfilter_free (msg_ctx->bloom);
-    msg_ctx->bloom = NULL;
+    /* putlen was bounded above, which keeps this stack array small */
+    struct GNUNET_PeerIdentity pp[putlen+1];
+
+    /* extend 'put path' by sender */
+    memcpy (pp, put_path, putlen * sizeof (struct GNUNET_PeerIdentity));
+    pp[putlen] = *peer;
+
+    /* give to local clients */
+    GDS_CLIENT_handle_reply (GNUNET_TIME_absolute_ntoh (put->expiration_time),
+                            &put->key,
+                            0, NULL,
+                            putlen + 1,
+                            pp,
+                            ntohl (put->type),
+                            payload_size,
+                            payload);
+    /* store locally */
+    GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (put->expiration_time),
+                             &put->key,
+                             putlen + 1, pp,
+                             ntohl (put->type),
+                             payload_size,
+                             payload);
+    /* route to other peers */
+    GDS_NEIGHBOURS_handle_put (ntohl (put->type),
+                              ntohl (put->options),
+                              ntohl (put->desired_replication_level),
+                              GNUNET_TIME_absolute_ntoh (put->expiration_time),
+                              ntohl (put->hop_count),
+                              bf,
+                              &put->key,
+                              putlen + 1, pp,
+                              payload,
+                              payload_size);
   }
-  GNUNET_free (msg_ctx);
+  GNUNET_CONTAINER_bloomfilter_free (bf);
   return GNUNET_YES;
 }
 
 
 /**
- * Core handler for p2p put requests.
+ * Core handler for p2p get requests.
  *
  * @param cls closure
+ * @param peer sender of the request
  * @param message message
  * @param peer peer identity this notification is about
  * @param atsi performance data
@@ -769,11 +1276,18 @@
  *         GNUNET_SYSERR to close it (signal serious error)
  */
 static int
-handle_dht_p2p_put (void *cls, const struct GNUNET_PeerIdentity *peer,
+handle_dht_p2p_get (void *cls, const struct GNUNET_PeerIdentity *peer,
                    const struct GNUNET_MessageHeader *message,
                    const struct GNUNET_TRANSPORT_ATS_Information
                    *atsi)
 {
+  // 1) validate GET
+  // 2) store in routing table
+  // 3) check options (i.e. FIND PEER)
+  // 4) local lookup (=> need eval result!)
+  // 5) p2p forwarding
+
+
   struct GNUNET_DHT_P2PRouteMessage *incoming =
       (struct GNUNET_DHT_P2PRouteMessage *) message;
   struct GNUNET_MessageHeader *enc_msg =
@@ -782,6 +1296,7 @@
   char *route_path;
   int path_size;
 
+  // FIXME
   if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1)
   {
     GNUNET_break_op (0);
@@ -844,7 +1359,7 @@
 
 
 /**
- * Core handler for p2p route results.
+ * Core handler for p2p result messages.
  *
  * @param cls closure
  * @param message message
@@ -858,12 +1373,17 @@
                       const struct GNUNET_TRANSPORT_ATS_Information
                       *atsi)
 {
+  // 1) validate result format
+  // 2) append 'peer' to put path
+  // 3) forward to local clients
+  // 4) p2p routing
   const struct GNUNET_DHT_P2PRouteResultMessage *incoming =
       (const struct GNUNET_DHT_P2PRouteResultMessage *) message;
   struct GNUNET_MessageHeader *enc_msg =
       (struct GNUNET_MessageHeader *) &incoming[1];
   struct DHT_MessageContext msg_ctx;
 
+  // FIXME
   if (ntohs (enc_msg->size) >= GNUNET_SERVER_MAX_MESSAGE_SIZE - 1)
   {
     GNUNET_break_op (0);
@@ -903,7 +1423,7 @@
  * Initialize neighbours subsystem.
  */
 int
-GST_NEIGHBOURS_init ()
+GDS_NEIGHBOURS_init ()
 {
   static struct GNUNET_CORE_MessageHandler core_handlers[] = {
     {&handle_dht_get, GNUNET_MESSAGE_TYPE_DHT_P2P_GET, 0},
@@ -918,18 +1438,16 @@
       GNUNET_CONFIGURATION_get_value_number (cfg, "DHT", "bucket_size",
                                              &temp_config_num))
     bucket_size = (unsigned int) temp_config_num;  
-  coreAPI = GNUNET_CORE_connect (GDS_cfg,   /* Main configuration */
-                                 DEFAULT_CORE_QUEUE_SIZE,       /* queue size 
*/
-                                 NULL,  /* Closure passed to DHT functions */
-                                 &core_init,    /* Call core_init once 
connected */
-                                 &handle_core_connect,  /* Handle connects */
-                                 &handle_core_disconnect,       /* remove 
peers on disconnects */
+  coreAPI = GNUNET_CORE_connect (GDS_cfg,
+                                 DEFAULT_CORE_QUEUE_SIZE,
+                                 NULL,
+                                 &core_init,
+                                 &handle_core_connect,
+                                 &handle_core_disconnect, 
                                  NULL,  /* Do we care about "status" updates? 
*/
-                                 NULL,  /* Don't want notified about all 
incoming messages */
-                                 GNUNET_NO,     /* For header only inbound 
notification */
-                                 NULL,  /* Don't want notified about all 
outbound messages */
-                                 GNUNET_NO,     /* For header only outbound 
notification */
-                                 core_handlers);        /* Register these 
handlers */  
+                                 NULL, GNUNET_NO,
+                                 NULL, GNUNET_NO,
+                                 core_handlers);
   if (coreAPI == NULL)
     return GNUNET_SYSERR;
   all_known_peers = GNUNET_CONTAINER_multihashmap_create (256);
@@ -941,7 +1459,7 @@
  * Shutdown neighbours subsystem.
  */
 void
-GST_NEIGHBOURS_done ()
+GDS_NEIGHBOURS_done ()
 {
   GNUNET_assert (coreAPI != NULL);
   GNUNET_CORE_disconnect (coreAPI);

Modified: gnunet/src/dht/gnunet-service-dht_neighbours.h
===================================================================
--- gnunet/src/dht/gnunet-service-dht_neighbours.h      2011-09-24 07:16:15 UTC (rev 17009)
+++ gnunet/src/dht/gnunet-service-dht_neighbours.h      2011-09-25 22:16:20 UTC (rev 17010)
@@ -29,12 +29,18 @@
 
 
 /**
- * Perform a PUT operation.
+ * Perform a PUT operation.  Forwards the given request to other
+ * peers.   Does not store the data locally.  Does not give the
+ * data to local clients.  May do nothing if this is the only
+ * peer in the network (or if we are the closest peer in the
+ * network).
  *
  * @param type type of the block
  * @param options routing options
- * @param desired_replication_level desired replication count
+ * @param desired_replication_level desired replication level
  * @param expiration_time when does the content expire
+ * @param hop_count how many hops has this message traversed so far
+ * @param bf Bloom filter of peers this PUT has already traversed
  * @param key key for the content
  * @param put_path_length number of entries in put_path
  * @param put_path peers this request has traversed so far (if tracked)
@@ -42,10 +48,12 @@
  * @param data_size number of bytes in data
  */
 void
-GST_NEIGHBOURS_handle_put (uint32_t type,
+GDS_NEIGHBOURS_handle_put (uint32_t type,
                           uint32_t options,
                           uint32_t desired_replication_level,
                           GNUNET_TIME_Absolute expiration_time,
+                          uint32_t hop_count,
+                          struct GNUNET_CONTAINER_BloomFilter *bf,
                           const GNUNET_HashCode *key,
                           unsigned int put_path_length,
                           struct GNUNET_PeerIdentity *put_path,
@@ -54,11 +62,15 @@
 
 
 /**
- * Perform a GET operation.
+ * Perform a GET operation.  Forwards the given request to other
+ * peers.  Does not look up the key locally.  May do nothing if this is
+ * the only peer in the network (or if we are the closest peer in the
+ * network).
  *
  * @param type type of the block
  * @param options routing options
  * @param desired_replication_level desired replication count
+ * @param hop_count how many hops did this request traverse so far?
  * @param key key for the content
  * @param xquery extended query
  * @param xquery_size number of bytes in xquery
@@ -67,9 +79,10 @@
  * @param peer_bf filter for peers not to select (again)
  */
 void
-GST_NEIGHBOURS_handle_get (uint32_t type,
+GDS_NEIGHBOURS_handle_get (uint32_t type,
                           uint32_t options,
                           uint32_t desired_replication_level,
+                          uint32_t hop_count,
                           const GNUNET_HashCode *key,
                           const void *xquery,
                           size_t xquery_size,
@@ -79,10 +92,11 @@
 
 
 /**
- * Handle a reply (route to origin).
+ * Handle a reply (route to origin).  Only forwards the reply back to
+ * other peers waiting for it.  Does not do local caching or
+ * forwarding to local clients.
  *
  * @param type type of the block
- * @param options routing options
  * @param expiration_time when does the content expire
  * @param key key for the content
  * @param put_path_length number of entries in put_path
@@ -93,8 +107,7 @@
  * @param data_size number of bytes in data
  */
 void
-GST_NEIGHBOURS_handle_reply (uint32_t type,
-                            uint32_t options,
+GDS_NEIGHBOURS_handle_reply (uint32_t type,
                             GNUNET_TIME_Absolute expiration_time,
                             const GNUNET_HashCode *key,
                             unsigned int put_path_length,
@@ -109,13 +122,13 @@
  * Initialize neighbours subsystem.
  */
 void
-GST_NEIGHBOURS_init (void);
+GDS_NEIGHBOURS_init (void);
 
 /**
  * Shutdown neighbours subsystem.
  */
 void
-GST_NEIGHBOURS_done (void);
+GDS_NEIGHBOURS_done (void);
 
 
 #endif




reply via email to

[Prev in Thread] Current Thread [Next in Thread]