gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r33309 - in gnunet/src: include peerstore


From: gnunet
Subject: [GNUnet-SVN] r33309 - in gnunet/src: include peerstore
Date: Fri, 16 May 2014 18:45:43 +0200

Author: otarabai
Date: 2014-05-16 18:45:43 +0200 (Fri, 16 May 2014)
New Revision: 33309

Modified:
   gnunet/src/include/gnunet_peerstore_service.h
   gnunet/src/peerstore/gnunet-service-peerstore.c
   gnunet/src/peerstore/peerstore_api.c
   gnunet/src/peerstore/peerstore_common.c
   gnunet/src/peerstore/peerstore_common.h
   gnunet/src/peerstore/test_peerstore_api.c
Log:
peerstore API now uses MQ


Modified: gnunet/src/include/gnunet_peerstore_service.h
===================================================================
--- gnunet/src/include/gnunet_peerstore_service.h       2014-05-16 12:54:18 UTC 
(rev 33308)
+++ gnunet/src/include/gnunet_peerstore_service.h       2014-05-16 16:45:43 UTC 
(rev 33309)
@@ -48,6 +48,44 @@
 struct GNUNET_PEERSTORE_StoreContext;
 
 /**
+ * Single PEERSTORE record
+ */
+struct GNUNET_PEERSTORE_Record
+{
+
+  /**
+   * Responsible sub system string
+   */
+  char *sub_system;
+
+  /**
+   * Peer Identity
+   */
+  struct GNUNET_PeerIdentity *peer;
+
+  /**
+   * Record key string
+   */
+  char *key;
+
+  /**
+   * Record value BLOB
+   */
+  void *value;
+
+  /**
+   * Size of 'value' BLOB
+   */
+  size_t value_size;
+
+  /**
+   * Expiry time of entry
+   */
+  struct GNUNET_TIME_Absolute *expiry;
+
+};
+
+/**
  * Continuation called with a status result.
  *
  * @param cls closure

Modified: gnunet/src/peerstore/gnunet-service-peerstore.c
===================================================================
--- gnunet/src/peerstore/gnunet-service-peerstore.c     2014-05-16 12:54:18 UTC 
(rev 33308)
+++ gnunet/src/peerstore/gnunet-service-peerstore.c     2014-05-16 16:45:43 UTC 
(rev 33309)
@@ -87,7 +87,7 @@
  * @param sub_system name of the GNUnet sub system responsible
  * @param value stored value
  * @param size size of stored value
- */
+ *
 int record_iterator(void *cls,
     const char *sub_system,
     const struct GNUNET_PeerIdentity *peer,
@@ -108,7 +108,7 @@
       GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE);
   GNUNET_SERVER_transmit_context_append_message(tc, (const struct 
GNUNET_MessageHeader *)srm);
   return GNUNET_YES;
-}
+}*/
 
 /**
  * Handle an iterate request from client
@@ -116,7 +116,7 @@
  * @param cls unused
  * @param client identification of the client
  * @param message the actual message
- */
+ *
 void handle_iterate (void *cls,
     struct GNUNET_SERVER_Client *client,
     const struct GNUNET_MessageHeader *message)
@@ -147,7 +147,7 @@
   {
 
   }
-}
+}*/
 
 /**
  * Handle a store request from client
@@ -190,7 +190,7 @@
       record->key,
       record->value,
       record->value_size,
-      record->expiry))
+      *record->expiry))
   {
     response_type = GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK;
   }
@@ -220,7 +220,7 @@
 {
   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
       {&handle_store, NULL, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE, 0},
-      {&handle_iterate, NULL, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE, 0},
+//      {&handle_iterate, NULL, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE, 0},
       {NULL, NULL, 0, 0}
   };
   char *database;

Modified: gnunet/src/peerstore/peerstore_api.c
===================================================================
--- gnunet/src/peerstore/peerstore_api.c        2014-05-16 12:54:18 UTC (rev 
33308)
+++ gnunet/src/peerstore/peerstore_api.c        2014-05-16 16:45:43 UTC (rev 
33309)
@@ -50,6 +50,21 @@
    */
   struct GNUNET_CLIENT_Connection *client;
 
+  /**
+   * Message queue
+   */
+  struct GNUNET_MQ_Handle *mq;
+
+  /**
+   * Head of active STORE requests.
+   */
+  struct GNUNET_PEERSTORE_StoreContext *store_head;
+
+  /**
+   * Tail of active STORE requests.
+   */
+  struct GNUNET_PEERSTORE_StoreContext *store_tail;
+
 };
 
 /**
@@ -57,13 +72,27 @@
  */
 struct GNUNET_PEERSTORE_StoreContext
 {
+  /**
+   * Kept in a DLL.
+   */
+  struct GNUNET_PEERSTORE_StoreContext *next;
 
   /**
+   * Kept in a DLL.
+   */
+  struct GNUNET_PEERSTORE_StoreContext *prev;
+
+  /**
    * Handle to the PEERSTORE service.
    */
   struct GNUNET_PEERSTORE_Handle *h;
 
   /**
+   * MQ Envelope with store request message
+   */
+  struct GNUNET_MQ_Envelope *ev;
+
+  /**
    * Continuation called with service response
    */
   GNUNET_PEERSTORE_Continuation cont;
@@ -73,9 +102,27 @@
    */
   void *cont_cls;
 
+  /**
+   * #GNUNET_YES / #GNUNET_NO
+   * if sent, cannot be canceled
+   */
+  int request_sent;
+
 };
 
 
/******************************************************************************/
+/*******************             DECLARATIONS             
*********************/
+/******************************************************************************/
+
+/**
+ * When a response for store request is received
+ *
+ * @param cls a 'struct GNUNET_PEERSTORE_StoreContext *'
+ * @param msg message received, NULL on timeout or fatal error
+ */
+void handle_store_result (void *cls, const struct GNUNET_MessageHeader *msg);
+
+/******************************************************************************/
 /*******************         CONNECTION FUNCTIONS         
*********************/
 
/******************************************************************************/
 
@@ -85,7 +132,7 @@
  * @param h handle to the service
  */
 static void
-reconnect (struct GNUNET_PEERSTORE_Handle *h)
+reconnect (struct GNUNET_PEERSTORE_Handle *h) //FIXME: MQ friendly
 {
 
   LOG(GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n");
@@ -98,6 +145,12 @@
 
 }
 
+static void
+handle_client_error (void *cls, enum GNUNET_MQ_Error error) //FIXME: implement
+{
+  //struct GNUNET_PEERSTORE_Handle *h = cls;
+}
+
 /**
  * Connect to the PEERSTORE service.
  *
@@ -106,15 +159,30 @@
 struct GNUNET_PEERSTORE_Handle *
 GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
 {
-  struct GNUNET_CLIENT_Connection *client;
   struct GNUNET_PEERSTORE_Handle *h;
+  static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
+      {&handle_store_result, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK, 
sizeof(struct GNUNET_MessageHeader)},
+      {&handle_store_result, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_FAIL, 
sizeof(struct GNUNET_MessageHeader)},
+      GNUNET_MQ_HANDLERS_END
+  };
 
-  client = GNUNET_CLIENT_connect ("peerstore", cfg);
-  if(NULL == client)
+  h = GNUNET_new (struct GNUNET_PEERSTORE_Handle);
+  h->client = GNUNET_CLIENT_connect ("peerstore", cfg);
+  if(NULL == h->client)
+  {
+    GNUNET_free(h);
     return NULL;
-  h = GNUNET_new (struct GNUNET_PEERSTORE_Handle);
-  h->client = client;
+  }
   h->cfg = cfg;
+  h->mq = GNUNET_MQ_queue_for_connection_client(h->client,
+      mq_handlers,
+      &handle_client_error,
+      h);
+  if(NULL == h->mq)
+  {
+    GNUNET_free(h);
+    return NULL;
+  }
   LOG(GNUNET_ERROR_TYPE_DEBUG, "New connection created\n");
   return h;
 }
@@ -128,6 +196,11 @@
 void
 GNUNET_PEERSTORE_disconnect(struct GNUNET_PEERSTORE_Handle *h)
 {
+  if(NULL != h->mq)
+  {
+    GNUNET_MQ_destroy(h->mq);
+    h->mq = NULL;
+  }
   if (NULL != h->client)
   {
     GNUNET_CLIENT_disconnect (h->client);
@@ -139,7 +212,7 @@
 
 
 
/******************************************************************************/
-/*******************             ADD FUNCTIONS            
*********************/
+/*******************            STORE FUNCTIONS           
*********************/
 
/******************************************************************************/
 
 /**
@@ -148,41 +221,77 @@
  * @param cls a 'struct GNUNET_PEERSTORE_StoreContext *'
  * @param msg message received, NULL on timeout or fatal error
  */
-void store_response_receiver (void *cls, const struct GNUNET_MessageHeader 
*msg)
+void handle_store_result (void *cls, const struct GNUNET_MessageHeader *msg) 
//FIXME: MQ friendly
 {
-  struct GNUNET_PEERSTORE_StoreContext *sc = cls;
+  struct GNUNET_PEERSTORE_Handle *h = cls;
+  struct GNUNET_PEERSTORE_StoreContext *sc;
   uint16_t msg_type;
+  GNUNET_PEERSTORE_Continuation cont;
+  void *cont_cls;
 
-  if(NULL == sc->cont)
+  sc = h->store_head;
+  if(NULL == sc)
+  {
+    GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Unexpected store response, this 
should not happen.\n");
+    reconnect(h);
     return;
-  if(NULL == msg)
+  }
+  cont = sc->cont;
+  cont_cls = sc->cont_cls;
+  GNUNET_CONTAINER_DLL_remove(h->store_head, h->store_tail, sc);
+  GNUNET_free(sc);
+  if(NULL == msg) /* Connection error */
   {
-    sc->cont(sc->cont_cls, GNUNET_SYSERR);
-    reconnect(sc->h);
+    if(NULL != cont)
+      cont(cont_cls, GNUNET_SYSERR);
+    reconnect(h);
     return;
   }
-  msg_type = ntohs(msg->type);
-  if(GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK == msg_type)
-    sc->cont(sc->cont_cls, GNUNET_OK);
-  else if(GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_FAIL == msg_type)
-    sc->cont(sc->cont_cls, GNUNET_SYSERR);
-  else
+  if(NULL != cont) /* Run continuation */
   {
-    LOG(GNUNET_ERROR_TYPE_ERROR, "Invalid response from `PEERSTORE' 
service.\n");
-    sc->cont(sc->cont_cls, GNUNET_SYSERR);
+    msg_type = ntohs(msg->type);
+    if(GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK == msg_type)
+      cont(cont_cls, GNUNET_OK);
+    else if(GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_FAIL == msg_type)
+      cont(cont_cls, GNUNET_SYSERR);
   }
 
 }
 
 /**
+ * Callback after MQ envelope is sent
+ *
+ * @param cls a 'struct GNUNET_PEERSTORE_StoreContext *'
+ */
+void store_request_sent (void *cls)
+{
+  struct GNUNET_PEERSTORE_StoreContext *sc = cls;
+
+  sc->request_sent = GNUNET_YES;
+}
+
+/**
  * Cancel a store request
  *
  * @param sc Store request context
  */
 void
-GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc)
+GNUNET_PEERSTORE_store_cancel (struct GNUNET_PEERSTORE_StoreContext *sc) 
//FIXME: MQ friendly
 {
-  sc->cont = NULL;
+  GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
+          "Canceling store request.\n");
+  if(GNUNET_NO == sc->request_sent)
+  {
+    if(NULL != sc->ev)
+      GNUNET_MQ_discard(sc->ev);
+    GNUNET_CONTAINER_DLL_remove(sc->h->store_head, sc->h->store_tail, sc);
+    GNUNET_free(sc);
+  }
+  else
+  { /* request already sent, will have to wait for response */
+    sc->cont = NULL;
+  }
+
 }
 
 /**
@@ -209,29 +318,28 @@
     GNUNET_PEERSTORE_Continuation cont,
     void *cont_cls)
 {
+  struct GNUNET_MQ_Envelope *ev;
   struct GNUNET_PEERSTORE_StoreContext *sc;
-  struct StoreRecordMessage *srm;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
       "Storing value (size: %lu) for subsytem `%s', peer `%s', key `%s'\n",
       size, sub_system, GNUNET_i2s (peer), key);
-  sc = GNUNET_new(struct GNUNET_PEERSTORE_StoreContext);
-  sc->cont = cont;
-  sc->cont_cls = cont_cls;
-  sc->h = h;
-  srm = PEERSTORE_create_record_message(sub_system,
+  ev = PEERSTORE_create_record_mq_envelope(sub_system,
       peer,
       key,
       value,
       size,
       expiry,
       GNUNET_MESSAGE_TYPE_PEERSTORE_STORE);
-  GNUNET_CLIENT_transmit_and_get_response(h->client,
-      (const struct GNUNET_MessageHeader *)srm,
-      GNUNET_TIME_UNIT_FOREVER_REL,
-      GNUNET_YES,
-      &store_response_receiver,
-      sc);
+  GNUNET_MQ_send(h->mq, ev);
+  GNUNET_MQ_notify_sent(ev, &store_request_sent, ev);
+  sc = GNUNET_new(struct GNUNET_PEERSTORE_StoreContext);
+  sc->ev = ev;
+  sc->cont = cont;
+  sc->cont_cls = cont_cls;
+  sc->h = h;
+  sc->request_sent = GNUNET_NO;
+  GNUNET_CONTAINER_DLL_insert(h->store_head, h->store_tail, sc);
   return sc;
 
 }

Modified: gnunet/src/peerstore/peerstore_common.c
===================================================================
--- gnunet/src/peerstore/peerstore_common.c     2014-05-16 12:54:18 UTC (rev 
33308)
+++ gnunet/src/peerstore/peerstore_common.c     2014-05-16 16:45:43 UTC (rev 
33309)
@@ -36,7 +36,7 @@
  * @param expiry absolute time after which the record expires
  * @param msg_type message type to be set in header
  * @return pointer to record message struct
- */
+ *
 struct StoreRecordMessage *
 PEERSTORE_create_record_message(const char *sub_system,
     const struct GNUNET_PeerIdentity *peer,
@@ -83,6 +83,64 @@
   memcpy(dummy, value, value_size);
   return srm;
 
+}*/
+
+/**
+ * Creates a MQ envelope for a single record
+ *
+ * @param sub_system sub system string
+ * @param peer Peer identity (can be NULL)
+ * @param key record key string (can be NULL)
+ * @param value record value BLOB (can be NULL)
+ * @param value_size record value size in bytes (set to 0 if value is NULL)
+ * @param expiry time after which the record expires
+ * @param msg_type message type to be set in header
+ * @return pointer to record message struct
+ */
+struct GNUNET_MQ_Envelope *
+PEERSTORE_create_record_mq_envelope(const char *sub_system,
+    const struct GNUNET_PeerIdentity *peer,
+    const char *key,
+    const void *value,
+    size_t value_size,
+    struct GNUNET_TIME_Absolute expiry,
+    uint16_t msg_type)
+{
+  struct StoreRecordMessage *srm;
+  struct GNUNET_MQ_Envelope *ev;
+  size_t ss_size;
+  size_t key_size;
+  size_t msg_size;
+  void *dummy;
+
+  ss_size = strlen(sub_system) + 1;
+  if(NULL == key)
+    key_size = 0;
+  else
+    key_size = strlen(key) + 1;
+  msg_size = ss_size +
+      key_size +
+      value_size;
+  ev = GNUNET_MQ_msg_extra(srm, msg_size, msg_type);
+  srm->key_size = htons(key_size);
+  srm->expiry = expiry;
+  if(NULL == peer)
+    srm->peer_set = htons(GNUNET_NO);
+  else
+  {
+    srm->peer_set = htons(GNUNET_YES);
+    srm->peer = *peer;
+  }
+  srm->sub_system_size = htons(ss_size);
+  srm->value_size = htons(value_size);
+  dummy = &srm[1];
+  memcpy(dummy, sub_system, ss_size);
+  dummy += ss_size;
+  memcpy(dummy, key, key_size);
+  dummy += key_size;
+  memcpy(dummy, value, value_size);
+
+  return ev;
 }
 
 /**
@@ -118,7 +176,8 @@
     record->peer = GNUNET_new(struct GNUNET_PeerIdentity);
     memcpy(record->peer, &srm->peer, sizeof(struct GNUNET_PeerIdentity));
   }
-  record->expiry = srm->expiry;
+  record->expiry = GNUNET_new(struct GNUNET_TIME_Absolute);
+  *(record->expiry) = srm->expiry;
   dummy = (char *)&srm[1];
   if(ss_size > 0)
   {

Modified: gnunet/src/peerstore/peerstore_common.h
===================================================================
--- gnunet/src/peerstore/peerstore_common.h     2014-05-16 12:54:18 UTC (rev 
33308)
+++ gnunet/src/peerstore/peerstore_common.h     2014-05-16 16:45:43 UTC (rev 
33309)
@@ -27,45 +27,7 @@
 #include "peerstore.h"
 
 /**
- * PEERSTORE single record
- */
-struct GNUNET_PEERSTORE_Record
-{
-
-  /**
-   * Responsible sub system string
-   */
-  char *sub_system;
-
-  /**
-   * Peer Identity
-   */
-  struct GNUNET_PeerIdentity *peer;
-
-  /**
-   * Record key string
-   */
-  char *key;
-
-  /**
-   * Record value BLOB
-   */
-  void *value;
-
-  /**
-   * Size of value BLOB
-   */
-  size_t value_size;
-
-  /**
-   * Expiry time of record
-   */
-  struct GNUNET_TIME_Absolute expiry;
-
-};
-
-/**
- * Creates a record message ready to be sent
+ * Creates a MQ envelope for a single record
  *
  * @param sub_system sub system string
  * @param peer Peer identity (can be NULL)
@@ -76,8 +38,8 @@
  * @param msg_type message type to be set in header
  * @return pointer to record message struct
  */
-struct StoreRecordMessage *
-PEERSTORE_create_record_message(const char *sub_system,
+struct GNUNET_MQ_Envelope *
+PEERSTORE_create_record_mq_envelope(const char *sub_system,
     const struct GNUNET_PeerIdentity *peer,
     const char *key,
     const void *value,

Modified: gnunet/src/peerstore/test_peerstore_api.c
===================================================================
--- gnunet/src/peerstore/test_peerstore_api.c   2014-05-16 12:54:18 UTC (rev 
33308)
+++ gnunet/src/peerstore/test_peerstore_api.c   2014-05-16 16:45:43 UTC (rev 
33309)
@@ -58,7 +58,7 @@
       "peerstore-test-key",
       val,
       val_size,
-      GNUNET_TIME_UNIT_FOREVER_REL,
+      GNUNET_TIME_UNIT_FOREVER_ABS,
       &store_cont,
       NULL);
 




reply via email to

[Prev in Thread] Current Thread [Next in Thread]