gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r25110 - in gnunet/src: fs include


From: gnunet
Subject: [GNUnet-SVN] r25110 - in gnunet/src: fs include
Date: Thu, 22 Nov 2012 19:27:12 +0100

Author: grothoff
Date: 2012-11-22 19:27:12 +0100 (Thu, 22 Nov 2012)
New Revision: 25110

Modified:
   gnunet/src/fs/gnunet-service-fs.c
   gnunet/src/fs/gnunet-service-fs.h
   gnunet/src/fs/gnunet-service-fs_pr.c
   gnunet/src/fs/gnunet-service-fs_stream.c
   gnunet/src/include/gnunet_protocols.h
Log:
mostly finishing server-side for FS-over-stream

Modified: gnunet/src/fs/gnunet-service-fs.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs.c   2012-11-22 18:02:45 UTC (rev 25109)
+++ gnunet/src/fs/gnunet-service-fs.c   2012-11-22 18:27:12 UTC (rev 25110)
@@ -116,6 +116,11 @@
 double GSF_current_priorities;
 
 /**
+ * Size of the datastore queue we assume for common requests.
+ */
+unsigned int GSF_datastore_queue_size;
+
+/**
  * How many query messages have we received 'recently' that
  * have not yet been claimed as cover traffic?
  */
@@ -615,7 +620,18 @@
 run (void *cls, struct GNUNET_SERVER_Handle *server,
      const struct GNUNET_CONFIGURATION_Handle *cfg)
 {
+  unsigned long long dqs;
+
   GSF_cfg = cfg;
+  if (GNUNET_OK !=
+      GNUNET_CONFIGURATION_get_value_size (GSF_cfg, "fs", "DATASTORE_QUEUE_SIZE",
+                                           &dqs))
+  {
+    GNUNET_log_config_missing (GNUNET_ERROR_TYPE_INFO,
+                              "fs", "DATASTORE_QUEUE_SIZE");
+    dqs = 1024;
+  }
+  GSF_datastore_queue_size = (unsigned int) dqs;
   GSF_enable_randomized_delays =
       GNUNET_CONFIGURATION_get_value_yesno (cfg, "fs", "DELAY");
   GSF_dsh = GNUNET_DATASTORE_connect (cfg);

Modified: gnunet/src/fs/gnunet-service-fs.h
===================================================================
--- gnunet/src/fs/gnunet-service-fs.h   2012-11-22 18:02:45 UTC (rev 25109)
+++ gnunet/src/fs/gnunet-service-fs.h   2012-11-22 18:27:12 UTC (rev 25110)
@@ -258,6 +258,12 @@
 extern int GSF_enable_randomized_delays;
 
 /**
+ * Size of the datastore queue we assume for common requests.
+ */
+extern unsigned int GSF_datastore_queue_size;
+
+
+/**
  * Test if the DATABASE (GET) load on this peer is too high
  * to even consider processing the query at
  * all.

Modified: gnunet/src/fs/gnunet-service-fs_pr.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_pr.c        2012-11-22 18:02:45 UTC (rev 25109)
+++ gnunet/src/fs/gnunet-service-fs_pr.c        2012-11-22 18:27:12 UTC (rev 25110)
@@ -196,12 +196,6 @@
 
 
 /**
- * Size of the datastore queue we assume for common requests.
- * Determined based on the network quota.
- */
-static unsigned int datastore_queue_size;
-
-/**
  * Heap with the request that will expire next at the top.  Contains
  * pointers of type "struct PendingRequest*"; these will *also* be
  * aliased from the "requests_by_peer" data structures and the
@@ -1307,7 +1301,7 @@
                                   (0 !=
                                    (GSF_PRO_PRIORITY_UNLIMITED &
                                     pr->public_data.options)) ? UINT_MAX :
-                                  datastore_queue_size
+                                  GSF_datastore_queue_size
                                   /* max queue size */ ,
                                   GNUNET_TIME_UNIT_FOREVER_REL,
                                   &process_local_reply, pr);
@@ -1347,7 +1341,7 @@
                                   (0 !=
                                    (GSF_PRO_PRIORITY_UNLIMITED &
                                     pr->public_data.options)) ? UINT_MAX :
-                                  datastore_queue_size
+                                  GSF_datastore_queue_size
                                   /* max queue size */ ,
                                   GNUNET_TIME_UNIT_FOREVER_REL,
                                   &process_local_reply, pr);
@@ -1405,7 +1399,7 @@
                                 (0 !=
                                  (GSF_PRO_PRIORITY_UNLIMITED & pr->
                                   public_data.options)) ? UINT_MAX :
-                                datastore_queue_size
+                                GSF_datastore_queue_size
                                 /* max queue size */ ,
                                 GNUNET_TIME_UNIT_FOREVER_REL,
                                 &process_local_reply, pr);
@@ -1487,7 +1481,7 @@
                                 (0 !=
                                  (GSF_PRO_PRIORITY_UNLIMITED & pr->
                                   public_data.options)) ? UINT_MAX :
-                                datastore_queue_size
+                                GSF_datastore_queue_size
                                 /* max queue size */ ,
                                 GNUNET_TIME_UNIT_FOREVER_REL,
                                 &process_local_reply, pr);
@@ -1639,8 +1633,6 @@
 void
 GSF_pending_request_init_ ()
 {
-  unsigned long long dqs;
-
   if (GNUNET_OK !=
       GNUNET_CONFIGURATION_get_value_number (GSF_cfg, "fs",
                                              "MAX_PENDING_REQUESTS",
@@ -1649,16 +1641,6 @@
     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_INFO,
                               "fs", "MAX_PENDING_REQUESTS");
   }
-  if (GNUNET_OK !=
-      GNUNET_CONFIGURATION_get_value_size (GSF_cfg, "fs", "DATASTORE_QUEUE_SIZE",
-                                           &dqs))
-  {
-    GNUNET_log_config_missing (GNUNET_ERROR_TYPE_INFO,
-                              "fs", "DATASTORE_QUEUE_SIZE");
-    dqs = 1024;
-  }
-  datastore_queue_size = (unsigned int) dqs;
-
   active_to_migration =
       GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg, "FS", "CONTENT_CACHING");
   datastore_put_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);

Modified: gnunet/src/fs/gnunet-service-fs_stream.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_stream.c    2012-11-22 18:02:45 UTC (rev 25109)
+++ gnunet/src/fs/gnunet-service-fs_stream.c    2012-11-22 18:27:12 UTC (rev 25110)
@@ -24,6 +24,7 @@
  * @author Christian Grothoff
  *
  * TODO:
+ * - add statistics
  * - limit # concurrent clients, timeout for read
  */
 #include "platform.h"
@@ -33,6 +34,7 @@
 #include "gnunet_protocols.h"
 #include "gnunet_applications.h"
 #include "gnunet-service-fs.h"
+#include "gnunet-service-fs_indexing.h"
 #include "gnunet-service-fs_stream.h"
 
 /**
@@ -66,6 +68,16 @@
   struct GNUNET_STREAM_IOWriteHandle *wh;
   
   /**
+   * Tokenizer for requests.
+   */
+  struct GNUNET_SERVER_MessageStreamTokenizer *mst;
+  
+  /**
+   * Current active request to the datastore, if we have one pending.
+   */
+  struct GNUNET_DATASTORE_QueueEntry *qe;
+
+  /**
    * Size of the last write that was initiated.
    */ 
   size_t reply_size;
@@ -74,6 +86,56 @@
 
 
 /**
+ * Query from one peer, asking the other for CHK-data.
+ */
+struct StreamQueryMessage
+{
+
+  /**
+   * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY; request_cb() rejects the message unless header.size equals sizeof (struct StreamQueryMessage).
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Block type must be DBLOCK or IBLOCK; the receiver converts it with ntohl(), i.e. it is sent in network byte order.
+   */
+  uint32_t type;
+
+  /**
+   * Query hash from CHK (hash of encrypted block); used by request_cb() as the key of the datastore lookup.
+   */
+  struct GNUNET_HashCode query;
+
+};
+
+
+/**
+ * Reply to a StreamQueryMessage.
+ */
+struct StreamReplyMessage
+{
+
+  /**
+   * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY; header.size covers this struct plus the block that follows it.
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Block type must be DBLOCK or IBLOCK; written with htonl(), i.e. in network byte order.
+   */
+  uint32_t type;
+
+  /**
+   * Expiration time for the block, converted with GNUNET_TIME_absolute_hton() by the sender.
+   */
+  struct GNUNET_TIME_AbsoluteNBO expiration;
+
+  /* followed by the encrypted block (copied directly behind this struct by handle_datastore_reply()) */
+
+};
+
+
+/**
  * Listen socket for incoming requests.
  */
 static struct GNUNET_STREAM_ListenSocket *listen_socket;
@@ -101,6 +163,9 @@
     GNUNET_STREAM_io_read_cancel (sc->rh);
   if (NULL != sc->wh)
     GNUNET_STREAM_io_write_cancel (sc->wh);
+  if (NULL != sc->qe)
+    GNUNET_DATASTORE_cancel (sc->qe);
+  GNUNET_SERVER_mst_destroy (sc->mst);
   GNUNET_STREAM_close (sc->socket);
   GNUNET_CONTAINER_DLL_remove (sc_head,
                               sc_tail,
@@ -124,15 +189,70 @@
 process_request (void *cls,
                 enum GNUNET_STREAM_Status status,
                 const void *data,
+                size_t size);
+
+
+/**
+ * We're done handling a request from a client, read the next one.
+ * The tokenizer is first asked to process data it already holds; a new stream read is only started if that call does not return GNUNET_NO.
+ * @param sc client to continue reading requests from
+ */
+static void
+continue_reading (struct StreamClient *sc)
+{
+  int ret;
+
+  ret = 
+    GNUNET_SERVER_mst_receive (sc->mst,
+                              NULL,
+                              NULL, 0, /* no new input is supplied: only data already held by the tokenizer can be processed */
+                              GNUNET_NO, GNUNET_YES); /* NOTE(review): GNUNET_SYSERR is not handled here, whereas process_request() terminates the stream on it -- confirm this is intended */
+  if (GNUNET_NO == ret)
+    return; /* same result that process_request() annotates as "more messages in MST": do not start another read */
+  sc->rh = GNUNET_STREAM_read (sc->socket,
+                              GNUNET_TIME_UNIT_FOREVER_REL,
+                              &process_request,
+                              sc);      
+}
+
+
+/**
+ * Functions of this signature are called whenever data is available from the
+ * stream.
+ *
+ * @param cls the closure from GNUNET_STREAM_read
+ * @param status the status of the stream at the time this function is called
+ * @param data traffic from the other side
+ * @param size the number of bytes available in data read; will be 0 on timeout
+ * @return number of bytes processed from 'data' (any data remaining should be
+ *         given to the next time the read processor is called).
+ */
+static size_t 
+process_request (void *cls,
+                enum GNUNET_STREAM_Status status,
+                const void *data,
                 size_t size)
 {
   struct StreamClient *sc = cls;
+  int ret;
 
   sc->rh = NULL;
   switch (status)
   {
   case GNUNET_STREAM_OK:
-    // fixme: handle request...
+    ret = 
+      GNUNET_SERVER_mst_receive (sc->mst,
+                                NULL,
+                                data, size,
+                                GNUNET_NO, GNUNET_YES);
+    if (GNUNET_NO == ret)
+      return size; /* more messages in MST */
+    if (GNUNET_SYSERR == ret)
+    {
+      GNUNET_break_op (0);
+      terminate_stream (sc);
+      return size;
+    }
     break;
   case GNUNET_STREAM_TIMEOUT:
   case GNUNET_STREAM_SHUTDOWN:
@@ -144,15 +264,152 @@
     GNUNET_break (0);
     return size;
   }
-  sc->rh = GNUNET_STREAM_read (sc->socket,
-                              GNUNET_TIME_UNIT_FOREVER_REL,
-                              &process_request,
-                              sc);
+  continue_reading (sc);
   return size;
 }
 
 
 /**
+ * Sending a reply was completed, continue processing.
+ * Resumes reading requests if the whole reply was written successfully; otherwise the stream is terminated.
+ * @param cls closure with the struct StreamClient which sent the query
+ */
+static void
+write_continuation (void *cls,
+                   enum GNUNET_STREAM_Status status, /* status reported for the write; anything but GNUNET_STREAM_OK terminates the stream */
+                   size_t size) /* number of bytes reported as written; must match sc->reply_size */
+{
+  struct StreamClient *sc = cls;
+  
+  sc->wh = NULL; /* the write is over; clearing the handle keeps terminate_stream() from cancelling it */
+  if ( (GNUNET_STREAM_OK == status) &&
+       (size == sc->reply_size) ) /* reply_size was recorded by handle_datastore_reply() when the write was started */
+    continue_reading (sc);
+  else
+    terminate_stream (sc);    
+}
+
+
+/**
+ * Process a datum that was stored in the datastore.
+ * On-demand blocks are passed to GNUNET_FS_handle_on_demand_block() with this function as its callback; any other block is wrapped in a StreamReplyMessage and written to the client's stream.
+ * @param cls closure with the struct StreamClient which sent the query
+ * @param key key for the content
+ * @param size number of bytes in data
+ * @param data content stored; NOTE(review): not checked for NULL before the memcpy below -- confirm how "no result" is reported to this callback
+ * @param type type of the content
+ * @param priority priority of the content
+ * @param anonymity anonymity-level for the content
+ * @param expiration expiration time for the content
+ * @param uid unique identifier for the datum;
+ *        maybe 0 if no unique identifier is available
+ */
+static void 
+handle_datastore_reply (void *cls,
+                       const struct GNUNET_HashCode * key,
+                       size_t size, const void *data,
+                       enum GNUNET_BLOCK_Type type,
+                       uint32_t priority,
+                       uint32_t anonymity,
+                       struct GNUNET_TIME_Absolute
+                       expiration, uint64_t uid)
+{
+  struct StreamClient *sc = cls;
+  size_t msize = size + sizeof (struct StreamReplyMessage);
+  char buf[msize] GNUNET_ALIGN; /* NOTE(review): this VLA is sized before the msize limit check further down -- confirm 'size' is bounded by the caller */
+  struct StreamReplyMessage *srm = (struct StreamReplyMessage *) buf;
+
+  sc->qe = NULL; /* clear the pending-request handle so terminate_stream() will not cancel it */
+  if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
+  {
+    if (GNUNET_OK !=
+       GNUNET_FS_handle_on_demand_block (key,
+                                         size, data, type,
+                                         priority, anonymity,
+                                         expiration, uid,
+                                         &handle_datastore_reply, /* this function is passed as the callback for the on-demand handler */
+                                         sc))
+    {
+      continue_reading (sc); /* on-demand handling failed: no reply is sent, go on with the next request */
+    }
+    return;
+  }
+  if (msize > GNUNET_SERVER_MAX_MESSAGE_SIZE)
+  {
+    GNUNET_break (0);
+    continue_reading (sc); /* oversized reply is dropped; the client gets no answer for this query */
+    return;
+  }
+  srm->header.size = htons ((uint16_t) msize); /* total size: reply header plus block data */
+  srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY);
+  srm->type = htonl (type);
+  srm->expiration = GNUNET_TIME_absolute_hton (expiration);
+  memcpy (&srm[1], data, size); /* block data goes directly behind the reply header */
+  sc->reply_size = msize; /* remembered so write_continuation() can detect a partial write */
+  sc->wh = GNUNET_STREAM_write (sc->socket,
+                               buf, msize, /* NOTE(review): buf is on this function's stack -- assumes GNUNET_STREAM_write copies the data before returning; confirm */
+                               GNUNET_TIME_UNIT_FOREVER_REL,
+                               &write_continuation,
+                               sc);
+  if (NULL == sc->wh)
+  {
+    terminate_stream (sc); /* no write handle was returned: give up on this client */
+    return;
+  }
+}
+
+
+/**
+ * Functions with this signature are called whenever a
+ * complete message is received.
+ * A well-formed GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY starts a datastore lookup whose result is delivered to handle_datastore_reply(); anything else is rejected.
+ * Do not call GNUNET_SERVER_mst_destroy in callback
+ *
+ * @param cls closure with the 'struct StreamClient'
+ * @param client identification of the client, NULL (not used by this function)
+ * @param message the actual message
+ * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
+ */
+static int
+request_cb (void *cls,
+           void *client,
+           const struct GNUNET_MessageHeader *message)
+{
+  struct StreamClient *sc = cls;
+  const struct StreamQueryMessage *sqm;
+
+  switch (ntohs (message->type))
+  {
+  case GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY:
+    if (sizeof (struct StreamQueryMessage) != 
+       ntohs (message->size)) /* queries have a fixed size; any other length is rejected */
+    {
+      GNUNET_break_op (0);
+      return GNUNET_SYSERR;
+    }
+    sqm = (const struct StreamQueryMessage *) message;
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+               "Received query for `%s' via stream\n",
+               GNUNET_h2s (&sqm->query));
+    sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
+                                      0, /* presumably the result offset -- confirm against GNUNET_DATASTORE_get_key */
+                                      &sqm->query,
+                                      ntohl (sqm->type), /* block type arrives in network byte order */
+                                      0 /* FIXME: priority */, 
+                                      GSF_datastore_queue_size,
+                                      GNUNET_TIME_UNIT_FOREVER_REL,
+                                      &handle_datastore_reply, sc);
+    if (NULL == sc->qe)
+      continue_reading (sc); /* no queue entry was returned: move on to the next request */
+    return GNUNET_OK;
+  default:
+    GNUNET_break_op (0); /* any other message type is rejected */
+    return GNUNET_SYSERR;
+  }
+}
+
+
+/**
  * Functions of this type are called upon new stream connection from other peers
  * or upon binding error which happen when the app_port given in
  * GNUNET_STREAM_listen() is already taken.
@@ -175,6 +432,8 @@
     return GNUNET_SYSERR;
   sc = GNUNET_malloc (sizeof (struct StreamClient));
   sc->socket = socket;
+  sc->mst = GNUNET_SERVER_mst_create (&request_cb,
+                                     sc);
   sc->rh = GNUNET_STREAM_read (sc->socket,
                               GNUNET_TIME_UNIT_FOREVER_REL,
                               &process_request,

Modified: gnunet/src/include/gnunet_protocols.h
===================================================================
--- gnunet/src/include/gnunet_protocols.h       2012-11-22 18:02:45 UTC (rev 25109)
+++ gnunet/src/include/gnunet_protocols.h       2012-11-22 18:27:12 UTC (rev 25110)
@@ -483,7 +483,17 @@
  */
 #define GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP 139
 
+/**
+ * P2P request for content (one FS to another via a stream); header type of struct StreamQueryMessage.
+ */
+#define GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY 140
 
+/**
+ * P2P answer for content (one FS to another via a stream); header type of struct StreamReplyMessage.
+ */
+#define GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY 141
+
+
 /*******************************************************************************
  * DHT message types
  ******************************************************************************/




reply via email to

[Prev in Thread] Current Thread [Next in Thread]