gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r25132 - gnunet/src/fs


From: gnunet
Subject: [GNUnet-SVN] r25132 - gnunet/src/fs
Date: Sun, 25 Nov 2012 20:18:09 +0100

Author: grothoff
Date: 2012-11-25 20:18:09 +0100 (Sun, 25 Nov 2012)
New Revision: 25132

Modified:
   gnunet/src/fs/gnunet-service-fs_pr.c
   gnunet/src/fs/gnunet-service-fs_stream.c
Log:
-switch to hash map for replies to avoid linear scan, add timeout for inactive clients

Modified: gnunet/src/fs/gnunet-service-fs_pr.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_pr.c        2012-11-25 13:12:04 UTC (rev 25131)
+++ gnunet/src/fs/gnunet-service-fs_pr.c        2012-11-25 19:18:09 UTC (rev 25132)
@@ -1181,7 +1181,11 @@
     GNUNET_break (0 == data_size);
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                "Error retrieiving block via stream\n");
-    /* FIXME: should re-try a few times... */
+    /* FIXME: maybe we should re-try a few times; but then
+       we MUST bound the number of re-tries to not keep
+       asking indefinitely with fresh streams; this should
+       be implemented if/when the stream code gets its
+       timeout/parallel-session limits */
     return;
   }
   if (GNUNET_YES !=

Modified: gnunet/src/fs/gnunet-service-fs_stream.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_stream.c    2012-11-25 13:12:04 UTC (rev 25131)
+++ gnunet/src/fs/gnunet-service-fs_stream.c    2012-11-25 19:18:09 UTC (rev 25132)
@@ -24,7 +24,7 @@
  * @author Christian Grothoff
  *
  * TODO:
- * - limit # concurrent clients, have timeouts for server-side
+ * - limit # concurrent clients
  */
 #include "platform.h"
 #include "gnunet_constants.h"
@@ -37,6 +37,12 @@
 #include "gnunet-service-fs_stream.h"
 
 /**
+ * After how long do we termiante idle connections?
+ */
+#define IDLE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
+
+
+/**
  * Information we keep around for each active streaming client.
  */
 struct StreamClient
@@ -82,6 +88,11 @@
   GNUNET_SCHEDULER_TaskIdentifier terminate_task;
 
   /**
+   * Task that is scheduled to terminate idle connections.
+   */
+  GNUNET_SCHEDULER_TaskIdentifier timeout_task;
+
+  /**
    * Size of the last write that was initiated.
    */ 
   size_t reply_size;
@@ -210,16 +221,12 @@
   struct GSF_StreamRequest *pending_tail;
 
   /**
-   * Head of DLL of requests waiting for a reply on this stream.
+   * Map from query to 'struct GSF_StreamRequest's waiting for
+   * a reply.
    */
-  struct GSF_StreamRequest *waiting_head;
+  struct GNUNET_CONTAINER_MultiHashMap *waiting_map;
 
   /**
-   * Tail of DLL of requests waiting for a reply on this stream.
-   */
-  struct GSF_StreamRequest *waiting_tail;
-
-  /**
    * Connection to the other peer.
    */
   struct GNUNET_STREAM_Socket *stream;
@@ -290,7 +297,31 @@
 
 /* ********************* client-side code ************************* */
 
+/**
+ * Iterator called on each entry in a waiting map to 
+ * call the 'proc' continuation and release associated
+ * resources.
+ *
+ * @param cls the 'struct StreamHandle'
+ * @param key the key of the entry in the map (the query)
+ * @param value the 'struct GSF_StreamRequest' to clean up
+ * @return GNUNET_YES (continue to iterate)
+ */
+static int
+free_waiting_entry (void *cls,
+                   const struct GNUNET_HashCode *key,
+                   void *value)
+{
+  struct GSF_StreamRequest *sr = value;
 
+  sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
+           GNUNET_TIME_UNIT_FOREVER_ABS,
+           0, NULL);
+  GSF_stream_query_cancel (sr);
+  return GNUNET_YES;
+}
+
+
 /**
  * Destroy a stream handle.
  *
@@ -308,13 +339,9 @@
              0, NULL);
     GSF_stream_query_cancel (sr);
   }
-  while (NULL != (sr = sh->waiting_head))
-  {
-    sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
-             GNUNET_TIME_UNIT_FOREVER_ABS,
-             0, NULL);
-    GSF_stream_query_cancel (sr);
-  }
+  GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map,
+                                        &free_waiting_entry,
+                                        sh);
   if (NULL != sh->wh)
     GNUNET_STREAM_io_write_cancel (sh->wh);
   if (NULL != sh->rh)
@@ -326,6 +353,7 @@
                 GNUNET_CONTAINER_multihashmap_remove (stream_map,
                                                       &sh->target.hashPubKey,
                                                       sh));
+  GNUNET_CONTAINER_multihashmap_destroy (sh->waiting_map);
   GNUNET_free (sh);
 }
 
@@ -357,6 +385,35 @@
 
 
 /**
+ * Iterator called on each entry in a waiting map to 
+ * move it back to the pending list.
+ *
+ * @param cls the 'struct StreamHandle'
+ * @param key the key of the entry in the map (the query)
+ * @param value the 'struct GSF_StreamRequest' to move to pending
+ * @return GNUNET_YES (continue to iterate)
+ */
+static int
+move_to_pending (void *cls,
+                const struct GNUNET_HashCode *key,
+                void *value)
+{
+  struct StreamHandle *sh = cls;
+  struct GSF_StreamRequest *sr = value;
+  
+  GNUNET_assert (GNUNET_YES ==
+                GNUNET_CONTAINER_multihashmap_remove (sh->waiting_map,
+                                                      key,
+                                                      value));
+  GNUNET_CONTAINER_DLL_insert (sh->pending_head,
+                              sh->pending_tail,
+                              sr);
+  sr->was_transmitted = GNUNET_NO;
+  return GNUNET_YES;
+}
+
+
+/**
  * We had a serious error, tear down and re-create stream from scratch.
  *
  * @param sh stream to reset
@@ -364,8 +421,6 @@
 static void
 reset_stream (struct StreamHandle *sh)
 {
-  struct GSF_StreamRequest *sr;
-
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Resetting stream to %s\n",
              GNUNET_i2s (&sh->target));
@@ -373,16 +428,9 @@
     GNUNET_STREAM_io_read_cancel (sh->rh);
   GNUNET_STREAM_close (sh->stream);
   sh->is_ready = GNUNET_NO;
-  while (NULL != (sr = sh->waiting_tail))
-  {
-    GNUNET_CONTAINER_DLL_remove (sh->waiting_head,
-                                sh->waiting_tail,
-                                sr);
-    GNUNET_CONTAINER_DLL_insert (sh->pending_head,
-                                sh->pending_tail,
-                                sr);
-    sr->was_transmitted = GNUNET_NO;
-  }
+  GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map,
+                                        &move_to_pending,
+                                        sh);
   sh->stream = GNUNET_STREAM_open (GSF_cfg,
                                   &sh->target,
                                   GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
@@ -541,9 +589,10 @@
   GNUNET_CONTAINER_DLL_remove (sh->pending_head,
                               sh->pending_tail,
                               sr);
-  GNUNET_CONTAINER_DLL_insert_tail (sh->waiting_head,
-                                   sh->waiting_tail,
-                                   sr);
+  GNUNET_CONTAINER_multihashmap_put (sh->waiting_map,
+                                    &sr->query,
+                                    sr,
+                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Sending query via stream to %s\n",
              GNUNET_i2s (&sh->target));
@@ -561,6 +610,67 @@
 
 
 /**
+ * Closure for 'handle_reply'.
+ */
+struct HandleReplyClosure
+{
+
+  /**
+   * Reply payload.
+   */ 
+  const void *data;
+
+  /**
+   * Expiration time for the block.
+   */
+  struct GNUNET_TIME_Absolute expiration;
+
+  /**
+   * Number of bytes in 'data'.
+   */
+  size_t data_size;
+
+  /** 
+   * Type of the block.
+   */
+  enum GNUNET_BLOCK_Type type;
+  
+  /**
+   * Did we have a matching query?
+   */
+  int found;
+};
+
+
+/**
+ * Iterator called on each entry in a waiting map to 
+ * process a result.
+ *
+ * @param cls the 'struct HandleReplyClosure'
+ * @param key the key of the entry in the map (the query)
+ * @param value the 'struct GSF_StreamRequest' to handle result for
+ * @return GNUNET_YES (continue to iterate)
+ */
+static int
+handle_reply (void *cls,
+             const struct GNUNET_HashCode *key,
+             void *value)
+{
+  struct HandleReplyClosure *hrc = cls;
+  struct GSF_StreamRequest *sr = value;
+  
+  sr->proc (sr->proc_cls,
+           hrc->type,
+           hrc->expiration,
+           hrc->data_size,
+           hrc->data);
+  GSF_stream_query_cancel (sr);
+  hrc->found = GNUNET_YES;
+  return GNUNET_YES;
+}
+
+
+/**
  * Functions with this signature are called whenever a
  * complete reply is received.
  *
@@ -578,10 +688,10 @@
 {
   struct StreamHandle *sh = cls;
   const struct StreamReplyMessage *srm;
+  struct HandleReplyClosure hrc;
   uint16_t msize;
   enum GNUNET_BLOCK_Type type;
   struct GNUNET_HashCode query;
-  struct GSF_StreamRequest *sr;
 
   msize = ntohs (message->size);
   switch (ntohs (message->type))
@@ -611,24 +721,22 @@
     GNUNET_STATISTICS_update (GSF_stats,
                              gettext_noop ("# replies received via stream"), 1,
                              GNUNET_NO);
-    for (sr = sh->waiting_head; NULL != sr; sr = sr->next)
-      if (0 == memcmp (&query,
-                      &sr->query,
-                      sizeof (struct GNUNET_HashCode)))
-       break;
-    if (NULL == sr)
+    hrc.data = &srm[1];
+    hrc.data_size = msize;
+    hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration);
+    hrc.type = type;
+    hrc.found = GNUNET_NO;
+    GNUNET_CONTAINER_multihashmap_get_multiple (sh->waiting_map,
+                                               &query,
+                                               &handle_reply,
+                                               &hrc);
+    if (GNUNET_NO == hrc.found)
     {
       GNUNET_STATISTICS_update (GSF_stats,
                                gettext_noop ("# replies received via stream dropped"), 1,
                                GNUNET_NO);
       return GNUNET_OK;
     }
-    sr->proc (sr->proc_cls,
-             type,
-             GNUNET_TIME_absolute_ntoh (srm->expiration),
-             msize,
-             &srm[1]);
-    GSF_stream_query_cancel (sr);
     return GNUNET_OK;
   default:
     GNUNET_break_op (0);
@@ -665,6 +773,7 @@
   sh = GNUNET_malloc (sizeof (struct StreamHandle));
   sh->mst = GNUNET_SERVER_mst_create (&reply_cb,
                                      sh);
+  sh->waiting_map = GNUNET_CONTAINER_multihashmap_create (512, GNUNET_YES);
   sh->target = *target;
   sh->stream = GNUNET_STREAM_open (GSF_cfg,
                                   &sh->target,
@@ -731,15 +840,15 @@
   struct StreamHandle *sh = sr->sh;
 
   if (GNUNET_YES == sr->was_transmitted)
-    GNUNET_CONTAINER_DLL_remove (sh->waiting_head,
-                                sh->waiting_tail,
-                                sr);
+    GNUNET_CONTAINER_multihashmap_remove (sh->waiting_map,
+                                         &sr->query,
+                                         sr);
   else
     GNUNET_CONTAINER_DLL_remove (sh->pending_head,
                                 sh->pending_tail,
                                 sr);
   GNUNET_free (sr);
-  if ( (NULL == sh->waiting_head) &&
+  if ( (0 == GNUNET_CONTAINER_multihashmap_size (sh->waiting_map)) &&
        (NULL == sh->pending_head) )
     sh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
                                                     &stream_timeout,
@@ -763,6 +872,8 @@
                            GNUNET_NO);
   if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task)
     GNUNET_SCHEDULER_cancel (sc->terminate_task); 
+  if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
+    GNUNET_SCHEDULER_cancel (sc->timeout_task); 
  if (NULL != sc->rh)
     GNUNET_STREAM_io_read_cancel (sc->rh);
   if (NULL != sc->wh)
@@ -796,6 +907,39 @@
 
 
 /**
+ * Task run to asynchronously terminate the stream due to timeout.
+ *
+ * @param cls the 'struct StreamClient'
+ * @param tc scheduler context
+ */ 
+static void
+timeout_stream_task (void *cls,
+                    const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct StreamClient *sc = cls;
+
+  sc->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+  terminate_stream (sc);
+}
+
+
+/**
+ * Reset the timeout for the stream client (due to activity).
+ *
+ * @param sc client handle to reset timeout for
+ */
+static void
+refresh_timeout_task (struct StreamClient *sc)
+{
+  if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
+    GNUNET_SCHEDULER_cancel (sc->timeout_task); 
+  sc->timeout_task = GNUNET_SCHEDULER_add_delayed (IDLE_TIMEOUT,
+                                                  &timeout_stream_task,
+                                                  sc);
+}
+
+
+/**
  * We had a serious error, termiante stream,
  * but do so asynchronously.
  *
@@ -845,6 +989,7 @@
                               GNUNET_NO, GNUNET_YES);
   if (GNUNET_NO == ret)
     return; 
+  refresh_timeout_task (sc);
   sc->rh = GNUNET_STREAM_read (sc->socket,
                               GNUNET_TIME_UNIT_FOREVER_REL,
                               &process_request,
@@ -1049,6 +1194,7 @@
     GNUNET_STATISTICS_update (GSF_stats,
                              gettext_noop ("# queries received via stream"), 1,
                              GNUNET_NO);
+    refresh_timeout_task (sc);
     sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
                                       0,
                                       &sqm->query,
@@ -1106,6 +1252,7 @@
   GNUNET_CONTAINER_DLL_insert (sc_head,
                               sc_tail,
                               sc);
+  refresh_timeout_task (sc);
   return GNUNET_OK;
 }
 




reply via email to

[Prev in Thread] Current Thread [Next in Thread]