[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r33309 - in gnunet/src: include peerstore
From: |
gnunet |
Subject: |
[GNUnet-SVN] r33309 - in gnunet/src: include peerstore |
Date: |
Fri, 16 May 2014 18:45:43 +0200 |
Author: otarabai
Date: 2014-05-16 18:45:43 +0200 (Fri, 16 May 2014)
New Revision: 33309
Modified:
gnunet/src/include/gnunet_peerstore_service.h
gnunet/src/peerstore/gnunet-service-peerstore.c
gnunet/src/peerstore/peerstore_api.c
gnunet/src/peerstore/peerstore_common.c
gnunet/src/peerstore/peerstore_common.h
gnunet/src/peerstore/test_peerstore_api.c
Log:
peerstore API now uses MQ
Modified: gnunet/src/include/gnunet_peerstore_service.h
===================================================================
--- gnunet/src/include/gnunet_peerstore_service.h 2014-05-16 12:54:18 UTC
(rev 33308)
+++ gnunet/src/include/gnunet_peerstore_service.h 2014-05-16 16:45:43 UTC
(rev 33309)
@@ -48,6 +48,44 @@
struct GNUNET_PEERSTORE_StoreContext;
/**
+ * Single PEERSTORE record
+ */
+struct GNUNET_PEERSTORE_Record
+{
+
+ /**
+ * Responsible sub system string
+ */
+ char *sub_system;
+
+ /**
+ * Peer Identity
+ */
+ struct GNUNET_PeerIdentity *peer;
+
+ /**
+ * Record key string
+ */
+ char *key;
+
+ /**
+ * Record value BLOB
+ */
+ void *value;
+
+ /**
+ * Size of 'value' BLOB
+ */
+ size_t value_size;
+
+ /**
+ * Expiry time of entry
+ */
+ struct GNUNET_TIME_Absolute *expiry;
+
+};
+
+/**
* Continuation called with a status result.
*
* @param cls closure
Modified: gnunet/src/peerstore/gnunet-service-peerstore.c
===================================================================
--- gnunet/src/peerstore/gnunet-service-peerstore.c 2014-05-16 12:54:18 UTC
(rev 33308)
+++ gnunet/src/peerstore/gnunet-service-peerstore.c 2014-05-16 16:45:43 UTC
(rev 33309)
@@ -87,7 +87,7 @@
* @param sub_system name of the GNUnet sub system responsible
* @param value stored value
* @param size size of stored value
- */
+ *
int record_iterator(void *cls,
const char *sub_system,
const struct GNUNET_PeerIdentity *peer,
@@ -108,7 +108,7 @@
GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
GNUNET_SERVER_transmit_context_append_message(tc, (const struct
GNUNET_MessageHeader *)srm);
return GNUNET_YES;
-}
+}*/
/**
* Handle an iterate request from client
@@ -116,7 +116,7 @@
* @param cls unused
* @param client identification of the client
* @param message the actual message
- */
+ *
void handle_iterate (void *cls,
struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *message)
@@ -147,7 +147,7 @@
{
}
-}
+}*/
/**
* Handle a store request from client
@@ -190,7 +190,7 @@
record->key,
record->value,
record->value_size,
- record->expiry))
+ *record->expiry))
{
response_type = GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK;
}
@@ -220,7 +220,7 @@
{
static const struct GNUNET_SERVER_MessageHandler handlers[] = {
{&handle_store, NULL, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE, 0},
- {&handle_iterate, NULL, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE, 0},
+// {&handle_iterate, NULL, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE, 0},
{NULL, NULL, 0, 0}
};
char *database;
Modified: gnunet/src/peerstore/peerstore_api.c
===================================================================
--- gnunet/src/peerstore/peerstore_api.c 2014-05-16 12:54:18 UTC (rev
33308)
+++ gnunet/src/peerstore/peerstore_api.c 2014-05-16 16:45:43 UTC (rev
33309)
@@ -50,6 +50,21 @@
*/
struct GNUNET_CLIENT_Connection *client;
+ /**
+ * Message queue
+ */
+ struct GNUNET_MQ_Handle *mq;
+
+ /**
+ * Head of active STORE requests.
+ */
+ struct GNUNET_PEERSTORE_StoreContext *store_head;
+
+ /**
+ * Tail of active STORE requests.
+ */
+ struct GNUNET_PEERSTORE_StoreContext *store_tail;
+
};
/**
@@ -57,13 +72,27 @@
*/
struct GNUNET_PEERSTORE_StoreContext
{
+ /**
+ * Kept in a DLL.
+ */
+ struct GNUNET_PEERSTORE_StoreContext *next;
/**
+ * Kept in a DLL.
+ */
+ struct GNUNET_PEERSTORE_StoreContext *prev;
+
+ /**
* Handle to the PEERSTORE service.
*/
struct GNUNET_PEERSTORE_Handle *h;
/**
+ * MQ Envelope with store request message
+ */
+ struct GNUNET_MQ_Envelope *ev;
+
+ /**
* Continuation called with service response
*/
GNUNET_PEERSTORE_Continuation cont;
@@ -73,9 +102,27 @@
*/
void *cont_cls;
+ /**
+ * #GNUNET_YES / #GNUNET_NO
+ * if sent, cannot be canceled
+ */
+ int request_sent;
+
};
/******************************************************************************/
+/******************* DECLARATIONS
*********************/
+/******************************************************************************/
+
+/**
+ * When a response for store request is received
+ *
+ * @param cls a 'struct GNUNET_PEERSTORE_StoreContext *'
+ * @param msg message received, NULL on timeout or fatal error
+ */
+void handle_store_result (void *cls, const struct GNUNET_MessageHeader *msg);
+
+/******************************************************************************/
/******************* CONNECTION FUNCTIONS
*********************/
/******************************************************************************/
@@ -85,7 +132,7 @@
* @param h handle to the service
*/
static void
-reconnect (struct GNUNET_PEERSTORE_Handle *h)
+reconnect (struct GNUNET_PEERSTORE_Handle *h) //FIXME: MQ friendly
{
LOG(GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n");
@@ -98,6 +145,12 @@
}
+static void
+handle_client_error (void *cls, enum GNUNET_MQ_Error error) //FIXME: implement
+{
+ //struct GNUNET_PEERSTORE_Handle *h = cls;
+}
+
/**
* Connect to the PEERSTORE service.
*
@@ -106,15 +159,30 @@
struct GNUNET_PEERSTORE_Handle *
GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
{
- struct GNUNET_CLIENT_Connection *client;
struct GNUNET_PEERSTORE_Handle *h;
+ static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
+ {&handle_store_result, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK,
sizeof(struct GNUNET_MessageHeader)},
+ {&handle_store_result, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_FAIL,
sizeof(struct GNUNET_MessageHeader)},
+ GNUNET_MQ_HANDLERS_END
+ };
- client = GNUNET_CLIENT_connect ("peerstore", cfg);
- if(NULL == client)
+ h = GNUNET_new (struct GNUNET_PEERSTORE_Handle);
+ h->client = GNUNET_CLIENT_connect ("peerstore", cfg);
+ if(NULL == h->client)
+ {
+ GNUNET_free(h);
return NULL;
- h = GNUNET_new (struct GNUNET_PEERSTORE_Handle);
- h->client = client;
+ }
h->cfg = cfg;
+ h->mq = GNUNET_MQ_queue_for_connection_client(h->client,
+ mq_handlers,
+ &handle_client_error,
+ h);
+ if(NULL == h->mq)
+ {
+ GNUNET_free(h);
+ return NULL;
+ }
LOG(GNUNET_ERROR_TYPE_DEBUG, "New connection created\n");
return h;
}
@@ -128,6 +196,11 @@
void
GNUNET_PEERSTORE_disconnect(struct GNUNET_PEERSTORE_Handle *h)
{
+ if(NULL != h->mq)
+ {
+ GNUNET_MQ_destroy(h->mq);
+ h->mq = NULL;
+ }
if (NULL != h->client)
{
GNUNET_CLIENT_disconnect (h->client);
@@ -139,7 +212,7 @@
/******************************************************************************/
-/******************* ADD FUNCTIONS
*********************/
+/******************* STORE FUNCTIONS
*********************/
/******************************************************************************/
/**
@@ -148,41 +221,77 @@
* @param cls a 'struct GNUNET_PEERSTORE_StoreContext *'
* @param msg message received, NULL on timeout or fatal error
*/
-void store_response_receiver (void *cls, const struct GNUNET_MessageHeader
*msg)
+void handle_store_result (void *cls, const struct GNUNET_MessageHeader *msg)
//FIXME: MQ friendly
{
- struct GNUNET_PEERSTORE_StoreContext *sc = cls;
+ struct GNUNET_PEERSTORE_Handle *h = cls;
+ struct GNUNET_PEERSTORE_StoreContext *sc;
uint16_t msg_type;
+ GNUNET_PEERSTORE_Continuation cont;
+ void *cont_cls;
- if(NULL == sc->cont)
+ sc = h->store_head;
+ if(NULL == sc)
+ {
+ GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Unexpected store response, this
should not happen.\n");
+ reconnect(h);
return;
- if(NULL == msg)
+ }
+ cont = sc->cont;
+ cont_cls = sc->cont_cls;
+ GNUNET_CONTAINER_DLL_remove(h->store_head, h->store_tail, sc);
+ GNUNET_free(sc);
+ if(NULL == msg) /* Connection error */
{
- sc->cont(sc->cont_cls, GNUNET_SYSERR);
- reconnect(sc->h);
+ if(NULL != cont)
+ cont(cont_cls, GNUNET_SYSERR);
+ reconnect(h);
return;
}
- msg_type = ntohs(msg->type);
- if(GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK == msg_type)
- sc->cont(sc->cont_cls, GNUNET_OK);
- else if(GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_FAIL == msg_type)
- sc->cont(sc->cont_cls, GNUNET_SYSERR);
- else
+ if(NULL != cont) /* Run continuation */
{
- LOG(GNUNET_ERROR_TYPE_ERROR, "Invalid response from `PEERSTORE'
service.\n");
- sc->cont(sc->cont_cls, GNUNET_SYSERR);
+ msg_type = ntohs(msg->type);
+ if(GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK == msg_type)
+ cont(cont_cls, GNUNET_OK);
+ else if(GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_FAIL == msg_type)
+ cont(cont_cls, GNUNET_SYSERR);
}
}
/**
+ * Callback after MQ envelope is sent
+ *
+ * @param cls a 'struct GNUNET_PEERSTORE_StoreContext *'
+ */
+void store_request_sent (void *cls)
+{
+ struct GNUNET_PEERSTORE_StoreContext *sc = cls;
+
+ sc->request_sent = GNUNET_YES;
+}
+
+/**
* Cancel a store request
*
* @param sc Store request context
*/
void
-GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc)
+GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc)
//FIXME: MQ friendly
{
- sc->cont = NULL;
+ GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
+ "Canceling store request.\n");
+ if(GNUNET_NO == sc->request_sent)
+ {
+ if(NULL != sc->ev)
+ GNUNET_MQ_discard(sc->ev);
+ GNUNET_CONTAINER_DLL_remove(sc->h->store_head, sc->h->store_tail, sc);
+ GNUNET_free(sc);
+ }
+ else
+ { /* request already sent, will have to wait for response */
+ sc->cont = NULL;
+ }
+
}
/**
@@ -209,29 +318,28 @@
GNUNET_PEERSTORE_Continuation cont,
void *cont_cls)
{
+ struct GNUNET_MQ_Envelope *ev;
struct GNUNET_PEERSTORE_StoreContext *sc;
- struct StoreRecordMessage *srm;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Storing value (size: %lu) for subsytem `%s', peer `%s', key `%s'\n",
size, sub_system, GNUNET_i2s (peer), key);
- sc = GNUNET_new(struct GNUNET_PEERSTORE_StoreContext);
- sc->cont = cont;
- sc->cont_cls = cont_cls;
- sc->h = h;
- srm = PEERSTORE_create_record_message(sub_system,
+ ev = PEERSTORE_create_record_mq_envelope(sub_system,
peer,
key,
value,
size,
expiry,
GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
- GNUNET_CLIENT_transmit_and_get_response(h->client,
- (const struct GNUNET_MessageHeader *)srm,
- GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_YES,
- &store_response_receiver,
- sc);
+ GNUNET_MQ_send(h->mq, ev);
+ GNUNET_MQ_notify_sent(ev, &store_request_sent, ev);
+ sc = GNUNET_new(struct GNUNET_PEERSTORE_StoreContext);
+ sc->ev = ev;
+ sc->cont = cont;
+ sc->cont_cls = cont_cls;
+ sc->h = h;
+ sc->request_sent = GNUNET_NO;
+ GNUNET_CONTAINER_DLL_insert(h->store_head, h->store_tail, sc);
return sc;
}
Modified: gnunet/src/peerstore/peerstore_common.c
===================================================================
--- gnunet/src/peerstore/peerstore_common.c 2014-05-16 12:54:18 UTC (rev
33308)
+++ gnunet/src/peerstore/peerstore_common.c 2014-05-16 16:45:43 UTC (rev
33309)
@@ -36,7 +36,7 @@
* @param expiry absolute time after which the record expires
* @param msg_type message type to be set in header
* @return pointer to record message struct
- */
+ *
struct StoreRecordMessage *
PEERSTORE_create_record_message(const char *sub_system,
const struct GNUNET_PeerIdentity *peer,
@@ -83,6 +83,64 @@
memcpy(dummy, value, value_size);
return srm;
+}*/
+
+/**
+ * Creates a MQ envelope for a single record
+ *
+ * @param sub_system sub system string
+ * @param peer Peer identity (can be NULL)
+ * @param key record key string (can be NULL)
+ * @param value record value BLOB (can be NULL)
+ * @param value_size record value size in bytes (set to 0 if value is NULL)
+ * @param expiry time after which the record expires
+ * @param msg_type message type to be set in header
+ * @return pointer to record message struct
+ */
+struct GNUNET_MQ_Envelope *
+PEERSTORE_create_record_mq_envelope(const char *sub_system,
+ const struct GNUNET_PeerIdentity *peer,
+ const char *key,
+ const void *value,
+ size_t value_size,
+ struct GNUNET_TIME_Absolute expiry,
+ uint16_t msg_type)
+{
+ struct StoreRecordMessage *srm;
+ struct GNUNET_MQ_Envelope *ev;
+ size_t ss_size;
+ size_t key_size;
+ size_t msg_size;
+ void *dummy;
+
+ ss_size = strlen(sub_system) + 1;
+ if(NULL == key)
+ key_size = 0;
+ else
+ key_size = strlen(key) + 1;
+ msg_size = ss_size +
+ key_size +
+ value_size;
+ ev = GNUNET_MQ_msg_extra(srm, msg_size, msg_type);
+ srm->key_size = htons(key_size);
+ srm->expiry = expiry;
+ if(NULL == peer)
+ srm->peer_set = htons(GNUNET_NO);
+ else
+ {
+ srm->peer_set = htons(GNUNET_YES);
+ srm->peer = *peer;
+ }
+ srm->sub_system_size = htons(ss_size);
+ srm->value_size = htons(value_size);
+ dummy = &srm[1];
+ memcpy(dummy, sub_system, ss_size);
+ dummy += ss_size;
+ memcpy(dummy, key, key_size);
+ dummy += key_size;
+ memcpy(dummy, value, value_size);
+
+ return ev;
}
/**
@@ -118,7 +176,8 @@
record->peer = GNUNET_new(struct GNUNET_PeerIdentity);
memcpy(record->peer, &srm->peer, sizeof(struct GNUNET_PeerIdentity));
}
- record->expiry = srm->expiry;
+ record->expiry = GNUNET_new(struct GNUNET_TIME_Absolute);
+ *(record->expiry) = srm->expiry;
dummy = (char *)&srm[1];
if(ss_size > 0)
{
Modified: gnunet/src/peerstore/peerstore_common.h
===================================================================
--- gnunet/src/peerstore/peerstore_common.h 2014-05-16 12:54:18 UTC (rev
33308)
+++ gnunet/src/peerstore/peerstore_common.h 2014-05-16 16:45:43 UTC (rev
33309)
@@ -27,45 +27,7 @@
#include "peerstore.h"
/**
- * PEERSTORE single record
- */
-struct GNUNET_PEERSTORE_Record
-{
-
- /**
- * Responsible sub system string
- */
- char *sub_system;
-
- /**
- * Peer Identity
- */
- struct GNUNET_PeerIdentity *peer;
-
- /**
- * Record key string
- */
- char *key;
-
- /**
- * Record value BLOB
- */
- void *value;
-
- /**
- * Size of value BLOB
- */
- size_t value_size;
-
- /**
- * Expiry time of record
- */
- struct GNUNET_TIME_Absolute expiry;
-
-};
-
-/**
- * Creates a record message ready to be sent
+ * Creates a MQ envelope for a single record
*
* @param sub_system sub system string
* @param peer Peer identity (can be NULL)
@@ -76,8 +38,8 @@
* @param msg_type message type to be set in header
* @return pointer to record message struct
*/
-struct StoreRecordMessage *
-PEERSTORE_create_record_message(const char *sub_system,
+struct GNUNET_MQ_Envelope *
+PEERSTORE_create_record_mq_envelope(const char *sub_system,
const struct GNUNET_PeerIdentity *peer,
const char *key,
const void *value,
Modified: gnunet/src/peerstore/test_peerstore_api.c
===================================================================
--- gnunet/src/peerstore/test_peerstore_api.c 2014-05-16 12:54:18 UTC (rev
33308)
+++ gnunet/src/peerstore/test_peerstore_api.c 2014-05-16 16:45:43 UTC (rev
33309)
@@ -58,7 +58,7 @@
"peerstore-test-key",
val,
val_size,
- GNUNET_TIME_UNIT_FOREVER_REL,
+ GNUNET_TIME_UNIT_FOREVER_ABS,
&store_cont,
NULL);
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r33309 - in gnunet/src: include peerstore,
gnunet <=