gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r10656 - gnunet/src/dht


From: gnunet
Subject: [GNUnet-SVN] r10656 - gnunet/src/dht
Date: Thu, 18 Mar 2010 17:32:41 +0100

Author: nevans
Date: 2010-03-18 17:32:41 +0100 (Thu, 18 Mar 2010)
New Revision: 10656

Modified:
   gnunet/src/dht/dht.h
   gnunet/src/dht/dht_api.c
   gnunet/src/dht/gnunet-service-dht.c
Log:
getting dht closer to being this crazy meta dht thing

Modified: gnunet/src/dht/dht.h
===================================================================
--- gnunet/src/dht/dht.h        2010-03-18 14:55:38 UTC (rev 10655)
+++ gnunet/src/dht/dht.h        2010-03-18 16:32:41 UTC (rev 10656)
@@ -35,6 +35,24 @@
 /**
  * Generic DHT message, wrapper for other message types
  */
+struct GNUNET_DHT_StopMessage
+{
+  /**
+   * Type: GNUNET_MESSAGE_TYPE_DHT_MESSAGE
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Unique ID identifying this request
+   */
+  uint64_t unique_id;
+
+};
+
+
+/**
+ * Generic DHT message, wrapper for other message types
+ */
 struct GNUNET_DHT_Message
 {
   /**
@@ -58,12 +76,18 @@
   uint16_t options;
 
   /**
-   * Is this message uniquely identified?  If so it has
-   * a unique_id appended to it.
+   * Is this message uniquely identified?  If so it will
+   * be fire and forget, if not we will wait for a receipt
+   * from the service.
    */
-  /* uint16_t unique; I don't think we need this, it should be held in the 
encapsulated message */
+  uint16_t unique;
 
-  /* uint64_t unique_id*/
+
+  /**
+   * Unique ID identifying this request
+   */
+  uint64_t unique_id;
+
   /* */
   /* GNUNET_MessageHeader *enc actual DHT message, copied to end of this dealy 
do */
 
@@ -112,11 +136,6 @@
    */
   size_t type;
 
-  /**
-   * The key to search for
-   */
-  GNUNET_HashCode key;
-
 };
 
 /**
@@ -156,11 +175,6 @@
    */
   struct GNUNET_MessageHeader header;
 
-  /**
-   * The key being looked up
-   */
-  GNUNET_HashCode key;
-
 };
 
 /**

Modified: gnunet/src/dht/dht_api.c
===================================================================
--- gnunet/src/dht/dht_api.c    2010-03-18 14:55:38 UTC (rev 10655)
+++ gnunet/src/dht/dht_api.c    2010-03-18 16:32:41 UTC (rev 10656)
@@ -23,6 +23,13 @@
  * @brief library to access the DHT service
  * @author Christian Grothoff
  * @author Nathan Evans
+ *
+ * TODO: Only allow a single message until confirmed as received by
+ *       the service.  For put messages call continuation as soon as
+ *       receipt acknowledged (then remove), for GET or other messages
+ *       only call continuation when data received.
+ *       Add unique identifier to message types requesting data to be
+ *       returned.
  */
 #include "platform.h"
 #include "gnunet_bandwidth_lib.h"
@@ -39,14 +46,11 @@
 
 #define DEBUG_DHT_API GNUNET_YES
 
-struct PendingMessages
+#define DEFAULT_DHT_TIMEOUT GNUNET_TIME_relative_multiply 
(GNUNET_TIME_UNIT_SECONDS, 5)
+
+struct PendingMessage
 {
   /**
-   * Linked list of pending messages
-   */
-  struct PendingMessages *next;
-
-  /**
    * Message that is pending
    */
   struct GNUNET_MessageHeader *msg;
@@ -56,9 +60,110 @@
    */
   struct GNUNET_TIME_Relative timeout;
 
+  /**
+   * Continuation to call on message send
+   * or message receipt confirmation
+   */
+  GNUNET_DHT_MessageCallback cont;
+
+  /**
+   * Continuation closure
+   */
+  void *cont_cls;
+
+  /**
+   * Whether or not to await verification the message
+   * was received by the service
+   */
+  size_t is_unique;
+
+  /**
+   * Unique ID for this request
+   */
+  uint64_t unique_id;
+
 };
 
+struct GNUNET_DHT_GetContext
+{
+
+
+  /**
+   * Iterator to call on data receipt
+   */
+  GNUNET_DHT_GetIterator iter;
+
+  /**
+   * Closure for the iterator callback
+   */
+  void *iter_cls;
+
+};
+
 /**
+ * Handle to control a unique operation (one that is
+ * expected to return results)
+ */
+struct GNUNET_DHT_RouteHandle
+{
+
+  /**
+   * Unique identifier for this request (for key collisions)
+   */
+  uint64_t uid;
+
+  /**
+   * Key that this get request is for
+   */
+  GNUNET_HashCode key;
+
+  /**
+   * Iterator to call on data receipt
+   */
+  GNUNET_DHT_ReplyProcessor iter;
+
+  /**
+   * Closure for the iterator callback
+   */
+  void *iter_cls;
+
+  /**
+   * Main handle to this DHT api
+   */
+  struct GNUNET_DHT_Handle *dht_handle;
+};
+
+/**
+ * Handle for a non unique request, holds callback
+ * which needs to be called before we allow other
+ * messages to be processed and sent to the DHT service
+ */
+struct GNUNET_DHT_NonUniqueHandle
+{
+  /**
+   * Key that this get request is for
+   */
+  GNUNET_HashCode key;
+
+  /**
+   * Type of data get request was for
+   */
+  uint32_t type;
+
+  /**
+   * Continuation to call on service
+   * confirmation of message receipt.
+   */
+  GNUNET_SCHEDULER_Task cont;
+
+  /**
+   * Send continuation cls
+   */
+  void *cont_cls;
+};
+
+
+/**
  * Connection to the DHT service.
  */
 struct GNUNET_DHT_Handle
@@ -84,27 +189,26 @@
   struct GNUNET_CLIENT_TransmitHandle *th;
 
   /**
-   * List of the currently pending messages for the DHT service.
+   * Message we are currently sending, only allow
+   * a single message to be queued.  If not unique
+   * (typically a put request), await a confirmation
+   * from the service that the message was received.
+   * If unique, just fire and forget.
    */
-  struct PendingMessages *pending_list;
+  struct PendingMessage *current;
 
   /**
-   * Message we are currently sending.
+   * Hash map containing the current outstanding unique requests
    */
-  struct PendingMessages *current;
+  struct GNUNET_CONTAINER_MultiHashMap *outstanding_requests;
 
   /**
-   * Hash map containing the current outstanding get requests
+   * Non unique handle.  If set don't schedule another non
+   * unique request.
    */
-  struct GNUNET_CONTAINER_MultiHashMap *outstanding_get_requests;
+  struct GNUNET_DHT_NonUniqueHandle *non_unique_request;
 
   /**
-   * Hash map containing the current outstanding put requests, awaiting
-   * a response
-   */
-  struct GNUNET_CONTAINER_MultiHashMap *outstanding_put_requests;
-
-  /**
    * Kill off the connection and any pending messages.
    */
   int do_destroy;
@@ -116,6 +220,27 @@
 /* Forward declaration */
 static void process_pending_message(struct GNUNET_DHT_Handle *handle);
 
+static GNUNET_HashCode * hash_from_uid(uint64_t uid)
+{
+  int count;
+  int remaining;
+  GNUNET_HashCode *hash;
+  hash = GNUNET_malloc(sizeof(GNUNET_HashCode));
+  count = 0;
+
+  while (count < sizeof(GNUNET_HashCode))
+    {
+      remaining = sizeof(GNUNET_HashCode) - count;
+      if (remaining > sizeof(uid))
+        remaining = sizeof(uid);
+
+      memcpy(hash, &uid, remaining);
+      count += remaining;
+    }
+
+  return hash;
+}
+
 /**
  * Handler for messages received from the DHT service
  * a demultiplexer which handles numerous message types
@@ -124,9 +249,52 @@
 void service_message_handler (void *cls,
                               const struct GNUNET_MessageHeader *msg)
 {
+  struct GNUNET_DHT_Handle *handle = cls;
+  struct GNUNET_DHT_Message *dht_msg = (struct GNUNET_DHT_Message *)msg;
+  struct GNUNET_MessageHeader *enc_msg;
+  struct GNUNET_DHT_RouteHandle *route_handle;
+  GNUNET_HashCode *uid_hash;
+  size_t enc_size;
+  /* TODO: find out message type, handle callbacks for different types of 
messages.
+   * Should be a non unique acknowledgment, or unique result. */
 
-  /* TODO: find out message type, handle callbacks for different types of 
messages.
-   * Should be a put acknowledgment, get data or find node result. */
+#if DEBUG_DHT_API
+          GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                      "`%s': Received response to message (uid %llu)\n", "DHT 
API", ntohl(dht_msg->unique_id));
+#endif
+
+  if (ntohs(dht_msg->unique))
+    {
+      uid_hash = hash_from_uid(ntohl(dht_msg->unique_id));
+
+      route_handle = 
GNUNET_CONTAINER_multihashmap_get(handle->outstanding_requests, uid_hash);
+      if (route_handle == NULL) /* We have no recollection of this request */
+        {
+#if DEBUG_DHT_API
+          GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                      "`%s': Received response to message (uid %llu), but have 
no recollection of it!\n", "DHT API", ntohl(dht_msg->unique_id));
+#endif
+        }
+      else
+        {
+          enc_size = ntohs(dht_msg->header.size) - sizeof(struct 
GNUNET_DHT_Message);
+          GNUNET_assert(enc_size > 0);
+          enc_msg = (struct GNUNET_MessageHeader *)&dht_msg[1];
+          route_handle->iter(route_handle->iter_cls, enc_msg);
+        }
+    }
+  else
+    {
+      if (handle->current->unique_id == ntohl(dht_msg->unique_id))
+        {
+          handle->current->cont(handle->current->cont_cls, GNUNET_OK);
+          GNUNET_free(handle->current->msg);
+          handle->current = NULL;
+          GNUNET_free(handle->current);
+        }
+    }
+
+
 }
 
 
@@ -151,14 +319,14 @@
   default_request_timeout = 
GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 5);
   handle->cfg = cfg;
   handle->sched = sched;
-  handle->pending_list = NULL;
+
   handle->current = NULL;
   handle->do_destroy = GNUNET_NO;
   handle->th = NULL;
 
   handle->client = GNUNET_CLIENT_connect(sched, "dht", cfg);
-  handle->outstanding_get_requests = 
GNUNET_CONTAINER_multihashmap_create(100); /* FIXME: better number */
-  handle->outstanding_put_requests = 
GNUNET_CONTAINER_multihashmap_create(100); /* FIXME: better number */
+  handle->outstanding_requests = GNUNET_CONTAINER_multihashmap_create(ht_len);
+
   if (handle->client == NULL)
     return NULL;
 #if DEBUG_DHT_API
@@ -181,7 +349,6 @@
 void
 GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
 {
-  struct PendingMessages *pos;
 #if DEBUG_DHT_API
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "`%s': Called GNUNET_DHT_disconnect\n", "DHT API");
@@ -196,11 +363,6 @@
   if (handle->current != NULL) /* We are trying to send something now, clean 
it up */
     GNUNET_free(handle->current);
 
-  while (NULL != (pos = handle->pending_list)) /* Remove all pending sends 
from the list */
-    {
-      handle->pending_list = pos->next;
-      GNUNET_free(pos);
-    }
   if (handle->client != NULL) /* Finally, disconnect from the service */
     {
       GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
@@ -212,96 +374,26 @@
 
 
 /**
- * Handle to control a GET operation.
- */
-struct GNUNET_DHT_GetHandle
-{
-
-  /**
-   * Key that this get request is for
-   */
-  GNUNET_HashCode key;
-
-  /**
-   * Type of data get request was for
-   */
-  uint32_t type;
-
-  /**
-   * Iterator to call on data receipt
-   */
-  GNUNET_DHT_Iterator iter;
-
-  /**
-   * Closure for the iterator callback
-   */
-  void *iter_cls;
-
-  /**
-   * Main handle to this DHT api
-   */
-  struct GNUNET_DHT_Handle *dht_handle;
-};
-
-/**
- * Handle for a PUT request, holds callback
- */
-struct GNUNET_DHT_PutHandle
-{
-  /**
-   * Key that this get request is for
-   */
-  GNUNET_HashCode key;
-
-  /**
-   * Type of data get request was for
-   */
-  uint32_t type;
-
-  /**
-   * Continuation to call on put send
-   */
-  GNUNET_SCHEDULER_Task cont;
-
-  /**
-   * Send continuation cls
-   */
-  void *cont_cls;
-};
-
-/**
  * Send complete (or failed), schedule next (or don't)
  */
 static void
 finish (struct GNUNET_DHT_Handle *handle, int code)
 {
   /* TODO: if code is not GNUNET_OK, do something! */
-  struct PendingMessages *pos = handle->current;
-  struct GNUNET_DHT_GetMessage *get;
-  struct GNUNET_DHT_PutMessage *put;
+  struct PendingMessage *pos = handle->current;
 
   GNUNET_assert(pos != NULL);
 
-  switch (ntohs(pos->msg->type))
-  {
-    case GNUNET_MESSAGE_TYPE_DHT_GET:
-      get = (struct GNUNET_DHT_GetMessage *)pos->msg;
-      GNUNET_free(get);
-      break;
-    case GNUNET_MESSAGE_TYPE_DHT_PUT:
-      put = (struct GNUNET_DHT_PutMessage *)pos->msg;
-      GNUNET_free(put);
-      break;
-    default:
-      GNUNET_break(0);
-  }
+  if (pos->is_unique)
+    {
+      if (pos->cont != NULL)
+        pos->cont(pos->cont_cls, code);
 
-  handle->current = NULL;
-
-  if (code != GNUNET_SYSERR)
-    process_pending_message (handle);
-
-  GNUNET_free(pos);
+      GNUNET_free(pos->msg);
+      handle->current = NULL;
+      GNUNET_free(pos);
+    }
+  /* Otherwise we need to wait for a response to this message! */
 }
 
 /**
@@ -389,14 +481,6 @@
       //GNUNET_DHT_disconnect (handle); /* FIXME: replace with proper 
disconnect stuffs */
     }
 
-  /* schedule next action */
-  handle->current = handle->pending_list;
-  if (NULL == handle->current)
-    {
-      return;
-    }
-  handle->pending_list = handle->pending_list->next;
-  handle->current->next = NULL;
 
   if (NULL ==
       (handle->th = GNUNET_CLIENT_notify_transmit_ready (handle->client,
@@ -418,41 +502,122 @@
 }
 
 /**
- * Add a pending message to the linked list of messages which need to be sent
+ * Iterator called on each result obtained from a generic route
+ * operation
+ */
+void get_reply_iterator (void *cls,
+                         const struct GNUNET_MessageHeader *reply)
+{
+
+}
+
+/**
+ * Perform an asynchronous FIND_PEER operation on the DHT.
  *
- * @param handle handle to the specified DHT api
- * @param msg the message to add to the list
+ * @param handle handle to the DHT service
+ * @param key the key to look up
+ * @param desired_replication_level how many peers should ultimately receive
+ *                this message (advisory only, target may be too high for the
+ *                given DHT or not hit exactly).
+ * @param options options for routing
+ * @param enc send the encapsulated message to a peer close to the key
+ * @param iter function to call on each result, NULL if no replies are expected
+ * @param iter_cls closure for iter
+ * @param timeout when to abort with an error if we fail to get
+ *                a confirmation for the PUT from the local DHT service
+ * @param cont continuation to call when done;
+ *             reason will be TIMEOUT on error,
+ *             reason will be PREREQ_DONE on success
+ * @param cont_cls closure for cont
+ * @return handle to stop the request
  */
-static void add_pending(struct GNUNET_DHT_Handle *handle, struct 
GNUNET_MessageHeader *msg)
+struct GNUNET_DHT_RouteHandle *
+GNUNET_DHT_route_start (struct GNUNET_DHT_Handle *handle,
+                        const GNUNET_HashCode *key,
+                        unsigned int desired_replication_level,
+                        enum GNUNET_DHT_RouteOption options,
+                        const struct GNUNET_MessageHeader *enc,
+                        struct GNUNET_TIME_Relative timeout,
+                        GNUNET_DHT_ReplyProcessor iter,
+                        void *iter_cls,
+                        GNUNET_DHT_MessageCallback cont,
+                        void *cont_cls)
 {
-  struct PendingMessages *new_message;
-  struct PendingMessages *pos;
-  struct PendingMessages *last;
+  struct GNUNET_DHT_RouteHandle *route_handle;
+  struct PendingMessage *pending;
+  struct GNUNET_DHT_Message *message;
+  size_t is_unique;
+  size_t msize;
+  GNUNET_HashCode *uid_key;
+  int count;
 
-  new_message = GNUNET_malloc(sizeof(struct PendingMessages));
-  new_message->msg = msg;
-  new_message->timeout = default_request_timeout;
+  is_unique = GNUNET_YES;
+  if (iter == NULL)
+    is_unique = GNUNET_NO;
 
-  if (handle->pending_list != NULL)
+  route_handle = NULL;
+
+  if (is_unique)
     {
-      pos = handle->pending_list;
-      while(pos != NULL)
+      route_handle = GNUNET_malloc(sizeof(struct GNUNET_DHT_RouteHandle));
+      memcpy(&route_handle->key, key, sizeof(GNUNET_HashCode));
+      route_handle->iter = iter;
+      route_handle->iter_cls = iter_cls;
+      route_handle->dht_handle = handle;
+      route_handle->uid = GNUNET_CRYPTO_random_u64(GNUNET_CRYPTO_QUALITY_WEAK, 
-1);
+
+      count = 0;
+      uid_key = hash_from_uid(route_handle->uid);
+      /* While we have an outstanding request with the same identifier! */
+      while 
(GNUNET_CONTAINER_multihashmap_contains(handle->outstanding_requests, uid_key) 
== GNUNET_YES)
         {
-          last = pos;
-          pos = pos->next;
+          GNUNET_free(uid_key);
+          uid_key = hash_from_uid(route_handle->uid);
         }
-      new_message->next = last->next; /* Should always be null */
-      last->next = new_message;
+      /**
+       * Store based on random identifier!
+       */
+      GNUNET_CONTAINER_multihashmap_put(handle->outstanding_requests, uid_key, 
route_handle, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+      msize = sizeof(struct GNUNET_DHT_Message) + ntohs(enc->size) + 
sizeof(route_handle->uid);
+      GNUNET_free(uid_key);
     }
   else
     {
-      new_message->next = handle->pending_list; /* Will always be null */
-      handle->pending_list = new_message;
+      msize = sizeof(struct GNUNET_DHT_Message) + ntohs(enc->size);
     }
 
+  message = GNUNET_malloc(msize);
+  message->header.size = htons(msize);
+  message->header.type = htons(GNUNET_MESSAGE_TYPE_DHT);
+  memcpy(&message->key, key, sizeof(GNUNET_HashCode));
+  message->options = htons(options);
+  message->desired_replication_level = htons(options);
+  message->unique = htons(is_unique);
+
+  pending = GNUNET_malloc(sizeof(struct PendingMessage));
+  pending->msg = &message->header;
+  pending->timeout = timeout;
+  pending->cont = cont;
+  pending->cont_cls = cont_cls;
+  pending->is_unique = is_unique;
+
+  GNUNET_assert(handle->current == NULL);
+
   process_pending_message(handle);
+
+  return route_handle;
 }
 
+void
+GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *fph);
+
+
+void dht_get_processor (void *cls,
+                        const struct GNUNET_MessageHeader *reply)
+{
+
+}
+
 /**
  * Perform an asynchronous GET operation on the DHT identified.
  *
@@ -463,78 +628,112 @@
  * @param iter_cls closure for iter
  * @return handle to stop the async get
  */
-struct GNUNET_DHT_GetHandle *
+struct GNUNET_DHT_RouteHandle *
 GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
+                      struct GNUNET_TIME_Relative timeout,
                       uint32_t type,
                       const GNUNET_HashCode * key,
-                      GNUNET_DHT_Iterator iter,
+                      GNUNET_DHT_GetIterator iter,
                       void *iter_cls)
 {
+  struct GNUNET_DHT_GetContext *get_context;
   struct GNUNET_DHT_GetMessage *get_msg;
-  struct GNUNET_DHT_GetHandle *get_handle;
 
-  get_handle = 
GNUNET_CONTAINER_multihashmap_get(handle->outstanding_get_requests, key);
+  if (handle->current != NULL) /* Can't send right now, we have a pending 
message... */
+    return NULL;
 
-  if (get_handle != NULL)
-    {
-      /*
-       * A get has been previously sent, return existing handle.
-       * FIXME: should we re-transmit the request to the DHT service?
-       */
-      return get_handle;
-    }
+  get_context = GNUNET_malloc(sizeof(struct GNUNET_DHT_GetContext));
+  get_context->iter = iter;
+  get_context->iter_cls = iter_cls;
 
-  get_handle = GNUNET_malloc(sizeof(struct GNUNET_DHT_GetHandle));
-  get_handle->type = type;
-  memcpy(&get_handle->key, key, sizeof(GNUNET_HashCode));
-  get_handle->iter = iter;
-  get_handle->iter_cls = iter_cls;
-
 #if DEBUG_DHT_API
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "`%s': Inserting pending get request with key %s\n", "DHT API", 
GNUNET_h2s(key));
 #endif
-  GNUNET_CONTAINER_multihashmap_put(handle->outstanding_get_requests, key, 
get_handle, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
 
   get_msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_GetMessage));
   get_msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_GET);
   get_msg->header.size = htons(sizeof(struct GNUNET_DHT_GetMessage));
   get_msg->type = htonl(type);
-  memcpy(&get_msg->key, key, sizeof(GNUNET_HashCode));
 
-  add_pending(handle, &get_msg->header);
+  return GNUNET_DHT_route_start(handle, key, 0, 0, &get_msg->header, timeout, 
&get_reply_iterator, get_context, NULL, NULL);
 
-  return get_handle;
 }
 
 
+void
+GNUNET_DHT_route_stop (struct GNUNET_DHT_RouteHandle *route_handle)
+{
+  struct PendingMessage *pending;
+  struct GNUNET_DHT_StopMessage *message;
+  size_t msize;
+  GNUNET_HashCode *uid_key;
+
+  msize = sizeof(struct GNUNET_DHT_StopMessage);
+
+  message = GNUNET_malloc(msize);
+  message->header.size = htons(msize);
+  message->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_STOP);
+  message->unique_id = htonl(route_handle->uid);
+
+  pending = GNUNET_malloc(sizeof(struct PendingMessage));
+  pending->msg = (struct GNUNET_MessageHeader *)message;
+  pending->timeout = DEFAULT_DHT_TIMEOUT;
+  pending->cont = NULL;
+  pending->cont_cls = NULL;
+  pending->is_unique = GNUNET_NO;
+
+  GNUNET_assert(route_handle->dht_handle->current == NULL);
+
+  process_pending_message(route_handle->dht_handle);
+
+  uid_key = hash_from_uid(route_handle->uid);
+
+  if 
(GNUNET_CONTAINER_multihashmap_remove(route_handle->dht_handle->outstanding_requests,
 uid_key, route_handle) != GNUNET_YES)
+    {
+#if DEBUG_DHT_API
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "`%s': Remove outstanding request from hashmap failed for 
key %s, uid %llu\n", "DHT API", GNUNET_h2s(uid_key), route_handle->uid);
+#endif
+    }
+
+  return;
+}
+
+
 /**
  * Stop async DHT-get.  Frees associated resources.
  *
  * @param record GET operation to stop.
  */
 void
-GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle)
+GNUNET_DHT_get_stop (struct GNUNET_DHT_RouteHandle *handle)
 {
+#if OLDREMOVE
   struct GNUNET_DHT_GetMessage *get_msg;
   struct GNUNET_DHT_Handle *handle;
+  GNUNET_HashCode *uid_key;
+#endif
 
+  GNUNET_DHT_route_stop(handle);
+
+#if OLDREMOVE
+  uid_key = hash_from_uid(get_handle->uid);
+  
GNUNET_assert(GNUNET_CONTAINER_multihashmap_remove(handle->outstanding_requests,
 uid_key, get_handle) == GNUNET_YES);
+
   if (handle->do_destroy == GNUNET_NO)
     {
       get_msg = GNUNET_malloc(sizeof(struct GNUNET_DHT_GetMessage));
       get_msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_GET_STOP);
       get_msg->header.size = htons(sizeof(struct GNUNET_DHT_GetMessage));
-      get_msg->type = htonl(get_handle->type);
-      memcpy(&get_msg->key, &get_handle->key, sizeof(GNUNET_HashCode));
 
-      add_pending(handle, &get_msg->header);
+
     }
+#endif
 #if DEBUG_DHT_API
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "`%s': Removing pending get request with key %s\n", "DHT API", 
GNUNET_h2s(&get_handle->key));
+              "`%s': Removing pending get request with key %s, uid %llu\n", 
"DHT API", GNUNET_h2s(&handle->key), handle->uid);
 #endif
-  
GNUNET_assert(GNUNET_CONTAINER_multihashmap_remove(handle->outstanding_get_requests,
 &get_handle->key, get_handle) == GNUNET_YES);
-  GNUNET_free(get_handle);
 }
 
 
@@ -562,44 +761,30 @@
                 const char *data,
                 struct GNUNET_TIME_Absolute exp,
                 struct GNUNET_TIME_Relative timeout,
-                GNUNET_SCHEDULER_Task cont,
+                GNUNET_DHT_MessageCallback cont,
                 void *cont_cls)
 {
   struct GNUNET_DHT_PutMessage *put_msg;
-  struct GNUNET_DHT_PutHandle *put_handle;
   size_t msize;
 
-  put_handle = 
GNUNET_CONTAINER_multihashmap_get(handle->outstanding_put_requests, key);
-
-  if (put_handle != NULL)
+  if (handle->current != NULL)
     {
-      /*
-       * A put has been previously queued, but not yet sent.
-       * FIXME: change the continuation function and callback or something?
-       */
+      cont(cont_cls, GNUNET_SYSERR);
       return;
     }
 
-  put_handle = GNUNET_malloc(sizeof(struct GNUNET_DHT_PutHandle));
-  put_handle->type = type;
-  memcpy(&put_handle->key, key, sizeof(GNUNET_HashCode));
-
 #if DEBUG_DHT_API
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "`%s': Inserting pending put request with key %s\n", "DHT API", 
GNUNET_h2s(key));
 #endif
 
-  GNUNET_CONTAINER_multihashmap_put(handle->outstanding_put_requests, key, 
put_handle, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
-
   msize = sizeof(struct GNUNET_DHT_PutMessage) + size;
   put_msg = GNUNET_malloc(msize);
   put_msg->header.type = htons(GNUNET_MESSAGE_TYPE_DHT_PUT);
   put_msg->header.size = htons(msize);
   put_msg->type = htonl(type);
-  memcpy(&put_msg->key, key, sizeof(GNUNET_HashCode));
   memcpy(&put_msg[1], data, size);
 
-  add_pending(handle, &put_msg->header);
+  GNUNET_DHT_route_start(handle, key, 0, 0, &put_msg->header, timeout, NULL, 
NULL, cont, cont_cls);
 
-  return;
 }

Modified: gnunet/src/dht/gnunet-service-dht.c
===================================================================
--- gnunet/src/dht/gnunet-service-dht.c 2010-03-18 14:55:38 UTC (rev 10655)
+++ gnunet/src/dht/gnunet-service-dht.c 2010-03-18 16:32:41 UTC (rev 10656)
@@ -88,42 +88,15 @@
 /**
  * Server handler for initiating local dht get requests
  */
-static void handle_dht_get (void *cls, struct GNUNET_SERVER_Client * client,
-                     const struct GNUNET_MessageHeader *message);
+static void handle_dht_plugin_message (void *cls, struct GNUNET_SERVER_Client 
* client,
+                            const struct GNUNET_MessageHeader *message);
 
-/**
- * Server handler for stopping local dht get requests
- */
-static void handle_dht_get_stop (void *cls, struct GNUNET_SERVER_Client * 
client,
-                     const struct GNUNET_MessageHeader *message);
-
-/**
- * Server handler for initiating local dht find peer requests
- */
-static void handle_dht_find_peer (void *cls, struct GNUNET_SERVER_Client *
-                           client, const struct GNUNET_MessageHeader *
-                           message);
-
-/**
- * Server handler for stopping local dht find peer requests
- */
-static void handle_dht_find_peer_stop (void *cls, struct GNUNET_SERVER_Client *
-                           client, const struct GNUNET_MessageHeader *
-                           message);
-
-/**
- * Server handler for initiating local dht put requests
- */
-static void handle_dht_put (void *cls, struct GNUNET_SERVER_Client * client,
-                     const struct GNUNET_MessageHeader *message);
-
-
 static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = {
-  {&handle_dht_get, NULL, GNUNET_MESSAGE_TYPE_DHT_GET, 0},
-  {&handle_dht_get_stop, NULL, GNUNET_MESSAGE_TYPE_DHT_GET_STOP, 0},
+  {&handle_dht_plugin_message, NULL, GNUNET_MESSAGE_TYPE_DHT, 0},
+/*  {&handle_dht_get_stop, NULL, GNUNET_MESSAGE_TYPE_DHT_GET_STOP, 0},
   {&handle_dht_put, NULL, GNUNET_MESSAGE_TYPE_DHT_PUT, 0},
   {&handle_dht_find_peer, NULL, GNUNET_MESSAGE_TYPE_DHT_FIND_PEER, 0},
-  {&handle_dht_find_peer_stop, NULL, GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_STOP, 
0},
+  {&handle_dht_find_peer_stop, NULL, GNUNET_MESSAGE_TYPE_DHT_FIND_PEER_STOP, 
0},*/
   {NULL, NULL, 0, 0}
 };
 
@@ -163,18 +136,16 @@
 };
 
 
+
 /**
  * Server handler for initiating local dht get requests
  */
-static void handle_dht_get (void *cls, struct GNUNET_SERVER_Client * client,
-                            const struct GNUNET_MessageHeader *message)
+static void handle_dht_get (void *cls, struct GNUNET_DHT_GetMessage *get_msg, 
GNUNET_HashCode *key)
 {
-  struct GNUNET_DHT_GetMessage *get_msg = (struct GNUNET_DHT_GetMessage 
*)message;
   GNUNET_HashCode get_key;
   size_t get_type;
 
   GNUNET_assert(ntohs(get_msg->header.size) >= sizeof(struct 
GNUNET_DHT_GetMessage));
-  memcpy(&get_key, &get_msg->key, sizeof(GNUNET_HashCode));
   get_type = ntohs(get_msg->type);
 
 #if DEBUG_DHT
@@ -182,92 +153,38 @@
               "`%s': Received `%s' request from client, message type %d, key 
%s\n", "DHT", "GET", get_type, GNUNET_h2s(&get_key));
 #endif
 
-  /* FIXME: Implement get stop functionality here */
-
+  /* FIXME: Implement get functionality here */
 }
 
-/**
- * Server handler for stopping local dht get requests
- */
-static void handle_dht_get_stop (void *cls, struct GNUNET_SERVER_Client * 
client,
-                          const struct GNUNET_MessageHeader *message)
-{
-  struct GNUNET_DHT_GetMessage *get_msg = (struct GNUNET_DHT_GetMessage 
*)message; /* Get message and get stop message are the same except for type */
-  GNUNET_HashCode get_key;
-  size_t get_type;
 
-  GNUNET_assert(ntohs(get_msg->header.size) >= sizeof(struct 
GNUNET_DHT_GetMessage));
-
-  memcpy(&get_key, &get_msg->key, sizeof(GNUNET_HashCode));
-  get_type = ntohs(get_msg->type);
-
-#if DEBUG_DHT
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "`%s': Received `%s' request from client, message type %d, key 
%s\n", "DHT", "GET STOP", get_type, GNUNET_h2s(&get_key));
-#endif
-
-  /* FIXME: Implement get stop functionality here */
-
-}
-
 /**
  * Server handler for initiating local dht find peer requests
  */
-static void handle_dht_find_peer (void *cls, struct GNUNET_SERVER_Client *
-                                  client, const struct GNUNET_MessageHeader *
-                                  message)
+static void handle_dht_find_peer (void *cls, struct GNUNET_DHT_FindPeerMessage 
*find_msg, GNUNET_HashCode *key)
 {
-  struct GNUNET_DHT_FindPeerMessage *find_msg = (struct 
GNUNET_DHT_FindPeerMessage *)message;
-  struct GNUNET_PeerIdentity peer;
 
   GNUNET_assert(ntohs(find_msg->header.size) == sizeof(struct 
GNUNET_DHT_FindPeerMessage));
-  memcpy(&peer, &find_msg->peer, sizeof(struct GNUNET_PeerIdentity));
 
 #if DEBUG_DHT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "`%s': Received `%s' request from client, peer id %s\n", "DHT", 
"FIND PEER", GNUNET_i2s(&peer));
+              "`%s': Received `%s' request from client, key %s\n", "DHT", 
"FIND PEER", GNUNET_h2s(key));
 #endif
 
   /* FIXME: Implement find peer functionality here */
 }
 
-/**
- * Server handler for stopping local dht find peer requests
- */
-static void handle_dht_find_peer_stop (void *cls, struct GNUNET_SERVER_Client *
-                           client, const struct GNUNET_MessageHeader *
-                           message)
-{
-  struct GNUNET_DHT_FindPeerMessage *find_msg = (struct 
GNUNET_DHT_FindPeerMessage *)message; /* Find peer stop message is identical to 
find peer message */
-  struct GNUNET_PeerIdentity peer;
 
-  GNUNET_assert(ntohs(find_msg->header.size) == sizeof(struct 
GNUNET_DHT_FindPeerMessage));
-  memcpy(&peer, &find_msg->peer, sizeof(struct GNUNET_PeerIdentity));
-
-#if DEBUG_DHT
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "`%s': Received `%s' request from client, for peer id %s\n", 
"DHT", "FIND PEER STOP", GNUNET_i2s(&peer));
-#endif
-
-  /* FIXME: Implement find peer stop functionality here */
-
-}
-
 /**
  * Server handler for initiating local dht put requests
  */
-static void handle_dht_put (void *cls, struct GNUNET_SERVER_Client * client,
-                     const struct GNUNET_MessageHeader *message)
+static void handle_dht_put (void *cls, struct GNUNET_DHT_PutMessage *put_msg, 
GNUNET_HashCode *key)
 {
-  struct GNUNET_DHT_PutMessage *put_msg = (struct GNUNET_DHT_PutMessage 
*)message;
-  GNUNET_HashCode put_key;
   size_t put_type;
   size_t data_size;
   char *data;
 
   GNUNET_assert(ntohs(put_msg->header.size) >= sizeof(struct 
GNUNET_DHT_PutMessage));
 
-  memcpy(&put_key, &put_msg->key, sizeof(GNUNET_HashCode));
   put_type = ntohs(put_msg->type);
   data_size = ntohs(put_msg->data_size);
   GNUNET_assert(ntohs(put_msg->header.size) == sizeof(struct 
GNUNET_DHT_PutMessage) + data_size);
@@ -276,7 +193,7 @@
 
 #if DEBUG_DHT
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "`%s': Received `%s' request from client, message type %d, key 
%s\n", "DHT", "PUT", put_type, GNUNET_h2s(&put_key));
+              "`%s': Received `%s' request from client, message type %d, key 
%s\n", "DHT", "PUT", put_type, GNUNET_h2s(key));
 #endif
 
   /**
@@ -284,10 +201,82 @@
    */
 
   GNUNET_free(data);
+}
 
+
+static void
+handle_dht_start_message(void *cls, struct GNUNET_DHT_Message *dht_msg)
+{
+  struct GNUNET_MessageHeader *enc_msg;
+  size_t enc_type;
+
+  enc_msg = (struct GNUNET_MessageHeader *)&dht_msg[1];
+  enc_type = ntohs(enc_msg->type);
+
+
+#if DEBUG_DHT
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "`%s': Received `%s' request from client, message type %d, key 
%s, uid %llu\n", "DHT", "GENERIC", enc_type, GNUNET_h2s(&dht_msg->key), 
ntohl(dht_msg->unique_id));
+#endif
+
+  /* FIXME: Implement demultiplexing functionality here */
+  switch (enc_type)
+    {
+    case GNUNET_MESSAGE_TYPE_DHT_GET:
+      handle_dht_get(cls, (struct GNUNET_DHT_GetMessage *)enc_msg, 
&dht_msg->key);
+      break;
+    case GNUNET_MESSAGE_TYPE_DHT_PUT:
+      handle_dht_put(cls, (struct GNUNET_DHT_PutMessage *)enc_msg, 
&dht_msg->key);
+      break;
+    case GNUNET_MESSAGE_TYPE_DHT_FIND_PEER:
+      handle_dht_find_peer(cls, (struct GNUNET_DHT_FindPeerMessage *)enc_msg, 
&dht_msg->key);
+      break;
+    default:
+#if DEBUG_DHT
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "`%s': Message type (%d) not handled\n", "DHT", enc_type);
+#endif
+    }
+
 }
 
+
+static void
+handle_dht_stop_message(void *cls, struct GNUNET_DHT_StopMessage *dht_stop_msg)
+{
+
+#if DEBUG_DHT
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "`%s': Received `%s' request from client, uid %llu\n", "DHT", 
"GENERIC STOP", ntohl(dht_stop_msg->unique_id));
+#endif
+}
+
+
+
 /**
+ * Server handler for initiating local dht get requests
+ */
+static void handle_dht_plugin_message (void *cls, struct GNUNET_SERVER_Client 
* client,
+                            const struct GNUNET_MessageHeader *message)
+{
+
+  #if DEBUG_DHT
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "`%s': Received `%s' request from client, message type %d, size 
%d\n", "DHT", "GENERIC", ntohs(message->type), ntohs(message->size));
+#endif
+
+  switch(ntohs(message->type))
+    {
+    case GNUNET_MESSAGE_TYPE_DHT:
+      handle_dht_start_message(cls, (struct GNUNET_DHT_Message *)message);
+    case GNUNET_MESSAGE_TYPE_DHT_STOP:
+      handle_dht_stop_message(cls, (struct GNUNET_DHT_StopMessage *)message);
+    }
+
+  GNUNET_SERVER_receive_done(client, GNUNET_OK);
+}
+
+/**
  * Core handler for p2p dht get requests.
  */
 static int handle_dht_p2p_get (void *cls,





reply via email to

[Prev in Thread] Current Thread [Next in Thread]