[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r25110 - in gnunet/src: fs include
From: |
gnunet |
Subject: |
[GNUnet-SVN] r25110 - in gnunet/src: fs include |
Date: |
Thu, 22 Nov 2012 19:27:12 +0100 |
Author: grothoff
Date: 2012-11-22 19:27:12 +0100 (Thu, 22 Nov 2012)
New Revision: 25110
Modified:
gnunet/src/fs/gnunet-service-fs.c
gnunet/src/fs/gnunet-service-fs.h
gnunet/src/fs/gnunet-service-fs_pr.c
gnunet/src/fs/gnunet-service-fs_stream.c
gnunet/src/include/gnunet_protocols.h
Log:
mostly finishing server-side for FS-over-stream
Modified: gnunet/src/fs/gnunet-service-fs.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs.c 2012-11-22 18:02:45 UTC (rev 25109)
+++ gnunet/src/fs/gnunet-service-fs.c 2012-11-22 18:27:12 UTC (rev 25110)
@@ -116,6 +116,11 @@
double GSF_current_priorities;
/**
+ * Size of the datastore queue we assume for common requests.
+ */
+unsigned int GSF_datastore_queue_size;
+
+/**
* How many query messages have we received 'recently' that
* have not yet been claimed as cover traffic?
*/
@@ -615,7 +620,18 @@
run (void *cls, struct GNUNET_SERVER_Handle *server,
const struct GNUNET_CONFIGURATION_Handle *cfg)
{
+ unsigned long long dqs;
+
GSF_cfg = cfg;
+ if (GNUNET_OK !=
+ GNUNET_CONFIGURATION_get_value_size (GSF_cfg, "fs", "DATASTORE_QUEUE_SIZE",
+ &dqs))
+ {
+ GNUNET_log_config_missing (GNUNET_ERROR_TYPE_INFO,
+ "fs", "DATASTORE_QUEUE_SIZE");
+ dqs = 1024;
+ }
+ GSF_datastore_queue_size = (unsigned int) dqs;
GSF_enable_randomized_delays =
GNUNET_CONFIGURATION_get_value_yesno (cfg, "fs", "DELAY");
GSF_dsh = GNUNET_DATASTORE_connect (cfg);
Modified: gnunet/src/fs/gnunet-service-fs.h
===================================================================
--- gnunet/src/fs/gnunet-service-fs.h 2012-11-22 18:02:45 UTC (rev 25109)
+++ gnunet/src/fs/gnunet-service-fs.h 2012-11-22 18:27:12 UTC (rev 25110)
@@ -258,6 +258,12 @@
extern int GSF_enable_randomized_delays;
/**
+ * Size of the datastore queue we assume for common requests.
+ */
+extern unsigned int GSF_datastore_queue_size;
+
+
+/**
* Test if the DATABASE (GET) load on this peer is too high
* to even consider processing the query at
* all.
Modified: gnunet/src/fs/gnunet-service-fs_pr.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_pr.c 2012-11-22 18:02:45 UTC (rev 25109)
+++ gnunet/src/fs/gnunet-service-fs_pr.c 2012-11-22 18:27:12 UTC (rev 25110)
@@ -196,12 +196,6 @@
/**
- * Size of the datastore queue we assume for common requests.
- * Determined based on the network quota.
- */
-static unsigned int datastore_queue_size;
-
-/**
* Heap with the request that will expire next at the top. Contains
* pointers of type "struct PendingRequest*"; these will *also* be
* aliased from the "requests_by_peer" data structures and the
@@ -1307,7 +1301,7 @@
(0 !=
(GSF_PRO_PRIORITY_UNLIMITED &
pr->public_data.options)) ? UINT_MAX :
- datastore_queue_size
+ GSF_datastore_queue_size
/* max queue size */ ,
GNUNET_TIME_UNIT_FOREVER_REL,
&process_local_reply, pr);
@@ -1347,7 +1341,7 @@
(0 !=
(GSF_PRO_PRIORITY_UNLIMITED &
pr->public_data.options)) ? UINT_MAX :
- datastore_queue_size
+ GSF_datastore_queue_size
/* max queue size */ ,
GNUNET_TIME_UNIT_FOREVER_REL,
&process_local_reply, pr);
@@ -1405,7 +1399,7 @@
(0 !=
(GSF_PRO_PRIORITY_UNLIMITED & pr->
public_data.options)) ? UINT_MAX :
- datastore_queue_size
+ GSF_datastore_queue_size
/* max queue size */ ,
GNUNET_TIME_UNIT_FOREVER_REL,
&process_local_reply, pr);
@@ -1487,7 +1481,7 @@
(0 !=
(GSF_PRO_PRIORITY_UNLIMITED & pr->
public_data.options)) ? UINT_MAX :
- datastore_queue_size
+ GSF_datastore_queue_size
/* max queue size */ ,
GNUNET_TIME_UNIT_FOREVER_REL,
&process_local_reply, pr);
@@ -1639,8 +1633,6 @@
void
GSF_pending_request_init_ ()
{
- unsigned long long dqs;
-
if (GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_number (GSF_cfg, "fs",
"MAX_PENDING_REQUESTS",
@@ -1649,16 +1641,6 @@
GNUNET_log_config_missing (GNUNET_ERROR_TYPE_INFO,
"fs", "MAX_PENDING_REQUESTS");
}
- if (GNUNET_OK !=
- GNUNET_CONFIGURATION_get_value_size (GSF_cfg, "fs", "DATASTORE_QUEUE_SIZE",
- &dqs))
- {
- GNUNET_log_config_missing (GNUNET_ERROR_TYPE_INFO,
- "fs", "DATASTORE_QUEUE_SIZE");
- dqs = 1024;
- }
- datastore_queue_size = (unsigned int) dqs;
-
active_to_migration =
GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg, "FS", "CONTENT_CACHING");
datastore_put_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
Modified: gnunet/src/fs/gnunet-service-fs_stream.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_stream.c 2012-11-22 18:02:45 UTC (rev 25109)
+++ gnunet/src/fs/gnunet-service-fs_stream.c 2012-11-22 18:27:12 UTC (rev 25110)
@@ -24,6 +24,7 @@
* @author Christian Grothoff
*
* TODO:
+ * - add statistics
* - limit # concurrent clients, timeout for read
*/
#include "platform.h"
@@ -33,6 +34,7 @@
#include "gnunet_protocols.h"
#include "gnunet_applications.h"
#include "gnunet-service-fs.h"
+#include "gnunet-service-fs_indexing.h"
#include "gnunet-service-fs_stream.h"
/**
@@ -66,6 +68,16 @@
struct GNUNET_STREAM_IOWriteHandle *wh;
/**
+ * Tokenizer for requests.
+ */
+ struct GNUNET_SERVER_MessageStreamTokenizer *mst;
+
+ /**
+ * Current active request to the datastore, if we have one pending.
+ */
+ struct GNUNET_DATASTORE_QueueEntry *qe;
+
+ /**
* Size of the last write that was initiated.
*/
size_t reply_size;
@@ -74,6 +86,56 @@
/**
+ * Query from one peer, asking the other for CHK-data.
+ */
+struct StreamQueryMessage
+{
+
+ /**
+ * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY.
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Block type must be DBLOCK or IBLOCK.
+ */
+ uint32_t type;
+
+ /**
+ * Query hash from CHK (hash of encrypted block).
+ */
+ struct GNUNET_HashCode query;
+
+};
+
+
+/**
+ * Reply to a StreamQueryMessage.
+ */
+struct StreamReplyMessage
+{
+
+ /**
+ * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY.
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Block type must be DBLOCK or IBLOCK.
+ */
+ uint32_t type;
+
+ /**
+ * Expiration time for the block.
+ */
+ struct GNUNET_TIME_AbsoluteNBO expiration;
+
+ /* followed by the encrypted block */
+
+};
+
+
+/**
* Listen socket for incoming requests.
*/
static struct GNUNET_STREAM_ListenSocket *listen_socket;
@@ -101,6 +163,9 @@
GNUNET_STREAM_io_read_cancel (sc->rh);
if (NULL != sc->wh)
GNUNET_STREAM_io_write_cancel (sc->wh);
+ if (NULL != sc->qe)
+ GNUNET_DATASTORE_cancel (sc->qe);
+ GNUNET_SERVER_mst_destroy (sc->mst);
GNUNET_STREAM_close (sc->socket);
GNUNET_CONTAINER_DLL_remove (sc_head,
sc_tail,
@@ -124,15 +189,70 @@
process_request (void *cls,
enum GNUNET_STREAM_Status status,
const void *data,
+ size_t size);
+
+
+/**
+ * We're done handling a request from a client, read the next one.
+ *
+ * @param sc client to continue reading requests from
+ */
+static void
+continue_reading (struct StreamClient *sc)
+{
+ int ret;
+
+ ret =
+ GNUNET_SERVER_mst_receive (sc->mst,
+ NULL,
+ NULL, 0,
+ GNUNET_NO, GNUNET_YES);
+ if (GNUNET_NO == ret)
+ return;
+ sc->rh = GNUNET_STREAM_read (sc->socket,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &process_request,
+ sc);
+}
+
+
+/**
+ * Functions of this signature are called whenever data is available from the
+ * stream.
+ *
+ * @param cls the closure from GNUNET_STREAM_read
+ * @param status the status of the stream at the time this function is called
+ * @param data traffic from the other side
+ * @param size the number of bytes available in data read; will be 0 on timeout
+ * @return number of bytes of processed from 'data' (any data remaining should be
+ * given to the next time the read processor is called).
+ */
+static size_t
+process_request (void *cls,
+ enum GNUNET_STREAM_Status status,
+ const void *data,
size_t size)
{
struct StreamClient *sc = cls;
+ int ret;
sc->rh = NULL;
switch (status)
{
case GNUNET_STREAM_OK:
- // fixme: handle request...
+ ret =
+ GNUNET_SERVER_mst_receive (sc->mst,
+ NULL,
+ data, size,
+ GNUNET_NO, GNUNET_YES);
+ if (GNUNET_NO == ret)
+ return size; /* more messages in MST */
+ if (GNUNET_SYSERR == ret)
+ {
+ GNUNET_break_op (0);
+ terminate_stream (sc);
+ return size;
+ }
break;
case GNUNET_STREAM_TIMEOUT:
case GNUNET_STREAM_SHUTDOWN:
@@ -144,15 +264,152 @@
GNUNET_break (0);
return size;
}
- sc->rh = GNUNET_STREAM_read (sc->socket,
- GNUNET_TIME_UNIT_FOREVER_REL,
- &process_request,
- sc);
+ continue_reading (sc);
return size;
}
/**
+ * Sending a reply was completed, continue processing.
+ *
+ * @param cls closure with the struct StreamClient which sent the query
+ */
+static void
+write_continuation (void *cls,
+ enum GNUNET_STREAM_Status status,
+ size_t size)
+{
+ struct StreamClient *sc = cls;
+
+ sc->wh = NULL;
+ if ( (GNUNET_STREAM_OK == status) &&
+ (size == sc->reply_size) )
+ continue_reading (sc);
+ else
+ terminate_stream (sc);
+}
+
+
+/**
+ * Process a datum that was stored in the datastore.
+ *
+ * @param cls closure with the struct StreamClient which sent the query
+ * @param key key for the content
+ * @param size number of bytes in data
+ * @param data content stored
+ * @param type type of the content
+ * @param priority priority of the content
+ * @param anonymity anonymity-level for the content
+ * @param expiration expiration time for the content
+ * @param uid unique identifier for the datum;
+ * maybe 0 if no unique identifier is available
+ */
+static void
+handle_datastore_reply (void *cls,
+ const struct GNUNET_HashCode * key,
+ size_t size, const void *data,
+ enum GNUNET_BLOCK_Type type,
+ uint32_t priority,
+ uint32_t anonymity,
+ struct GNUNET_TIME_Absolute
+ expiration, uint64_t uid)
+{
+ struct StreamClient *sc = cls;
+ size_t msize = size + sizeof (struct StreamReplyMessage);
+ char buf[msize] GNUNET_ALIGN;
+ struct StreamReplyMessage *srm = (struct StreamReplyMessage *) buf;
+
+ sc->qe = NULL;
+ if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
+ {
+ if (GNUNET_OK !=
+ GNUNET_FS_handle_on_demand_block (key,
+ size, data, type,
+ priority, anonymity,
+ expiration, uid,
+ &handle_datastore_reply,
+ sc))
+ {
+ continue_reading (sc);
+ }
+ return;
+ }
+ if (msize > GNUNET_SERVER_MAX_MESSAGE_SIZE)
+ {
+ GNUNET_break (0);
+ continue_reading (sc);
+ return;
+ }
+ srm->header.size = htons ((uint16_t) msize);
+ srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY);
+ srm->type = htonl (type);
+ srm->expiration = GNUNET_TIME_absolute_hton (expiration);
+ memcpy (&srm[1], data, size);
+ sc->reply_size = msize;
+ sc->wh = GNUNET_STREAM_write (sc->socket,
+ buf, msize,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &write_continuation,
+ sc);
+ if (NULL == sc->wh)
+ {
+ terminate_stream (sc);
+ return;
+ }
+}
+
+
+/**
+ * Functions with this signature are called whenever a
+ * complete message is received.
+ *
+ * Do not call GNUNET_SERVER_mst_destroy in callback
+ *
+ * @param cls closure with the 'struct StreamClient'
+ * @param client identification of the client, NULL
+ * @param message the actual message
+ * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
+ */
+static int
+request_cb (void *cls,
+ void *client,
+ const struct GNUNET_MessageHeader *message)
+{
+ struct StreamClient *sc = cls;
+ const struct StreamQueryMessage *sqm;
+
+ switch (ntohs (message->type))
+ {
+ case GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY:
+ if (sizeof (struct StreamQueryMessage) !=
+ ntohs (message->size))
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ sqm = (const struct StreamQueryMessage *) message;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received query for `%s' via stream\n",
+ GNUNET_h2s (&sqm->query));
+ sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
+ 0,
+ &sqm->query,
+ ntohl (sqm->type),
+ 0 /* FIXME: priority */,
+ GSF_datastore_queue_size,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &handle_datastore_reply, sc);
+ if (NULL == sc->qe)
+ continue_reading (sc);
+ return GNUNET_OK;
+ default:
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+}
+
+
+/**
* Functions of this type are called upon new stream connection from other peers
* or upon binding error which happen when the app_port given in
* GNUNET_STREAM_listen() is already taken.
@@ -175,6 +432,8 @@
return GNUNET_SYSERR;
sc = GNUNET_malloc (sizeof (struct StreamClient));
sc->socket = socket;
+ sc->mst = GNUNET_SERVER_mst_create (&request_cb,
+ sc);
sc->rh = GNUNET_STREAM_read (sc->socket,
GNUNET_TIME_UNIT_FOREVER_REL,
&process_request,
Modified: gnunet/src/include/gnunet_protocols.h
===================================================================
--- gnunet/src/include/gnunet_protocols.h 2012-11-22 18:02:45 UTC (rev 25109)
+++ gnunet/src/include/gnunet_protocols.h 2012-11-22 18:27:12 UTC (rev 25110)
@@ -483,7 +483,17 @@
*/
#define GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP 139
+/**
+ * P2P request for content (one FS to another via a stream).
+ */
+#define GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY 140
+/**
+ * P2P answer for content (one FS to another via a stream).
+ */
+#define GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY 141
+
+
/*******************************************************************************
* DHT message types
******************************************************************************/
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r25110 - in gnunet/src: fs include,
gnunet <=