gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r28958 - in gnunet/src: include scalarproduct


From: gnunet
Subject: [GNUnet-SVN] r28958 - in gnunet/src: include scalarproduct
Date: Mon, 2 Sep 2013 23:09:53 +0200

Author: cfuchs
Date: 2013-09-02 23:09:53 +0200 (Mon, 02 Sep 2013)
New Revision: 28958

Modified:
   gnunet/src/include/gnunet_scalarproduct_service.h
   gnunet/src/scalarproduct/scalarproduct_api.c
Log:
rewrote API minus cancel function


Modified: gnunet/src/include/gnunet_scalarproduct_service.h
===================================================================
--- gnunet/src/include/gnunet_scalarproduct_service.h   2013-09-02 21:02:40 UTC 
(rev 28957)
+++ gnunet/src/include/gnunet_scalarproduct_service.h   2013-09-02 21:09:53 UTC 
(rev 28958)
@@ -45,7 +45,6 @@
 {
   GNUNET_SCALARPRODUCT_Status_Success = 0,
   GNUNET_SCALARPRODUCT_Status_Failure,
-  GNUNET_SCALARPRODUCT_Status_Timeout,
   GNUNET_SCALARPRODUCT_Status_InvalidResponse,
   GNUNET_SCALARPRODUCT_Status_ServiceDisconnected
 };
@@ -89,8 +88,8 @@
  * @param cont Callback function
  * @param cont_cls Closure for the callback function
  */
-struct GNUNET_SCALARPRODUCT_Handle *
-GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle *h,
+struct GNUNET_SCALARPRODUCT_ComputationHandle *
+GNUNET_SCALARPRODUCT_request (const struct GNUNET_CONFIGURATION_Handle *cfg,
                               const struct GNUNET_HashCode * key,
                               const struct GNUNET_PeerIdentity *peer,
                               const int32_t * elements,
@@ -103,15 +102,15 @@
 /**
  * Used by Bob's client to cooperate with Alice, 
  * 
- * @param h handle to the master context
+ * @param h handle to our configuration
  * @param key Session key - unique to the requesting client
  * @param elements Array of elements of the vector
  * @param element_count Number of elements in the vector
  * @param cont Callback function
  * @param cont_cls Closure for the callback function
  */
-struct GNUNET_SCALARPRODUCT_Handle *
-GNUNET_SCALARPRODUCT_response (const struct GNUNET_CONFIGURATION_Handle *h,
+struct GNUNET_SCALARPRODUCT_ComputationHandle *
+GNUNET_SCALARPRODUCT_response (const struct GNUNET_CONFIGURATION_Handle *cfg,
                                const struct GNUNET_HashCode * key,
                                const int32_t * elements,
                                uint32_t element_count,

Modified: gnunet/src/scalarproduct/scalarproduct_api.c
===================================================================
--- gnunet/src/scalarproduct/scalarproduct_api.c        2013-09-02 21:02:40 UTC 
(rev 28957)
+++ gnunet/src/scalarproduct/scalarproduct_api.c        2013-09-02 21:09:53 UTC 
(rev 28958)
@@ -119,11 +119,11 @@
 /**
  * Head of the active sessions queue
  */
-struct GNUNET_SCALARPRODUCT_ComputationHandle *head;
+static struct GNUNET_SCALARPRODUCT_ComputationHandle *head;
 /**
  * Tail of the active sessions queue
  */
-struct GNUNET_SCALARPRODUCT_ComputationHandle *tail;
+static struct GNUNET_SCALARPRODUCT_ComputationHandle *tail;
 
 /**************************************************************
  ***  Function Declarations                          **********
@@ -170,10 +170,7 @@
 {
   struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls;
 
-  GNUNET_assert (qe != NULL);
-
-  if (qe->cont_status != NULL)
-    qe->cont_status (qe->cont_cls, &qe->msg->key, status);
+  qe->cont_status (qe->cont_cls, status);
 }
 
 
@@ -190,17 +187,38 @@
                         enum GNUNET_SCALARPRODUCT_ResponseStatus status)
 {
   struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls;
+  const struct GNUNET_SCALARPRODUCT_client_response *message =
+          (const struct GNUNET_SCALARPRODUCT_client_response *) msg;
+  gcry_mpi_t result = NULL;
 
-  GNUNET_assert (qe != NULL);
+  if (GNUNET_SCALARPRODUCT_Status_Success == status
+      && qe->cont_datum != NULL)
+    {
+      size_t product_len = ntohl(message->product_length);
+      result = gcry_mpi_new(0);
+      
+      if (0 < product_len)
+        {
+          gcry_mpi_t num;
+          size_t read = 0;
 
-  if (msg == NULL && qe->cont_datum != NULL)
-    {
-      LOG (GNUNET_ERROR_TYPE_DEBUG, "Timeout reached or session 
terminated.\n");
+          if (0 != gcry_mpi_scan (&num, GCRYMPI_FMT_USG, &msg[1], product_len, 
&read)){
+              LOG (GNUNET_ERROR_TYPE_ERROR, "Could not convert to mpi to 
value!\n");
+              gcry_mpi_release(result);
+              result = NULL;
+              status = GNUNET_SCALARPRODUCT_Status_InvalidResponse;
+            }
+          else
+            {
+              if (message->range > 0)
+                gcry_mpi_add(result, result, num);
+              else
+                gcry_mpi_sub(result, result, num);
+              gcry_mpi_release(num);
+            }
+        }
     }
-  if (qe->cont_datum != NULL)
-    {
-      qe->cont_datum (qe->cont_cls, &qe->msg->key, &qe->msg->peer, status, 
(struct GNUNET_SCALARPRODUCT_client_response *) msg);
-    }
+    qe->cont_datum (qe->cont_cls, status, result);
 }
 
 
@@ -215,76 +233,34 @@
 static void
 receive_cb (void *cls, const struct GNUNET_MessageHeader *msg)
 {
-  struct GNUNET_SCALARPRODUCT_ComputationHandle *h = cls;
-  struct GNUNET_SCALARPRODUCT_ComputationHandle *qe;
-  int16_t was_transmitted;
-  struct GNUNET_SCALARPRODUCT_client_response *message =
-          (struct GNUNET_SCALARPRODUCT_client_response *) msg;
+  struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls;
+  const struct GNUNET_SCALARPRODUCT_client_response *message =
+          (const struct GNUNET_SCALARPRODUCT_client_response *) msg;
+  enum GNUNET_SCALARPRODUCT_ResponseStatus status = 
GNUNET_SCALARPRODUCT_Status_InvalidResponse;
 
-  h->in_receive = GNUNET_NO;
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "Received reply from VectorProduct\n");
-
-  if (NULL == (qe = free_queue_head_entry (h)))
+  if (NULL == msg)
     {
-      /**
-       * The queue head will be NULL if the client disconnected,
-       * * In case of Alice, client disconnected after sending request, before 
receiving response
-       * * In case of Bob, client disconnected after preparing response, 
before getting request from Alice.
-       */
-      process_queue (h);
-      return;
+      LOG (GNUNET_ERROR_TYPE_WARNING, "Disconnected by Service.\n");
+      status = GNUNET_SCALARPRODUCT_Status_ServiceDisconnected;
     }
-
-  if (h->client == NULL)
+  else if ( GNUNET_MESSAGE_TYPE_SCALARPRODUCT_SERVICE_TO_CLIENT != ntohs 
(msg->type))
     {
-      // GKUKREJA : handle this correctly
-      /**
-       * The queue head will be NULL if the client disconnected,
-       * * In case of Alice, client disconnected after sending request, before 
receiving response
-       * * In case of Bob, client disconnected after preparing response, 
before getting request from Alice.
-       */
-      process_queue (h);
-      return;
+      LOG (GNUNET_ERROR_TYPE_WARNING, "Invalid message type received\n");
     }
-
-  was_transmitted = qe->was_transmitted;
-  // Control will only come here, when the request was transmitted to service,
-  // and service responded.
-  GNUNET_assert (was_transmitted == GNUNET_YES);
-
-  if (msg == NULL)
+  else if (0 < ntohl (message->product_length) || (0 == message->range))
     {
-      LOG (GNUNET_ERROR_TYPE_WARNING, "Service responded with NULL!\n");
-      qe->response_proc (qe, NULL, GNUNET_SCALARPRODUCT_Status_Failure);
-    }
-  else if ((ntohs (msg->type) != 
GNUNET_MESSAGE_TYPE_SCALARPRODUCT_SERVICE_TO_CLIENT))
-    {
-      LOG (GNUNET_ERROR_TYPE_WARNING, "Invalid Message Received\n");
-      qe->response_proc (qe, msg, GNUNET_SCALARPRODUCT_Status_InvalidResponse);
-    }
-  else if (ntohl (message->product_length) == 0)
-    {
       // response for the responder client, successful
-      GNUNET_STATISTICS_update (h->stats,
+      GNUNET_STATISTICS_update (qe->stats,
                                 gettext_noop ("# SUC responder result messages 
received"), 1,
                                 GNUNET_NO);
 
-      LOG (GNUNET_ERROR_TYPE_DEBUG, "Received message from service without 
product attached.\n");
-      qe->response_proc (qe, msg, GNUNET_SCALARPRODUCT_Status_Success);
+      status = GNUNET_SCALARPRODUCT_Status_Success;
     }
-  else if (ntohl (message->product_length) > 0)
-    {
-      // response for the requester client, successful
-      GNUNET_STATISTICS_update (h->stats,
-                                gettext_noop ("# SUC requester result messages 
received"), 1,
-                                GNUNET_NO);
+  
+  if (qe->cont_datum != NULL)
+    qe->response_proc (qe, msg, status);
 
-      LOG (GNUNET_ERROR_TYPE_DEBUG, "Received message from requester service 
for requester client.\n");
-      qe->response_proc (qe, msg, GNUNET_SCALARPRODUCT_Status_Success);
-    }
-
   GNUNET_free (qe);
-  process_queue (h);
 }
 
 
@@ -302,31 +278,31 @@
                   void *buf)
 {
   struct GNUNET_SCALARPRODUCT_ComputationHandle *qe = cls;
-  size_t msize;
   
-  if (buf == NULL)
+  if (NULL == buf)
     {
       LOG (GNUNET_ERROR_TYPE_DEBUG, "Failed to transmit request to 
SCALARPRODUCT.\n");
       GNUNET_STATISTICS_update (qe->stats,
                                 gettext_noop ("# transmission request 
failures"),
                                 1, GNUNET_NO);
-      GNUNET_SCALARPRODUCT_disconnect (qe);
+      
+      // notify caller about the error, done here.
+      if (qe->cont_datum != NULL)
+        qe->response_proc (qe, NULL, GNUNET_SCALARPRODUCT_Status_Failure);
+      GNUNET_SCALARPRODUCT_cancel(cls);
       return 0;
     }
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmitting %u byte request to 
SCALARPRODUCT\n",
-       msize);
-
   memcpy (buf, qe->msg, size);
+  
   GNUNET_free (qe->msg);
-  qe->was_transmitted = GNUNET_YES;
-
+  qe->msg = NULL;
   qe->th = NULL;
 
-  GNUNET_CLIENT_receive (h->client, &receive_cb, h,
+  GNUNET_CLIENT_receive (qe->client, &receive_cb, qe,
                          GNUNET_TIME_UNIT_FOREVER_REL);
 
 #if INSANE_STATISTICS
-  GNUNET_STATISTICS_update (h->stats,
+  GNUNET_STATISTICS_update (qe->stats,
                             gettext_noop ("# bytes sent to scalarproduct"), 1,
                             GNUNET_NO);
 #endif
@@ -389,7 +365,7 @@
   
   size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) + element_count * 
sizeof (int32_t);
   
-  h->cont_datum = cont;
+  h->cont_status = cont;
   h->cont_cls = cont_cls;
   h->response_proc = &process_result_message;
   h->cfg = cfg;
@@ -416,7 +392,7 @@
     {
       LOG (GNUNET_ERROR_TYPE_ERROR,
            _ ("Failed to send a message to the scalarproduct service\n"));
-      GNUNET_STATISTICS_destroy(h->GNUNET_YES);
+      GNUNET_STATISTICS_destroy(h->stats, GNUNET_YES);
       GNUNET_CLIENT_disconnect(h->client);
       GNUNET_free(h->msg);
       GNUNET_free(h);
@@ -459,7 +435,7 @@
   
   GNUNET_assert (GNUNET_SERVER_MAX_MESSAGE_SIZE >= sizeof (struct 
GNUNET_SCALARPRODUCT_client_request)
                                                    + element_count * sizeof 
(int32_t)
-                                                   + mask_length);
+                                                   + mask_bytes);
   
   h = GNUNET_new (struct GNUNET_SCALARPRODUCT_ComputationHandle);
   h->client = GNUNET_CLIENT_connect ("scalarproduct", cfg);
@@ -479,7 +455,7 @@
       return NULL;
   }
   
-  size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) + element_count * 
sizeof (int32_t) + mask_length;
+  size = sizeof (struct GNUNET_SCALARPRODUCT_client_request) + element_count * 
sizeof (int32_t) + mask_bytes;
   
   h->cont_datum = cont;
   h->cont_cls = cont_cls;
@@ -492,7 +468,7 @@
   msg->header.size = htons (size);
   msg->header.type = htons (GNUNET_MESSAGE_TYPE_SCALARPRODUCT_CLIENT_TO_ALICE);
   msg->element_count = htons (element_count);
-  msg->mask_length = htons (mask_length);
+  msg->mask_length = htons (mask_bytes);
   
   vector = (int32_t*) &msg[1];
   // copy each element over to the message
@@ -501,7 +477,7 @@
 
   memcpy (&msg->peer, peer, sizeof (struct GNUNET_PeerIdentity));
   memcpy (&msg->key, key, sizeof (struct GNUNET_HashCode));
-  memcpy (&vector[element_count], mask, mask_length);
+  memcpy (&vector[element_count], mask, mask_bytes);
   
   h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, size,
                                                GNUNET_TIME_UNIT_FOREVER_REL,
@@ -511,7 +487,7 @@
     {
       LOG (GNUNET_ERROR_TYPE_ERROR,
            _ ("Failed to send a message to the scalarproduct service\n"));
-      GNUNET_STATISTICS_destroy(h->GNUNET_YES);
+      GNUNET_STATISTICS_destroy(h->stats, GNUNET_YES);
       GNUNET_CLIENT_disconnect(h->client);
       GNUNET_free(h->msg);
       GNUNET_free(h);
@@ -524,26 +500,28 @@
 /**
  * Disconnect from the scalarproduct service.
  * 
- * @param h handle to the scalarproduct
+ * @param h a computation handle to cancel
  */
 void
-GNUNET_SCALARPRODUCT_disconnect (struct GNUNET_SCALARPRODUCT_ComputationHandle 
* h)
+GNUNET_SCALARPRODUCT_cancel (struct GNUNET_SCALARPRODUCT_ComputationHandle * h)
 {
   struct GNUNET_SCALARPRODUCT_ComputationHandle * qe;
 
-  LOG (GNUNET_ERROR_TYPE_INFO,
-       "Disconnecting from VectorProduct\n");
-
   for (qe = head; head != NULL; qe = head)
     {
-      GNUNET_CONTAINER_DLL_remove (head, tail, qe);
-      if (NULL == qe->th)
-        GNUNET_CLIENT_notify_transmit_ready_cancel(qe->th);
-      GNUNET_CLIENT_disconnect (h->client);
-      GNUNET_STATISTICS_destroy (h->stats, GNUNET_YES);
-      qe->response_proc (qe, NULL, 
GNUNET_SCALARPRODUCT_Status_ServiceDisconnected);
-      GNUNET_free(qe->msg);
-      GNUNET_free(qe);
+      if (qe == h)
+        {
+          GNUNET_CONTAINER_DLL_remove (head, tail, qe);
+          LOG (GNUNET_ERROR_TYPE_INFO,
+               "Disconnecting from VectorProduct\n");
+          if (NULL == qe->th)
+            GNUNET_CLIENT_notify_transmit_ready_cancel (qe->th);
+          GNUNET_CLIENT_disconnect (h->client);
+          GNUNET_STATISTICS_destroy (h->stats, GNUNET_YES);
+          GNUNET_free (qe->msg);
+          GNUNET_free (qe);
+          break;
+        }
     }
 }
 




reply via email to

[Prev in Thread] Current Thread [Next in Thread]