[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r37452 - gnunet/src/fs
From: |
gnunet |
Subject: |
[GNUnet-SVN] r37452 - gnunet/src/fs |
Date: |
Sun, 3 Jul 2016 15:18:48 +0200 |
Author: grothoff
Date: 2016-07-03 15:18:48 +0200 (Sun, 03 Jul 2016)
New Revision: 37452
Modified:
gnunet/src/fs/fs_api.h
gnunet/src/fs/fs_publish.c
Log:
convert fs publish to MQ
Modified: gnunet/src/fs/fs_api.h
===================================================================
--- gnunet/src/fs/fs_api.h 2016-07-03 12:50:36 UTC (rev 37451)
+++ gnunet/src/fs/fs_api.h 2016-07-03 13:18:48 UTC (rev 37452)
@@ -1203,11 +1203,6 @@
struct GNUNET_FS_Handle *h;
/**
- * Connection to FS service (only used for LOC URI signing).
- */
- struct GNUNET_CLIENT_Connection *fs_client;
-
- /**
* Our top-level activity entry (if we are top-level, otherwise NULL).
*/
struct TopLevelActivity *top;
@@ -1242,7 +1237,7 @@
* Our own message queue for the FS service; only briefly used when
* we start to index a file, otherwise NULL.
*/
- struct GNUNET_CLIENT_Connection *client;
+ struct GNUNET_MQ_Handle *mq;
/**
* Current position in the file-tree for the upload.
Modified: gnunet/src/fs/fs_publish.c
===================================================================
--- gnunet/src/fs/fs_publish.c 2016-07-03 12:50:36 UTC (rev 37451)
+++ gnunet/src/fs/fs_publish.c 2016-07-03 13:18:48 UTC (rev 37452)
@@ -92,10 +92,10 @@
GNUNET_DATASTORE_disconnect (pc->dsh, GNUNET_NO);
pc->dsh = NULL;
}
- if (NULL != pc->client)
+ if (NULL != pc->mq)
{
- GNUNET_CLIENT_disconnect (pc->client);
- pc->client = NULL;
+ GNUNET_MQ_destroy (pc->mq);
+ pc->mq = NULL;
}
GNUNET_assert (NULL == pc->upload_task);
GNUNET_free (pc);
@@ -493,7 +493,8 @@
p = pc->fi_pos;
if (NULL == pc->dsh)
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Waiting for datastore connection\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Waiting for datastore connection\n");
GNUNET_assert (NULL == pc->upload_task);
pc->upload_task =
GNUNET_SCHEDULER_add_with_priority
@@ -679,53 +680,105 @@
/**
- * Process the response (or lack thereof) from
- * the "fs" service to our 'start index' request.
+ * Check the response from the "fs" service to our 'start index'
+ * request.
*
* @param cls closure (of type `struct GNUNET_FS_PublishContext *`)
* @param msg the response we got
*/
+static int
+check_index_start_failed (void *cls,
+ const struct GNUNET_MessageHeader *msg)
+{
+ size_t msize = ntohs (msg->size) - sizeof (*msg);
+ const char *emsg = (const char *) &msg[1];
+
+ if (emsg[msize - sizeof (struct GNUNET_MessageHeader) - 1] != '\0')
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+ return GNUNET_OK;
+}
+
+
+/**
+ * Process the response from the "fs" service to our 'start index'
+ * request.
+ *
+ * @param cls closure (of type `struct GNUNET_FS_PublishContext *`)
+ * @param msg the response we got
+ */
static void
-process_index_start_response (void *cls,
- const struct GNUNET_MessageHeader *msg)
+handle_index_start_failed (void *cls,
+ const struct GNUNET_MessageHeader *msg)
{
struct GNUNET_FS_PublishContext *pc = cls;
struct GNUNET_FS_FileInformation *p;
- const char *emsg;
- uint16_t msize;
+ const char *emsg = (const char *) &msg[1];
- GNUNET_CLIENT_disconnect (pc->client);
- pc->client = NULL;
+ GNUNET_MQ_destroy (pc->mq);
+ pc->mq = NULL;
p = pc->fi_pos;
- if (NULL == msg)
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ _("Can not index file `%s': %s. Will try to insert instead.\n"),
+ p->filename,
+ gettext (emsg));
+ p->data.file.do_index = GNUNET_NO;
+ GNUNET_FS_file_information_sync_ (p);
+ publish_content (pc);
+}
+
+
+/**
+ * Process the response from the "fs" service to our 'start index'
+ * request.
+ *
+ * @param cls closure (of type `struct GNUNET_FS_PublishContext *`)
+ * @param msg the response we got
+ */
+static void
+handle_index_start_ok (void *cls,
+ const struct GNUNET_MessageHeader *msg)
+{
+ struct GNUNET_FS_PublishContext *pc = cls;
+ struct GNUNET_FS_FileInformation *p;
+
+ GNUNET_MQ_destroy (pc->mq);
+ pc->mq = NULL;
+ p = pc->fi_pos;
+ p->data.file.index_start_confirmed = GNUNET_YES;
+ GNUNET_FS_file_information_sync_ (p);
+ publish_content (pc);
+}
+
+
+/**
+ * Generic error handler, called with the appropriate error code and
+ * the same closure specified at the creation of the message queue.
+ * Not every message queue implementation supports an error handler.
+ *
+ * @param cls closure with the `struct GNUNET_FS_PublishContext *`
+ * @param error error code
+ */
+static void
+index_mq_error_handler (void *cls,
+ enum GNUNET_MQ_Error error)
+{
+ struct GNUNET_FS_PublishContext *pc = cls;
+ struct GNUNET_FS_FileInformation *p;
+
+ if (NULL != pc->mq)
{
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- _("Can not index file `%s': %s. Will try to insert
instead.\n"),
- p->filename,
- _("timeout on index-start request to `fs' service"));
- p->data.file.do_index = GNUNET_NO;
- GNUNET_FS_file_information_sync_ (p);
- publish_content (pc);
- return;
+ GNUNET_MQ_destroy (pc->mq);
+ pc->mq = NULL;
}
- if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_FS_INDEX_START_OK)
- {
- msize = ntohs (msg->size);
- emsg = (const char *) &msg[1];
- if ((msize <= sizeof (struct GNUNET_MessageHeader)) ||
- (emsg[msize - sizeof (struct GNUNET_MessageHeader) - 1] != '\0'))
- emsg = gettext_noop ("unknown error");
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- _
- ("Can not index file `%s': %s. Will try to insert
instead.\n"),
- p->filename, gettext (emsg));
- p->data.file.do_index = GNUNET_NO;
- GNUNET_FS_file_information_sync_ (p);
- publish_content (pc);
- return;
- }
- p->data.file.index_start_confirmed = GNUNET_YES;
- /* success! continue with indexing */
+ p = pc->fi_pos;
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ _("Can not index file `%s': %s. Will try to insert instead.\n"),
+ p->filename,
+ _("error on index-start request to `fs' service"));
+ p->data.file.do_index = GNUNET_NO;
GNUNET_FS_file_information_sync_ (p);
publish_content (pc);
}
@@ -742,11 +795,22 @@
hash_for_index_cb (void *cls,
const struct GNUNET_HashCode *res)
{
+ GNUNET_MQ_hd_fixed_size (index_start_ok,
+ GNUNET_MESSAGE_TYPE_FS_INDEX_START_OK,
+ struct GNUNET_MessageHeader);
+ GNUNET_MQ_hd_var_size (index_start_failed,
+ GNUNET_MESSAGE_TYPE_FS_INDEX_START_FAILED,
+ struct GNUNET_MessageHeader);
struct GNUNET_FS_PublishContext *pc = cls;
+ struct GNUNET_MQ_MessageHandler handlers[] = {
+ make_index_start_ok_handler (pc),
+ make_index_start_failed_handler (pc),
+ GNUNET_MQ_handler_end ()
+ };
struct GNUNET_FS_FileInformation *p;
+ struct GNUNET_MQ_Envelope *env;
struct IndexStartMessage *ism;
size_t slen;
- struct GNUNET_CLIENT_Connection *client;
uint64_t dev;
uint64_t ino;
char *fn;
@@ -785,8 +849,10 @@
publish_content (pc);
return;
}
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Hash of indexed file `%s' is `%s'\n",
- p->filename, GNUNET_h2s (res));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Hash of indexed file `%s' is `%s'\n",
+ p->filename,
+ GNUNET_h2s (res));
if (0 != (pc->options & GNUNET_FS_PUBLISH_OPTION_SIMULATE_ONLY))
{
p->data.file.file_id = *res;
@@ -797,8 +863,12 @@
GNUNET_free (fn);
return;
}
- client = GNUNET_CLIENT_connect ("fs", pc->h->cfg);
- if (NULL == client)
+ pc->mq = GNUNET_CLIENT_connecT (pc->h->cfg,
+ "fs",
+ handlers,
+ &index_mq_error_handler,
+ pc);
+ if (NULL == pc->mq)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
_("Can not index file `%s': %s. Will try to insert
instead.\n"),
@@ -815,10 +885,13 @@
p->data.file.have_hash = GNUNET_YES;
GNUNET_FS_file_information_sync_ (p);
}
- ism = GNUNET_malloc (sizeof (struct IndexStartMessage) + slen);
- ism->header.size = htons (sizeof (struct IndexStartMessage) + slen);
- ism->header.type = htons (GNUNET_MESSAGE_TYPE_FS_INDEX_START);
- if (GNUNET_OK == GNUNET_DISK_file_get_identifiers (p->filename, &dev, &ino))
+ env = GNUNET_MQ_msg_extra (ism,
+ slen,
+ GNUNET_MESSAGE_TYPE_FS_INDEX_START);
+ if (GNUNET_OK ==
+ GNUNET_DISK_file_get_identifiers (p->filename,
+ &dev,
+ &ino))
{
ism->device = GNUNET_htonll (dev);
ism->inode = GNUNET_htonll (ino);
@@ -826,19 +899,16 @@
else
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- _("Failed to get file identifiers for `%s'\n"), p->filename);
+ _("Failed to get file identifiers for `%s'\n"),
+ p->filename);
}
ism->file_id = *res;
- memcpy (&ism[1], fn, slen);
+ memcpy (&ism[1],
+ fn,
+ slen);
GNUNET_free (fn);
- pc->client = client;
- GNUNET_break (GNUNET_YES ==
- GNUNET_CLIENT_transmit_and_get_response (client, &ism->header,
-
GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_YES,
-
&process_index_start_response,
- pc));
- GNUNET_free (ism);
+ GNUNET_MQ_send (pc->mq,
+ env);
}
@@ -862,7 +932,8 @@
p->chk_uri,
&p->bo,
pc->options,
- &publish_kblocks_cont, pc);
+ &publish_kblocks_cont,
+ pc);
}
else
{
@@ -872,40 +943,24 @@
/**
- * Process the response (or lack thereof) from
- * the "fs" service to our LOC sign request.
+ * Process the response from the "fs" service to our LOC sign request.
*
* @param cls closure (of type `struct GNUNET_FS_PublishContext *`)
- * @param msg the response we got
+ * @param sig the response we got
*/
static void
-process_signature_response (void *cls,
- const struct GNUNET_MessageHeader *msg)
+handle_signature_response (void *cls,
+ const struct ResponseLocSignatureMessage *sig)
{
struct GNUNET_FS_PublishContext *pc = cls;
- const struct ResponseLocSignatureMessage *sig;
struct GNUNET_FS_FileInformation *p;
p = pc->fi_pos;
- if (NULL == msg)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- _("Can not create LOC URI. Will continue with CHK
instead.\n"));
- publish_kblocks (pc);
- return;
- }
- if (sizeof (struct ResponseLocSignatureMessage) !=
- ntohs (msg->size))
- {
- GNUNET_break (0);
- publish_kblocks (pc);
- return;
- }
- sig = (const struct ResponseLocSignatureMessage *) msg;
p->chk_uri->type = GNUNET_FS_URI_LOC;
/* p->data.loc.fi kept from CHK before */
p->chk_uri->data.loc.peer = sig->peer;
- p->chk_uri->data.loc.expirationTime = GNUNET_TIME_absolute_ntoh
(sig->expiration_time);
+ p->chk_uri->data.loc.expirationTime
+ = GNUNET_TIME_absolute_ntoh (sig->expiration_time);
p->chk_uri->data.loc.contentSignature = sig->signature;
GNUNET_FS_file_information_sync_ (p);
GNUNET_FS_publish_sync_ (pc);
@@ -914,6 +969,31 @@
/**
+ * Generic error handler, called with the appropriate error code and
+ * the same closure specified at the creation of the message queue.
+ * Not every message queue implementation supports an error handler.
+ *
+ * @param cls closure with the `struct GNUNET_FS_PublishContext *`
+ * @param error error code
+ */
+static void
+loc_mq_error_handler (void *cls,
+ enum GNUNET_MQ_Error error)
+{
+ struct GNUNET_FS_PublishContext *pc = cls;
+
+ if (NULL != pc->mq)
+ {
+ GNUNET_MQ_destroy (pc->mq);
+ pc->mq = NULL;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ _("Can not create LOC URI. Will continue with CHK instead.\n"));
+ publish_kblocks (pc);
+}
+
+
+/**
* We're publishing without anonymity. Contact the FS service
* to create a signed LOC URI for further processing, then
* continue with KSKs.
@@ -923,12 +1003,25 @@
static void
create_loc_uri (struct GNUNET_FS_PublishContext *pc)
{
- struct RequestLocSignatureMessage req;
+ GNUNET_MQ_hd_fixed_size (signature_response,
+ GNUNET_MESSAGE_TYPE_FS_REQUEST_LOC_SIGNATURE,
+ struct ResponseLocSignatureMessage);
+ struct GNUNET_MQ_MessageHandler handlers[] = {
+ make_signature_response_handler (pc),
+ GNUNET_MQ_handler_end ()
+ };
+ struct GNUNET_MQ_Envelope *env;
+ struct RequestLocSignatureMessage *req;
struct GNUNET_FS_FileInformation *p;
- if (NULL == pc->client)
- pc->client = GNUNET_CLIENT_connect ("fs", pc->h->cfg);
- if (NULL == pc->client)
+ if (NULL != pc->mq)
+ GNUNET_MQ_destroy (pc->mq);
+ pc->mq = GNUNET_CLIENT_connecT (pc->h->cfg,
+ "fs",
+ handlers,
+ &loc_mq_error_handler,
+ pc);
+ if (NULL == pc->mq)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
_("Can not create LOC URI. Will continue with CHK
instead.\n"));
@@ -936,19 +1029,14 @@
return;
}
p = pc->fi_pos;
- req.header.size = htons (sizeof (struct RequestLocSignatureMessage));
- req.header.type = htons (GNUNET_MESSAGE_TYPE_FS_REQUEST_LOC_SIGN);
- req.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_PEER_PLACEMENT);
- req.expiration_time = GNUNET_TIME_absolute_hton (p->bo.expiration_time);
- req.chk = p->chk_uri->data.chk.chk;
- req.file_length = GNUNET_htonll (p->chk_uri->data.chk.file_length);
- GNUNET_break (GNUNET_YES ==
- GNUNET_CLIENT_transmit_and_get_response (pc->client,
- &req.header,
-
GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_YES,
-
&process_signature_response,
- pc));
+ env = GNUNET_MQ_msg (req,
+ GNUNET_MESSAGE_TYPE_FS_REQUEST_LOC_SIGN);
+ req->purpose = htonl (GNUNET_SIGNATURE_PURPOSE_PEER_PLACEMENT);
+ req->expiration_time = GNUNET_TIME_absolute_hton (p->bo.expiration_time);
+ req->chk = p->chk_uri->data.chk.chk;
+ req->file_length = GNUNET_htonll (p->chk_uri->data.chk.file_length);
+ GNUNET_MQ_send (pc->mq,
+ env);
}
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r37452 - gnunet/src/fs,
gnunet <=