gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r10982 - gnunet/src/dht


From: gnunet
Subject: [GNUnet-SVN] r10982 - gnunet/src/dht
Date: Mon, 19 Apr 2010 17:16:38 +0200

Author: nevans
Date: 2010-04-19 17:16:38 +0200 (Mon, 19 Apr 2010)
New Revision: 10982

Modified:
   gnunet/src/dht/Makefile.am
   gnunet/src/dht/dht.h
   gnunet/src/dht/dht_api.c
   gnunet/src/dht/gnunet-dht-get-peer.c
   gnunet/src/dht/gnunet-service-dht.c
   gnunet/src/dht/test_dht_api.c
Log:
dht api fixes, it works again (for me)

Modified: gnunet/src/dht/Makefile.am
===================================================================
--- gnunet/src/dht/Makefile.am  2010-04-19 15:16:18 UTC (rev 10981)
+++ gnunet/src/dht/Makefile.am  2010-04-19 15:16:38 UTC (rev 10982)
@@ -68,6 +68,7 @@
  test_dht_api.c
 test_dht_api_LDADD = \
  $(top_builddir)/src/util/libgnunetutil.la \
+ $(top_builddir)/src/hello/libgnunethello.la \
  $(top_builddir)/src/dht/libgnunetdht.la    
 
 EXTRA_DIST = \

Modified: gnunet/src/dht/dht.h
===================================================================
--- gnunet/src/dht/dht.h        2010-04-19 15:16:18 UTC (rev 10981)
+++ gnunet/src/dht/dht.h        2010-04-19 15:16:38 UTC (rev 10982)
@@ -34,12 +34,13 @@
                                                    * msg);
 
 /**
- * FIXME.
+ * Message which indicates the DHT should cancel outstanding
+ * requests and discard any state.
  */
 struct GNUNET_DHT_StopMessage
 {
   /**
-   * Type: GNUNET_MESSAGE_TYPE_DHT_MESSAGE
+   * Type: GNUNET_MESSAGE_TYPE_DHT_STOP
    */
   struct GNUNET_MessageHeader header;
 
@@ -57,7 +58,8 @@
 
 
 /**
- * Generic DHT message, wrapper for other message types
+ * Generic DHT message, indicates that a route request
+ * should be issued.
  */
 struct GNUNET_DHT_RouteMessage
 {
@@ -77,7 +79,8 @@
   GNUNET_HashCode key;
 
   /**
-   * Unique ID identifying this request
+   * Unique ID identifying this request, if 0 then
+   * the client will not expect a response
    */
   uint64_t unique_id GNUNET_PACKED;
 
@@ -86,12 +89,6 @@
    */
   uint32_t desired_replication_level GNUNET_PACKED;
 
-  /**
-   * Is this message uniquely identified?  If so it will
-   * be fire and forget, if not we will wait for a receipt
-   * from the service.
-   */
-  uint32_t unique GNUNET_PACKED;
 
   /* GNUNET_MessageHeader *enc actual DHT message, copied to end of this dealy 
do */
 

Modified: gnunet/src/dht/dht_api.c
===================================================================
--- gnunet/src/dht/dht_api.c    2010-04-19 15:16:18 UTC (rev 10981)
+++ gnunet/src/dht/dht_api.c    2010-04-19 15:16:38 UTC (rev 10982)
@@ -24,6 +24,11 @@
  * @author Christian Grothoff
  * @author Nathan Evans
  *
+ * TODO: retransmission of pending requests may happen now; at least
+ *       the code is in place to do so.  API calls still need to check
+ *       whether a retransmission is in progress and, if so, set the
+ *       single pending message for transmission once the list of
+ *       retries is done.
  */
 
 #include "platform.h"
@@ -67,16 +72,23 @@
   void *cont_cls;
 
   /**
-   * Whether or not to await verification the message
-   * was received by the service
+   * Unique ID for this request
    */
-  size_t is_unique;
+  uint64_t unique_id;
 
+};
+
+struct PendingMessageList
+{
   /**
-   * Unique ID for this request
+   * This is a singly linked list.
    */
-  uint64_t unique_id;
+  struct PendingMessageList *next;
 
+  /**
+   * The pending message.
+   */
+  struct PendingMessage *message;
 };
 
 struct GNUNET_DHT_GetContext
@@ -108,8 +120,7 @@
 };
 
 /**
- * Handle to control a unique operation (one that is
- * expected to return results)
+ * Handle to a route request
  */
 struct GNUNET_DHT_RouteHandle
 {
@@ -138,37 +149,16 @@
    * Main handle to this DHT api
    */
   struct GNUNET_DHT_Handle *dht_handle;
-};
 
-/**
- * Handle for a non unique request, holds callback
- * which needs to be called before we allow other
- * messages to be processed and sent to the DHT service
- */
-struct GNUNET_DHT_NonUniqueHandle
-{
   /**
-   * Key that this get request is for
+   * The actual message sent for this request,
+   * used for retransmitting requests on service
+   * failure/reconnect.  Freed on route_stop.
    */
-  GNUNET_HashCode key;
+  struct GNUNET_DHT_RouteMessage *message;
+};
 
-  /**
-   * Type of data get request was for
-   */
-  uint32_t type;
 
-  /**
-   * Continuation to call on service
-   * confirmation of message receipt.
-   */
-  GNUNET_SCHEDULER_Task cont;
-
-  /**
-   * Send continuation cls
-   */
-  void *cont_cls;
-};
-
 /**
  * Handle to control a get operation.
  */
@@ -185,6 +175,7 @@
   struct GNUNET_DHT_GetContext get_context;
 };
 
+
 /**
  * Handle to control a find peer operation.
  */
@@ -202,6 +193,27 @@
 };
 
 
+enum DHT_Retransmit_Stage
+{
+  /**
+   * The API is not retransmitting anything at this time.
+   */
+  DHT_NOT_RETRANSMITTING,
+
+  /**
+   * The API is retransmitting, and no single message has been
+   * queued for sending.
+   */
+  DHT_RETRANSMITTING,
+
+  /**
+   * The API is retransmitting, and a single message has been
+   * queued for transmission once finished.
+   */
+  DHT_RETRANSMITTING_MESSAGE_QUEUED
+};
+
+
 /**
  * Connection to the DHT service.
  */
@@ -242,16 +254,27 @@
   struct GNUNET_CONTAINER_MultiHashMap *outstanding_requests;
 
   /**
-   * Non unique handle.  If set don't schedule another non
-   * unique request.
+   * Generator for unique ids.
    */
-  struct GNUNET_DHT_NonUniqueHandle *non_unique_request;
+  uint64_t uid_gen;
 
   /**
-   * Generator for unique ids.
+   * Are we currently retransmitting requests?  If so queue a _single_
+   * new request when received.
    */
-  uint64_t uid_gen;
+  enum DHT_Retransmit_Stage retransmit_stage;
 
+  /**
+   * Linked list of retransmissions, to be used in the event
+   * of a dht service disconnect/reconnect.
+   */
+  struct PendingMessageList *retransmissions;
+
+  /**
+   * A single pending message allowed to be scheduled
+   * during retransmission phase.
+   */
+  struct PendingMessage *retransmission_buffer;
 };
 
 
@@ -269,8 +292,255 @@
   *((uint64_t*)hash) = uid;
 }
 
+/**
+ * Iterator callback to retransmit each outstanding request
+ * because the connection to the DHT service went down (and
+ * came back).
+ *
+ *
+ */
+static int retransmit_iterator (void *cls,
+                                const GNUNET_HashCode * key,
+                                void *value)
+{
+  struct GNUNET_DHT_RouteHandle *route_handle = value;
+  struct PendingMessageList *pending_message_list;
 
+  pending_message_list = GNUNET_malloc(sizeof(struct PendingMessageList) + 
sizeof(struct PendingMessage));
+  pending_message_list->message = (struct PendingMessage 
*)&pending_message_list[1];
+  pending_message_list->message->msg = &route_handle->message->header;
+  pending_message_list->message->timeout = GNUNET_TIME_relative_get_forever();
+  pending_message_list->message->cont = NULL;
+  pending_message_list->message->cont_cls = NULL;
+  pending_message_list->message->unique_id = route_handle->uid;
+  /* Add the new pending message to the front of the retransmission list */
+  pending_message_list->next = route_handle->dht_handle->retransmissions;
+
+  return GNUNET_OK;
+}
+
 /**
+ * Try to (re)connect to the dht service.
+ *
+ * @return GNUNET_YES on success, GNUNET_NO on failure.
+ */
+static int
+try_connect (struct GNUNET_DHT_Handle *handle)
+{
+  if (handle->client != NULL)
+    return GNUNET_OK;
+  handle->client = GNUNET_CLIENT_connect (handle->sched, "dht", handle->cfg);
+  if (handle->client != NULL)
+    return GNUNET_YES;
+#if DEBUG_STATISTICS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              _("Failed to connect to the dht service!\n"));
+#endif
+  return GNUNET_NO;
+}
+
+/**
+ * Send complete (or failed), call continuation if we have one.
+ */
+static void
+finish (struct GNUNET_DHT_Handle *handle, int code)
+{
+  struct PendingMessage *pos = handle->current;
+  GNUNET_HashCode uid_hash;
+  hash_from_uid (pos->unique_id, &uid_hash);
+#if DEBUG_DHT_API
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish called!\n", "DHT API");
+#endif
+  GNUNET_assert (pos != NULL);
+
+  if (pos->cont != NULL)
+    {
+      if (code == GNUNET_SYSERR)
+        GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont,
+                                           pos->cont_cls,
+                                           GNUNET_SCHEDULER_REASON_TIMEOUT);
+      else
+        GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont,
+                                           pos->cont_cls,
+                                           
GNUNET_SCHEDULER_REASON_PREREQ_DONE);
+    }
+
+  if (pos->unique_id != 0)
+    GNUNET_free(pos->msg);
+  GNUNET_free (pos);
+  handle->current = NULL;
+}
+
+/**
+ * Transmit the next pending message, called by notify_transmit_ready
+ */
+static size_t
+transmit_pending (void *cls, size_t size, void *buf)
+{
+  struct GNUNET_DHT_Handle *handle = cls;
+  size_t tsize;
+
+#if DEBUG_DHT_API
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "`%s': In transmit_pending\n", "DHT API");
+#endif
+  if (buf == NULL)
+    {
+#if DEBUG_DHT_API
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "`%s': In transmit_pending buf is NULL\n", "DHT API");
+#endif
+      finish (handle, GNUNET_SYSERR);
+      return 0;
+    }
+
+  handle->th = NULL;
+
+  if (handle->current != NULL)
+    {
+      tsize = ntohs (handle->current->msg->size);
+      if (size >= tsize)
+        {
+#if DEBUG_DHT_API
+          GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                      "`%s': Sending message size %d\n", "DHT API", tsize);
+#endif
+          memcpy (buf, handle->current->msg, tsize);
+          finish (handle, GNUNET_OK);
+          return tsize;
+        }
+      else
+        {
+          return 0;
+        }
+    }
+  /* Have no pending request */
+  return 0;
+}
+
+/**
+ * Try to send messages from list of messages to send
+ */
+static void
+process_pending_message (struct GNUNET_DHT_Handle *handle)
+{
+
+  if (handle->current == NULL)
+    return;                     /* action already pending */
+  if (GNUNET_YES != try_connect (handle))
+    {
+      finish (handle, GNUNET_SYSERR);
+      return;
+    }
+
+  if (NULL ==
+      (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
+                                                         ntohs (handle->
+                                                                current->msg->
+                                                                size),
+                                                         handle->current->
+                                                         timeout, GNUNET_YES,
+                                                         &transmit_pending,
+                                                         handle)))
+    {
+#if DEBUG_DHT_API
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Failed to transmit request to dht service.\n");
+#endif
+      finish (handle, GNUNET_SYSERR);
+      return;
+    }
+#if DEBUG_DHT_API
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "`%s': Scheduled sending message of size %d to service\n",
+              "DHT API", ntohs (handle->current->msg->size));
+#endif
+}
+
+/**
+ * Send complete (or failed), call continuation if we have one.
+ * Forward declaration.
+ */
+static void
+finish_retransmission (struct GNUNET_DHT_Handle *handle, int code);
+
+/* Forward declaration */
+static size_t
+transmit_pending_retransmission (void *cls, size_t size, void *buf);
+
+/**
+ * Try to send messages from list of messages to send
+ */
+static void
+process_pending_retransmissions (struct GNUNET_DHT_Handle *handle)
+{
+
+  if (handle->current == NULL)
+    return;                     /* action already pending */
+  if (GNUNET_YES != try_connect (handle))
+    {
+      finish_retransmission (handle, GNUNET_SYSERR);
+      return;
+    }
+
+  if (NULL ==
+      (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
+                                                         ntohs (handle->
+                                                                current->msg->
+                                                                size),
+                                                         handle->current->
+                                                         timeout, GNUNET_YES,
+                                                         
&transmit_pending_retransmission,
+                                                         handle)))
+    {
+#if DEBUG_DHT_API
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Failed to transmit request to dht service.\n");
+#endif
+      finish_retransmission (handle, GNUNET_SYSERR);
+      return;
+    }
+#if DEBUG_DHT_API
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "`%s': Scheduled sending message of size %d to service\n",
+              "DHT API", ntohs (handle->current->msg->size));
+#endif
+}
+
+/**
+ * Send complete (or failed), call continuation if we have one.
+ */
+static void
+finish_retransmission (struct GNUNET_DHT_Handle *handle, int code)
+{
+  struct PendingMessage *pos = handle->current;
+  struct PendingMessageList *pending_list;
+#if DEBUG_DHT_API
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish (retransmission) 
called!\n", "DHT API");
+#endif
+  GNUNET_assert (pos == handle->retransmissions->message);
+  pending_list = handle->retransmissions;
+  handle->retransmissions = handle->retransmissions->next;
+  GNUNET_free (pending_list);
+
+  if (handle->retransmissions == NULL)
+    {
+      handle->retransmit_stage = DHT_NOT_RETRANSMITTING;
+    }
+
+  if (handle->retransmissions != NULL)
+    {
+      handle->current = handle->retransmissions->message;
+      process_pending_retransmissions(handle);
+    }
+  else if (handle->retransmission_buffer != NULL)
+    {
+      handle->current = handle->retransmission_buffer;
+      process_pending_message(handle);
+    }
+}
+
+/**
  * Handler for messages received from the DHT service
  * a demultiplexer which handles numerous message types
  *
@@ -286,8 +556,6 @@
   uint64_t uid;
   GNUNET_HashCode uid_hash;
   size_t enc_size;
-  /* TODO: find out message type, handle callbacks for different types of 
messages.
-   * Should be a non unique acknowledgment, or unique result. */
 
   if (msg == NULL)
     {
@@ -300,8 +568,11 @@
       handle->client = GNUNET_CLIENT_connect (handle->sched, 
                                              "dht", 
                                              handle->cfg);
-      /* FIXME: re-transmit *all* of our GET requests AND re-start
-        receiving responses! */
+
+      handle->retransmit_stage = DHT_RETRANSMITTING;
+      GNUNET_CONTAINER_multihashmap_iterate(handle->outstanding_requests, 
&retransmit_iterator, handle);
+      handle->current = handle->retransmissions->message;
+      process_pending_retransmissions(handle);
       return;
     }
 
@@ -341,37 +612,6 @@
 
         break;
       }
-      /* FIXME: we don't want these anymore, call continuation once message is 
sent. */
-      /*
-    case GNUNET_MESSAGE_TYPE_DHT_STOP:
-      {
-        stop_msg = (struct GNUNET_DHT_StopMessage *) msg;
-        uid = GNUNET_ntohll (stop_msg->unique_id);
-#if DEBUG_DHT_API
-        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                    "`%s': Received response to message (uid %llu), current 
uid %llu\n",
-                    "DHT API", uid, handle->current->unique_id);
-#endif
-        if (handle->current->unique_id == uid)
-          {
-#if DEBUG_DHT_API
-            GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                        "`%s': Have pending confirmation for this message!\n",
-                        "DHT API", uid);
-#endif
-            if (handle->current->cont != NULL)
-              GNUNET_SCHEDULER_add_continuation (handle->sched,
-                                                 handle->current->cont,
-                                                 handle->current->cont_cls,
-                                                 
GNUNET_SCHEDULER_REASON_PREREQ_DONE);
-
-            GNUNET_free (handle->current->msg);
-            GNUNET_free (handle->current);
-            handle->current = NULL;
-          }
-        break;
-      }
-      */
     default:
       {
         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
@@ -407,6 +647,7 @@
   handle->cfg = cfg;
   handle->sched = sched;
   handle->client = GNUNET_CLIENT_connect (sched, "dht", cfg);
+  handle->uid_gen = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, -1);
   if (handle->client == NULL)
     {
       GNUNET_free (handle);
@@ -451,48 +692,18 @@
       GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
       handle->client = NULL;
     }
-  /* Either assert that outstanding_requests is empty */
-  /* FIXME: handle->outstanding_requests not freed! */
+
+  
GNUNET_assert(GNUNET_CONTAINER_multihashmap_size(handle->outstanding_requests) 
== 0);
+  GNUNET_CONTAINER_multihashmap_destroy(handle->outstanding_requests);
   GNUNET_free (handle);
 }
 
 
 /**
- * Send complete (or failed), call continuation if we have one.
- */
-static void
-finish (struct GNUNET_DHT_Handle *handle, int code)
-{
-  struct PendingMessage *pos = handle->current;
-#if DEBUG_DHT_API
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': Finish called!\n", "DHT API");
-#endif
-  GNUNET_assert (pos != NULL);
-
-
-  if (pos->cont != NULL)
-    {
-      if (code == GNUNET_SYSERR)
-        GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont,
-                                           pos->cont_cls,
-                                           GNUNET_SCHEDULER_REASON_TIMEOUT);
-      else
-        GNUNET_SCHEDULER_add_continuation (handle->sched, pos->cont,
-                                           pos->cont_cls,
-                                           
GNUNET_SCHEDULER_REASON_PREREQ_DONE);
-    }
-
-  GNUNET_free (pos->msg);
-  GNUNET_free (pos);
-  handle->current = NULL;
-}
-
-
-/**
  * Transmit the next pending message, called by notify_transmit_ready
  */
 static size_t
-transmit_pending (void *cls, size_t size, void *buf)
+transmit_pending_retransmission (void *cls, size_t size, void *buf)
 {
   struct GNUNET_DHT_Handle *handle = cls;
   size_t tsize;
@@ -507,8 +718,7 @@
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                   "`%s': In transmit_pending buf is NULL\n", "DHT API");
 #endif
-      /* FIXME: free associated resources or summat */
-      finish (handle, GNUNET_SYSERR);
+      finish_retransmission (handle, GNUNET_SYSERR);
       return 0;
     }
 
@@ -524,7 +734,7 @@
                       "`%s': Sending message size %d\n", "DHT API", tsize);
 #endif
           memcpy (buf, handle->current->msg, tsize);
-          finish (handle, GNUNET_OK);
+          finish_retransmission (handle, GNUNET_OK);
           return tsize;
         }
       else
@@ -538,66 +748,6 @@
 
 
 /**
- * Try to (re)connect to the dht service.
- *
- * @return GNUNET_YES on success, GNUNET_NO on failure.
- */
-static int
-try_connect (struct GNUNET_DHT_Handle *handle)
-{
-  if (handle->client != NULL)
-    return GNUNET_OK;
-  handle->client = GNUNET_CLIENT_connect (handle->sched, "dht", handle->cfg);
-  if (handle->client != NULL)
-    return GNUNET_YES;
-#if DEBUG_STATISTICS
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              _("Failed to connect to the dht service!\n"));
-#endif
-  return GNUNET_NO;
-}
-
-
-/**
- * Try to send messages from list of messages to send
- */
-static void
-process_pending_message (struct GNUNET_DHT_Handle *handle)
-{
-
-  if (handle->current == NULL)
-    return;                     /* action already pending */
-  if (GNUNET_YES != try_connect (handle))
-    {
-      finish (handle, GNUNET_SYSERR);
-      return;
-    }
-
-  if (NULL ==
-      (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
-                                                         ntohs (handle->
-                                                                current->msg->
-                                                                size),
-                                                         handle->current->
-                                                         timeout, GNUNET_YES,
-                                                         &transmit_pending,
-                                                         handle)))
-    {
-#if DEBUG_DHT_API
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Failed to transmit request to dht service.\n");
-#endif
-      finish (handle, GNUNET_SYSERR);
-      return;
-    }
-#if DEBUG_DHT_API
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "`%s': Scheduled sending message of size %d to service\n",
-              "DHT API", ntohs (handle->current->msg->size));
-#endif
-}
-
-/**
  * Iterator called on each result obtained from a generic route
  * operation
  */
@@ -633,20 +783,31 @@
 find_peer_reply_iterator (void *cls, const struct GNUNET_MessageHeader *reply)
 {
   struct GNUNET_DHT_FindPeerHandle *find_peer_handle = cls;
+  struct GNUNET_MessageHeader *hello;
+  size_t hello_size;
 
-#if DEBUG_DHT_API
+  if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT)
+    {
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Find peer iterator called.\n");
-#endif
-  if (ntohs (reply->type) != GNUNET_MESSAGE_TYPE_HELLO)
-    return;
+                  "Received wrong type of response to a find peer 
request...\n");
+      return;
+    }
 
+
   GNUNET_assert (ntohs (reply->size) >=
                  sizeof (struct GNUNET_MessageHeader));
+  hello_size = ntohs(reply->size) - sizeof(struct GNUNET_MessageHeader);
+  hello = (struct GNUNET_MessageHeader *)&reply[1];
 
+  if (ntohs(hello->type) != GNUNET_MESSAGE_TYPE_HELLO)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Encapsulated message of type %d, is not a `%s' message!\n", 
ntohs(hello->type), "HELLO");
+      return;
+    }
   find_peer_handle->find_peer_context.proc (find_peer_handle->
                                             find_peer_context.proc_cls,
-                                            (struct GNUNET_HELLO_Message 
*)reply);
+                                            (struct GNUNET_HELLO_Message 
*)hello);
 }
 
 /**
@@ -685,36 +846,38 @@
   struct GNUNET_DHT_RouteHandle *route_handle;
   struct PendingMessage *pending;
   struct GNUNET_DHT_RouteMessage *message;
-  size_t expects_response;
   uint16_t msize;
   GNUNET_HashCode uid_key;
-  uint64_t uid;
 
   if (sizeof (struct GNUNET_DHT_RouteMessage) + ntohs (enc->size) >= 
GNUNET_SERVER_MAX_MESSAGE_SIZE)
     {
       GNUNET_break (0);
       return NULL;
     }
-  expects_response = GNUNET_YES;
-  if (iter == NULL)
-    expects_response = GNUNET_NO;
-  uid = handle->uid_gen++;
-  if (expects_response)
+
+  route_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_RouteHandle));
+  memcpy (&route_handle->key, key, sizeof (GNUNET_HashCode));
+  route_handle->iter = iter;
+  route_handle->iter_cls = iter_cls;
+  route_handle->dht_handle = handle;
+  if (iter != NULL)
     {
-      route_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_RouteHandle));
-      memcpy (&route_handle->key, key, sizeof (GNUNET_HashCode));
-      route_handle->iter = iter;
-      route_handle->iter_cls = iter_cls;
-      route_handle->dht_handle = handle;
-      route_handle->uid = uid;
-#if DEBUG_DHT_API
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "`%s': Unique ID is %llu\n", "DHT API", uid);
-#endif
+      route_handle->uid = handle->uid_gen++;
+      hash_from_uid (route_handle->uid, &uid_key);
       GNUNET_CONTAINER_multihashmap_put (handle->outstanding_requests,
                                          &uid_key, route_handle,
                                          
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
     }
+  else
+    {
+      route_handle->uid = 0;
+    }
+
+#if DEBUG_DHT_API
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "`%s': Unique ID is %llu\n", "DHT API", route_handle->uid);
+#endif
+
   msize = sizeof (struct GNUNET_DHT_RouteMessage) + ntohs (enc->size);
   message = GNUNET_malloc (msize);
   message->header.size = htons (msize);
@@ -722,18 +885,25 @@
   memcpy (&message->key, key, sizeof (GNUNET_HashCode));
   message->options = htonl (options);
   message->desired_replication_level = htonl (options);
-  message->unique = htonl (expects_response);
-  message->unique_id = GNUNET_htonll (uid);
+  message->unique_id = GNUNET_htonll (route_handle->uid);
   memcpy (&message[1], enc, ntohs (enc->size));
   pending = GNUNET_malloc (sizeof (struct PendingMessage));
   pending->msg = &message->header;
   pending->timeout = timeout;
   pending->cont = cont;
   pending->cont_cls = cont_cls;
-  pending->unique_id = uid;
-  GNUNET_assert (handle->current == NULL);
-  handle->current = pending;
-  process_pending_message (handle);
+  pending->unique_id = route_handle->uid;
+  if (handle->current == NULL)
+    {
+      handle->current = pending;
+      process_pending_message (handle);
+    }
+  else if ((handle->current != NULL) && (handle->retransmit_stage == 
DHT_RETRANSMITTING))
+  {
+    handle->retransmit_stage = DHT_RETRANSMITTING_MESSAGE_QUEUED;
+    handle->retransmission_buffer = pending;
+  }
+  route_handle->message = message;
   return route_handle;
 }
 
@@ -762,9 +932,9 @@
                       GNUNET_SCHEDULER_Task cont, void *cont_cls)
 {
   struct GNUNET_DHT_GetHandle *get_handle;
-  struct GNUNET_DHT_GetMessage *get_msg;
+  struct GNUNET_DHT_GetMessage get_msg;
 
-  if (handle->current != NULL)  /* Can't send right now, we have a pending 
message... */
+  if ((handle->current != NULL) && (handle->retransmit_stage != 
DHT_RETRANSMITTING)) /* Can't send right now, we have a pending message... */
     return NULL;
 
   get_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetHandle));
@@ -777,14 +947,14 @@
               GNUNET_h2s (key));
 #endif
 
-  get_msg = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetMessage));
-  get_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET);
-  get_msg->header.size = htons (sizeof (struct GNUNET_DHT_GetMessage));
-  get_msg->type = htons (type);
+  get_msg.header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET);
+  get_msg.header.size = htons (sizeof (struct GNUNET_DHT_GetMessage));
+  get_msg.type = htons (type);
 
   get_handle->route_handle =
-    GNUNET_DHT_route_start (handle, key, 0, 0, &get_msg->header, timeout,
+    GNUNET_DHT_route_start (handle, key, 0, 0, &get_msg.header, timeout,
                             &get_reply_iterator, get_handle, cont, cont_cls);
+
   return get_handle;
 }
 
@@ -821,10 +991,23 @@
   pending->timeout = DEFAULT_DHT_TIMEOUT;
   pending->cont = cont;
   pending->cont_cls = cont_cls;
-  pending->unique_id = route_handle->uid;
-  GNUNET_assert (route_handle->dht_handle->current == NULL);
-  route_handle->dht_handle->current = pending;
-  process_pending_message (route_handle->dht_handle);
+  pending->unique_id = 0; /* When finished is called, free pending->msg */
+
+  if (route_handle->dht_handle->current == NULL)
+    {
+      route_handle->dht_handle->current = pending;
+      process_pending_message (route_handle->dht_handle);
+    }
+  else if ((route_handle->dht_handle->current != NULL) && 
(route_handle->dht_handle->retransmit_stage == DHT_RETRANSMITTING))
+    {
+      route_handle->dht_handle->retransmit_stage = 
DHT_RETRANSMITTING_MESSAGE_QUEUED;
+      route_handle->dht_handle->retransmission_buffer = pending;
+    }
+  else
+    {
+      GNUNET_break(0);
+    }
+
   hash_from_uid (route_handle->uid, &uid_key);
   GNUNET_assert (GNUNET_CONTAINER_multihashmap_remove
                 (route_handle->dht_handle->outstanding_requests, &uid_key,
@@ -843,6 +1026,17 @@
 GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle,
                      GNUNET_SCHEDULER_Task cont, void *cont_cls)
 {
+  if ((get_handle->route_handle->dht_handle->current != NULL) &&
+      (get_handle->route_handle->dht_handle->retransmit_stage != 
DHT_RETRANSMITTING))
+    {
+      if (cont != NULL)
+        {
+          GNUNET_SCHEDULER_add_continuation 
(get_handle->route_handle->dht_handle->sched, cont, cont_cls,
+                                             GNUNET_SCHEDULER_REASON_TIMEOUT);
+        }
+      return;
+    }
+
 #if DEBUG_DHT_API
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "`%s': Removing pending get request with key %s, uid %llu\n",
@@ -880,9 +1074,9 @@
                             void *cont_cls)
 {
   struct GNUNET_DHT_FindPeerHandle *find_peer_handle;
-  struct GNUNET_MessageHeader *find_peer_msg;
+  struct GNUNET_MessageHeader find_peer_msg;
 
-  if (handle->current != NULL)  /* Can't send right now, we have a pending 
message... */
+  if ((handle->current != NULL) && (handle->retransmit_stage != 
DHT_RETRANSMITTING))  /* Can't send right now, we have a pending message... */
     return NULL;
 
   find_peer_handle =
@@ -896,11 +1090,10 @@
               "FIND PEER", GNUNET_h2s (key));
 #endif
 
-  find_peer_msg = GNUNET_malloc(sizeof(struct GNUNET_MessageHeader));
-  find_peer_msg->size = htons(sizeof(struct GNUNET_MessageHeader));
-  find_peer_msg->type = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
+  find_peer_msg.size = htons(sizeof(struct GNUNET_MessageHeader));
+  find_peer_msg.type = htons(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER);
   find_peer_handle->route_handle =
-    GNUNET_DHT_route_start (handle, key, 0, options, find_peer_msg,
+    GNUNET_DHT_route_start (handle, key, 0, options, &find_peer_msg,
                             timeout, &find_peer_reply_iterator,
                             find_peer_handle, cont, cont_cls);
   return find_peer_handle;
@@ -917,6 +1110,17 @@
 GNUNET_DHT_find_peer_stop (struct GNUNET_DHT_FindPeerHandle *find_peer_handle,
                            GNUNET_SCHEDULER_Task cont, void *cont_cls)
 {
+  if ((find_peer_handle->route_handle->dht_handle->current != NULL) &&
+      (find_peer_handle->route_handle->dht_handle->retransmit_stage != 
DHT_RETRANSMITTING))
+    {
+      if (cont != NULL)
+        {
+          GNUNET_SCHEDULER_add_continuation 
(find_peer_handle->route_handle->dht_handle->sched, cont, cont_cls,
+                                             GNUNET_SCHEDULER_REASON_TIMEOUT);
+        }
+      return;
+    }
+
 #if DEBUG_DHT_API
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "`%s': Removing pending `%s' request with key %s, uid %llu\n",
@@ -958,12 +1162,16 @@
                 GNUNET_SCHEDULER_Task cont, void *cont_cls)
 {
   struct GNUNET_DHT_PutMessage *put_msg;
+  struct GNUNET_DHT_RouteHandle *put_route;
   size_t msize;
 
-  if (handle->current != NULL)
+  if ((handle->current != NULL) && (handle->retransmit_stage != 
DHT_RETRANSMITTING))
     {
-      GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls,
-                                         GNUNET_SCHEDULER_REASON_TIMEOUT);
+      if (cont != NULL)
+        {
+          GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls,
+                                             GNUNET_SCHEDULER_REASON_TIMEOUT);
+        }
       return;
     }
 
@@ -982,8 +1190,19 @@
   put_msg->expiration = GNUNET_TIME_absolute_hton(exp);
   memcpy (&put_msg[1], data, size);
 
-  GNUNET_DHT_route_start (handle, key, 0, 0, &put_msg->header, timeout, NULL,
-                          NULL, cont, cont_cls);
+  put_route = GNUNET_DHT_route_start (handle, key, 0, 0, &put_msg->header, 
timeout, NULL,
+                                      NULL, cont, cont_cls);
 
+  if (put_route == NULL) /* Route start failed! */
+    {
+      if (cont != NULL)
+        {
+          GNUNET_SCHEDULER_add_continuation (handle->sched, cont, cont_cls,
+                                             GNUNET_SCHEDULER_REASON_TIMEOUT);
+        }
+    }
+  else
+    GNUNET_free(put_route);
+
   GNUNET_free (put_msg);
 }

Modified: gnunet/src/dht/gnunet-dht-get-peer.c
===================================================================
--- gnunet/src/dht/gnunet-dht-get-peer.c        2010-04-19 15:16:18 UTC (rev 
10981)
+++ gnunet/src/dht/gnunet-dht-get-peer.c        2010-04-19 15:16:38 UTC (rev 
10982)
@@ -101,17 +101,18 @@
  * operation
  *
  * @param cls closure (NULL)
- * @param peer the peer we learned about
- * @param reply the response message, should be a HELLO
+ * @param hello the response message, a HELLO
  */
 void find_peer_processor (void *cls,
-                          const struct GNUNET_PeerIdentity *peer,
-                          const struct GNUNET_MessageHeader *reply)
+                          const struct GNUNET_HELLO_Message *hello)
 {
-  result_count++;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "test_find_peer_processor called (peer `%s'), total results 
%d!\n", GNUNET_i2s(peer), result_count);
-
+  struct GNUNET_PeerIdentity peer;
+  if (GNUNET_OK == GNUNET_HELLO_get_id(hello, &peer))
+    {
+      result_count++;
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "test_find_peer_processor called (peer `%s'), total results 
%d!\n", GNUNET_i2s(&peer), result_count);
+    }
 }
 
 
@@ -191,8 +192,14 @@
   if (verbose)
     fprintf (stderr, "Issuing FIND PEER request for %s!\n", query_key);
 
-  find_peer_handle = GNUNET_DHT_find_peer_start (dht_handle, timeout, 0, NULL, 
&key,
-                        &find_peer_processor, NULL, &message_sent_cont, NULL);
+  find_peer_handle = GNUNET_DHT_find_peer_start (dht_handle,
+                                                 timeout,
+                                                 0,
+                                                 &key,
+                                                 &find_peer_processor,
+                                                 NULL,
+                                                 &message_sent_cont,
+                                                 NULL);
 
 }
 

Modified: gnunet/src/dht/gnunet-service-dht.c
===================================================================
--- gnunet/src/dht/gnunet-service-dht.c 2010-04-19 15:16:18 UTC (rev 10981)
+++ gnunet/src/dht/gnunet-service-dht.c 2010-04-19 15:16:38 UTC (rev 10982)
@@ -166,7 +166,7 @@
   /**
    * The key this request was about
    */
-  GNUNET_HashCode *key;
+  const GNUNET_HashCode *key;
 
   /**
    * The unique identifier of this request
@@ -240,9 +240,6 @@
   if (buf == NULL)             
     {
       /* client disconnected */
-#if DEBUG_DHT
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "`%s': buffer was NULL\n", "DHT");
-#endif
       return 0;
     }
   off = 0;
@@ -256,10 +253,6 @@
       GNUNET_free (reply);
       off += msize;
     }
-#if DEBUG_DHT
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "`%s': Copying reply to buffer, REALLY SENT\n", "DHT");
-#endif
   process_pending_messages (client);
   return off;
 }
@@ -284,7 +277,7 @@
 
 
 /**
- * Called when a reply needs to be sent to a client, either as
+ * Called when a reply needs to be sent to a client, as
  * a result it found to a GET or FIND PEER request.
  *
  * @param client the client to send the reply to
@@ -296,7 +289,7 @@
                       const struct GNUNET_MessageHeader *message,
                       unsigned long long uid)
 {
-  struct GNUNET_DHT_Message *reply;
+  struct GNUNET_DHT_RouteResultMessage *reply;
   struct PendingMessage *pending_message;
   uint16_t msize;
   size_t tsize;
@@ -305,21 +298,21 @@
               "`%s': Sending reply to client.\n", "DHT");
 #endif
   msize = ntohs (message->size);
-  tsize = sizeof (struct GNUNET_DHT_Message) + msize;
+  tsize = sizeof (struct GNUNET_DHT_RouteResultMessage) + msize;
   if (tsize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
     {
-      GNUNET_BREAK_op (0);
+      GNUNET_break_op (0);
       return;
     }
-  reply = GNUNET_malloc (tsize);
+
+  pending_message = GNUNET_malloc (sizeof (struct PendingMessage) + tsize);
+  pending_message->msg = (struct GNUNET_MessageHeader *)&pending_message[1];
+  reply = (struct GNUNET_DHT_RouteResultMessage *)&pending_message[1];
   reply->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_ROUTE_RESULT);
   reply->header.size = htons (tsize);
-  if (uid != 0)
-    reply->unique = htonl (GNUNET_YES); // ????
   reply->unique_id = GNUNET_htonll (uid);
   memcpy (&reply[1], message, msize);
-  pending_message = GNUNET_malloc (sizeof (struct PendingMessage)); // inline
-  pending_message->msg = &reply->header;
+
   add_pending_message (client, pending_message);
 }
 
@@ -354,7 +347,6 @@
   get_result->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_GET_RESULT);
   get_result->header.size =
     htons (sizeof (struct GNUNET_DHT_GetResultMessage) + size);
-  get_result->data_size = htons (size);
   get_result->expiration = exp;
   memcpy (&get_result->key, key, sizeof (GNUNET_HashCode));
   get_result->type = htons (type);
@@ -383,12 +375,13 @@
   unsigned int results;
   struct DatacacheGetContext datacache_get_context;
 
-  if (ntohs (msg->header.size) != sizeof (struct GNUNET_DHT_GetMessage))
+  get_msg = (const struct GNUNET_DHT_GetMessage *) msg;
+  if (ntohs (get_msg->header.size) != sizeof (struct GNUNET_DHT_GetMessage))
     {
       GNUNET_break (0);
       return;
     }
-  get_msg = (const struct GNUNET_DHT_GetMessage *) msg;
+
   get_type = ntohs (get_msg->type);
 #if DEBUG_DHT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -422,7 +415,7 @@
                      const struct GNUNET_MessageHeader *find_msg,
                       struct DHT_MessageContext *message_context)
 {
-  struct GNUNET_DHT_FindPeerResultMessage *find_peer_result;
+  struct GNUNET_MessageHeader *find_peer_result;
   size_t hello_size;
   size_t tsize;
 
@@ -430,8 +423,8 @@
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "`%s': Received `%s' request from client, key %s (msg size %d, 
we expected %d)\n",
               "DHT", "FIND PEER", GNUNET_h2s (message_context->key),
-              ntohs (find_msg->header.size),
-              sizeof (struct GNUNET_DHT_FindPeerMessage));
+              ntohs (find_msg->size),
+              sizeof (struct GNUNET_MessageHeader));
 #endif
   if (my_hello == NULL)
   {
@@ -444,13 +437,18 @@
   }
   /* Simplistic find_peer functionality, always return our hello */
   hello_size = ntohs(my_hello->size);
-  tsize = hello_size + sizeof (struct GNUNET_DHT_FindPeerResultMessage);
+  tsize = hello_size + sizeof (struct GNUNET_MessageHeader);
   // check tsize < MAX
   find_peer_result = GNUNET_malloc (tsize);
-  find_peer_result->header.type = htons 
(GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT);
-  find_peer_result->header.size = htons (tsize);
-  memcpy (&find_peer_result[1], &my_hello, hello_size);
-  send_reply_to_client(message_context->client, &find_peer_result->header, 
message_context->unique_id);
+  find_peer_result->type = htons (GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_RESULT);
+  find_peer_result->size = htons (tsize);
+  memcpy (&find_peer_result[1], my_hello, hello_size);
+#if DEBUG_DHT_HELLO
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "`%s': Sending hello size %d to client.\n",
+                "DHT", hello_size);
+#endif
+  send_reply_to_client(message_context->client, find_peer_result, 
message_context->unique_id);
   GNUNET_free(find_peer_result);
 }
 
@@ -471,24 +469,24 @@
   size_t put_type;
   size_t data_size;
 
-  GNUNET_assert (ntohs (msg->header.size) >=
+  GNUNET_assert (ntohs (msg->size) >=
                  sizeof (struct GNUNET_DHT_PutMessage));
   put_msg = (struct GNUNET_DHT_PutMessage *)msg;
-  put_type = ntohl (put_msg->type);
+  put_type = ntohs (put_msg->type);
   data_size = ntohs (put_msg->header.size) - sizeof (struct 
GNUNET_DHT_PutMessage);
 #if DEBUG_DHT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "`%s': %s msg total size is %d, data size %d, struct size %d\n",
-              "DHT", "PUT", ntohs (put_msg->header.size), data_size,
-              sizeof (struct GNUNET_DHT_PutMessage));
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "`%s': Received `%s' request from client, message type %d, key 
%s\n",
               "DHT", "PUT", put_type, GNUNET_h2s (message_context->key));
 #endif
   if (datacache != NULL)
     GNUNET_DATACACHE_put (datacache, message_context->key, data_size,
                           (char *) &put_msg[1], put_type,
-                          put_msg->expiration);
+                          GNUNET_TIME_absolute_ntoh(put_msg->expiration));
+  else
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "`%s': %s request received locally, but have no datacache!\n",
+                "DHT", "PUT");
 }
 
 
@@ -520,41 +518,6 @@
 }
 
 /**
- * Construct a message receipt confirmation for a particular uid.
- * Receipt confirmations are used for any requests that don't expect
- * a reply otherwise (i.e. put requests, stop requests).
- *
- * @param client the handle for the client
- * @param uid the unique identifier of this message
- */
-static void
-send_client_receipt_confirmation (struct GNUNET_SERVER_Client *client,
-                                  uint64_t uid)
-{
-  struct GNUNET_DHT_StopMessage *confirm_message;
-  struct ClientList *active_client;
-  struct PendingMessage *pending_message;
-
-#if DEBUG_DHT
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "`%s': Sending receipt confirmation for uid %llu\n", "DHT",
-              uid);
-#endif
-  confirm_message = GNUNET_malloc (sizeof (struct GNUNET_DHT_StopMessage));
-  confirm_message->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_STOP);
-  confirm_message->header.size =
-    htons (sizeof (struct GNUNET_DHT_StopMessage));
-  confirm_message->unique_id = GNUNET_htonll (uid);
-
-  active_client = find_active_client (client);
-  pending_message = GNUNET_malloc (sizeof (struct PendingMessage));
-  pending_message->msg = &confirm_message->header;
-
-  add_pending_message (active_client, pending_message);
-
-}
-
-/**
  * Handler for any generic DHT messages, calls the appropriate handler
  * depending on message type, sends confirmation if responses aren't otherwise
  * expected.
@@ -567,9 +530,10 @@
 handle_dht_start_message (void *cls, struct GNUNET_SERVER_Client *client,
                           const struct GNUNET_MessageHeader *message)
 {
-  const struct GNUNET_DHT_Message *dht_msg = (const struct GNUNET_DHT_Message 
*) message;
+  const struct GNUNET_DHT_RouteMessage *dht_msg = (const struct 
GNUNET_DHT_RouteMessage *) message;
   const struct GNUNET_MessageHeader *enc_msg;
   struct DHT_MessageContext *message_context;
+  int handle_locally;
   size_t enc_type;
 
   enc_msg = (const struct GNUNET_MessageHeader *) &dht_msg[1];
@@ -590,28 +554,37 @@
   message_context->replication = ntohl (dht_msg->desired_replication_level);
   message_context->msg_options = ntohl (dht_msg->options);
 
+  /* TODO: Steps to be added by students */
   /* FIXME: Implement *remote* DHT operations here (forward request) */
+  /* Implement generic route function and call here. */
   /* FIXME: *IF* handling should be local, then do this: */
-  switch (enc_type)
+  /* 1. find if this peer is closest based on whatever metric the DHT uses
+   * 2. if this peer is closest _OR_ the message options indicate it should
+   *    be processed everywhere _AND_ we want it processed everywhere, then
+   *    handle it locally.
+   */
+  handle_locally = GNUNET_YES;
+  if (handle_locally == GNUNET_YES)
     {
-    case GNUNET_MESSAGE_TYPE_DHT_GET:
-      handle_dht_get (cls, enc_msg,
-                      message_context);
-      break;
-    case GNUNET_MESSAGE_TYPE_DHT_PUT:
-      handle_dht_put (cls, enc_msg,
-                      message_context);
-      send_client_receipt_confirmation (client,
-                                        GNUNET_ntohll (dht_msg->unique_id));
-      break;
-    case GNUNET_MESSAGE_TYPE_DHT_FIND_PEER:
-      handle_dht_find_peer (cls,
-                            enc_msg,
-                            message_context);
-      break;
-    default:
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                  "`%s': Message type (%d) not handled\n", "DHT", enc_type);
+      switch (enc_type)
+        {
+        case GNUNET_MESSAGE_TYPE_DHT_GET:
+          handle_dht_get (cls, enc_msg,
+                          message_context);
+          break;
+        case GNUNET_MESSAGE_TYPE_DHT_PUT:
+          handle_dht_put (cls, enc_msg,
+                          message_context);
+          break;
+        case GNUNET_MESSAGE_TYPE_DHT_FIND_PEER:
+          handle_dht_find_peer (cls,
+                                enc_msg,
+                                message_context);
+          break;
+        default:
+          GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                      "`%s': Message type (%d) not handled\n", "DHT", 
enc_type);
+        }
     }
   GNUNET_free (message_context);
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
@@ -620,14 +593,14 @@
 
 /**
  * Handler for any generic DHT stop messages, calls the appropriate handler
- * depending on message type, sends confirmation by default (stop messages
- * do not otherwise expect replies)
+ * depending on message type (if processed locally)
  *
  * @param cls closure for the service
  * @param client the client we received this message from
  * @param message the actual message received
  *
- * TODO: add demultiplexing for stop message types.
+ * TODO: once messages are remembered by unique id, add code to
+ *       forget them here
  */
 static void
 handle_dht_stop_message (void *cls, struct GNUNET_SERVER_Client *client,
@@ -635,13 +608,17 @@
 {
   const struct GNUNET_DHT_StopMessage *dht_stop_msg =
     (const struct GNUNET_DHT_StopMessage *) message;
-
+  uint64_t uid;
 #if DEBUG_DHT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "`%s': Received `%s' request from client, uid %llu\n", "DHT",
               "GENERIC STOP", GNUNET_ntohll (dht_stop_msg->unique_id));
 #endif
-  /* TODO: actually stop... */
+
+  uid = GNUNET_ntohll(dht_stop_msg->unique_id);
+  /* TODO: actually stop... free associated resources for the request
+   * lookup request by uid and remove state. */
+
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
 
@@ -767,14 +744,14 @@
 
 
 static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = {
-  {&handle_dht_start_message, NULL, GNUNET_MESSAGE_TYPE_DHT, 0},
+  {&handle_dht_start_message, NULL, GNUNET_MESSAGE_TYPE_DHT_ROUTE, 0},
   {&handle_dht_stop_message, NULL, GNUNET_MESSAGE_TYPE_DHT_STOP, 0},
   {NULL, NULL, 0, 0}
 };
 
 
 static struct GNUNET_CORE_MessageHandler core_handlers[] = {
-  {&handle_dht_p2p_route_request, GNUNET_MESSAGE_TYPE_DHT_ROUTE_REQUEST, 0},
+  {&handle_dht_p2p_route_request, GNUNET_MESSAGE_TYPE_DHT_ROUTE, 0},
   {&handle_dht_p2p_route_result, GNUNET_MESSAGE_TYPE_DHT_ROUTE_RESULT, 0},
   {NULL, 0, 0}
 };
@@ -803,7 +780,6 @@
                                  GNUNET_TIME_UNIT_FOREVER_REL,
                                  NULL,  /* FIXME: anything we want to pass 
around? */
                                  &core_init,    /* Call core_init once 
connected */
-                                 NULL,  /* Don't care about pre-connects */
                                  NULL,  /* Don't care about connects */
                                  NULL,  /* Don't care about disconnects */
                                  NULL,  /* Don't want notified about all 
incoming messages */

Modified: gnunet/src/dht/test_dht_api.c
===================================================================
--- gnunet/src/dht/test_dht_api.c       2010-04-19 15:16:18 UTC (rev 10981)
+++ gnunet/src/dht/test_dht_api.c       2010-04-19 15:16:38 UTC (rev 10982)
@@ -32,6 +32,7 @@
 #include "gnunet_program_lib.h"
 #include "gnunet_scheduler_lib.h"
 #include "gnunet_dht_service.h"
+#include "gnunet_hello_lib.h"
 
 #define VERBOSE GNUNET_NO
 
@@ -90,6 +91,8 @@
 
 static struct PeerContext p1;
 
+struct RetryContext retry_context;
+
 static struct GNUNET_SCHEDULER_Handle *sched;
 
 static int ok;
@@ -146,7 +149,10 @@
 #if VERBOSE
   fprintf (stderr, "Ending on an unhappy note.\n");
 #endif
-
+  if (retry_context.peer_ctx->find_peer_handle != NULL)
+    GNUNET_DHT_find_peer_stop(retry_context.peer_ctx->find_peer_handle, NULL, 
NULL);
+  if (retry_context.retry_task != GNUNET_SCHEDULER_NO_TASK)
+    GNUNET_SCHEDULER_cancel(sched, retry_context.retry_task);
   GNUNET_DHT_disconnect (p1.dht_handle);
 
   ok = 1;
@@ -186,21 +192,41 @@
  * @param reply response
  */
 void test_find_peer_processor (void *cls,
-                          const struct GNUNET_PeerIdentity *peer,
-                          const struct GNUNET_MessageHeader *reply)
+                               const struct GNUNET_HELLO_Message *hello)
 {
   struct RetryContext *retry_ctx = cls;
+  struct GNUNET_PeerIdentity peer;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "test_find_peer_processor called (peer `%s'), stopping find peer 
request!\n", GNUNET_i2s(peer));
+  if (GNUNET_OK == GNUNET_HELLO_get_id(hello, &peer))
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "test_find_peer_processor called (peer `%s'), stopping find 
peer request!\n", GNUNET_i2s(&peer));
 
-  if (retry_ctx->retry_task != GNUNET_SCHEDULER_NO_TASK)
-    GNUNET_SCHEDULER_cancel(sched, retry_ctx->retry_task);
+      if (retry_ctx->retry_task != GNUNET_SCHEDULER_NO_TASK)
+        {
+          GNUNET_SCHEDULER_cancel(sched, retry_ctx->retry_task);
+          retry_ctx->retry_task = GNUNET_SCHEDULER_NO_TASK;
+        }
 
-  GNUNET_SCHEDULER_add_continuation (sched, &test_find_peer_stop, &p1,
-                                     GNUNET_SCHEDULER_REASON_PREREQ_DONE);
+      GNUNET_SCHEDULER_add_continuation (sched, &test_find_peer_stop, &p1,
+                                         GNUNET_SCHEDULER_REASON_PREREQ_DONE);
+    }
+  else
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "received find peer request, but hello_get_id failed!\n");
+    }
+
 }
 
+/**
+ * Retry the find_peer task on timeout. (Forward declaration)
+ *
+ * @param cls closure
+ * @param tc context information (why was this task triggered now?)
+ */
+void
+retry_find_peer_stop (void *cls, const struct GNUNET_SCHEDULER_TaskContext 
*tc);
 
 /**
  * Retry the find_peer task on timeout.
@@ -219,9 +245,9 @@
     {
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                   "test_find_peer timed out, retrying!\n");
-
+      retry_ctx->next_timeout = 
GNUNET_TIME_relative_multiply(retry_ctx->next_timeout, 2);
       retry_ctx->peer_ctx->find_peer_handle =
-          GNUNET_DHT_find_peer_start (retry_ctx->peer_ctx->dht_handle, 
retry_ctx->next_timeout, 0, NULL, &hash,
+          GNUNET_DHT_find_peer_start (retry_ctx->peer_ctx->dht_handle, 
retry_ctx->next_timeout, 0, &hash,
                                       &test_find_peer_processor, retry_ctx, 
NULL, NULL);
     }
   else
@@ -235,14 +261,14 @@
   if (retry_ctx->peer_ctx->find_peer_handle == NULL)
     GNUNET_SCHEDULER_add_now (sched, &end_badly, &p1);
   else
-    retry_ctx->retry_task = GNUNET_SCHEDULER_add_delayed(sched, 
retry_ctx->next_timeout, &retry_find_peer, retry_ctx);
+    retry_ctx->retry_task = GNUNET_SCHEDULER_add_delayed(sched, 
retry_ctx->next_timeout, &retry_find_peer_stop, retry_ctx);
 }
 
 /**
  * Retry the find_peer task on timeout.
  *
  * @param cls closure
- * @param tc context information (why was this task triggered now)
+ * @param tc context information (why was this task triggered now?)
  */
 void
 retry_find_peer_stop (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
@@ -270,24 +296,22 @@
   struct PeerContext *peer = cls;
   GNUNET_HashCode hash;
   memset (&hash, 42, sizeof (GNUNET_HashCode));
-  struct RetryContext *retry_ctx;
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Called test_find_peer!\n");
   GNUNET_assert (peer->dht_handle != NULL);
 
-  retry_ctx = GNUNET_malloc(sizeof(struct RetryContext));
-  retry_ctx->real_timeout = GNUNET_TIME_relative_to_absolute(TOTAL_TIMEOUT);
-  retry_ctx->next_timeout = BASE_TIMEOUT;
-  retry_ctx->peer_ctx = peer;
+  retry_context.real_timeout = GNUNET_TIME_relative_to_absolute(TOTAL_TIMEOUT);
+  retry_context.next_timeout = BASE_TIMEOUT;
+  retry_context.peer_ctx = peer;
 
   peer->find_peer_handle =
-    GNUNET_DHT_find_peer_start (peer->dht_handle, retry_ctx->next_timeout, 0, 
NULL, &hash,
-                                &test_find_peer_processor, retry_ctx, NULL, 
NULL);
+    GNUNET_DHT_find_peer_start (peer->dht_handle, retry_context.next_timeout, 
0, &hash,
+                                &test_find_peer_processor, &retry_context, 
NULL, NULL);
 
   if (peer->find_peer_handle == NULL)
     GNUNET_SCHEDULER_add_now (sched, &end_badly, &p1);
   else
-    retry_ctx->retry_task = GNUNET_SCHEDULER_add_delayed(sched, 
retry_ctx->next_timeout, &retry_find_peer_stop, retry_ctx);
+    retry_context.retry_task = GNUNET_SCHEDULER_add_delayed(sched, 
retry_context.next_timeout, &retry_find_peer_stop, &retry_context);
 }
 
 /**





reply via email to

[Prev in Thread] Current Thread [Next in Thread]