gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[gnunet] 100/164: Added message flow control


From: gnunet
Subject: [gnunet] 100/164: Added message flow control
Date: Fri, 30 Jul 2021 15:32:46 +0200

This is an automated email from the git hooks/post-receive script.

grothoff pushed a commit to branch master
in repository gnunet.

commit 261276cc3b0c1d2a36309b6a76c4c731af244fd9
Author: Elias Summermatter <elias.summermatter@seccom.ch>
AuthorDate: Tue May 18 01:40:38 2021 +0200

    Added message flow control
---
 src/setu/gnunet-service-setu.c | 332 ++++++++++++++++++++++++++++++++++++++---
 1 file changed, 309 insertions(+), 23 deletions(-)

diff --git a/src/setu/gnunet-service-setu.c b/src/setu/gnunet-service-setu.c
index 4ff91d81e..8139ec7d7 100644
--- a/src/setu/gnunet-service-setu.c
+++ b/src/setu/gnunet-service-setu.c
@@ -473,6 +473,16 @@ struct Operation
      * Mode of operation that was chosen by the algorithm
      */
     uint8_t mode_of_operation;
+
+    /**
+     * Hashmap to keep track of the send/received messages
+     */
+    struct GNUNET_CONTAINER_MultiHashMap *message_control_flow;
+
+    /**
+    * Hashmap to keep track of the send/received inquiries (ibf keys)
+    */
+    struct GNUNET_CONTAINER_MultiHashMap *inquiries_sent;
 };
 
 
@@ -737,6 +747,28 @@ struct perf_rtt_struct
 struct perf_rtt_struct perf_rtt;
 
 
+enum MESSAGE_CONTROL_FLOW_STATE
+{
+    MESSAGE_EMPTY,
+    MESSAGE_SENT,
+    MESSAGE_EXPECTED,
+    MESSAGE_RECEIVED,
+};
+
+enum MESSAGE_TYPE
+{
+    OFFER_MESSAGE,
+    DEMAND_MESSAGE,
+    ELEMENT_MESSAGE,
+};
+
+struct message_control_flow_element
+{
+    enum MESSAGE_CONTROL_FLOW_STATE offer;
+    enum MESSAGE_CONTROL_FLOW_STATE demand;
+    enum MESSAGE_CONTROL_FLOW_STATE element;
+};
+
 /*
  * Calcuate
  */
@@ -902,13 +934,6 @@ estimate_best_mode_of_operation(uint64_t avg_element_size,
                  (RTT_MIN_FULL + 0.5) * bandwith_latency_tradeoff + \
                  SIZEOF_REQUEST_FULL;
 
-    LOG (GNUNET_ERROR_TYPE_ERROR,
-         "YYYYY::::: est_set_diff_remote %u, local_set_size %u \n", est_set_diff_remote, local_set_size);
-
-    LOG (GNUNET_ERROR_TYPE_ERROR,
-         "XXX::::: total_elements_to_send_local_send_first %u, total_elements_to_send_remote_send_first %u \n",
-         total_elements_to_send_local_send_first, total_elements_to_send_remote_send_first);
-
     /*
     * Calculate bytes for differential Sync
     */
@@ -948,12 +973,6 @@ estimate_best_mode_of_operation(uint64_t avg_element_size,
     uint64_t full_min = MIN(total_bytes_full_local_send_first, total_bytes_full_local_send_first);
 
     /* Decide between full and differential sync */
-    LOG (GNUNET_ERROR_TYPE_ERROR,
-         "OK::::: full_min: %u total_bytes_diff: %u total_bytes_full_remote_send_first:%u total_bytes_full_local_send_first:%u\n",
-         full_min,
-         total_bytes_diff,
-         total_bytes_full_remote_send_first,
-         total_bytes_full_local_send_first);
 
     if (full_min < total_bytes_diff) {
         /* Decide between sending all element first or receiving all elements */
@@ -971,8 +990,8 @@ static int check_valid_phase(uint8_t allowed_phases[], size_t size_phases, struc
     for(uint32_t phase_ctr=0; phase_ctr < size_phases; phase_ctr++) {
         uint8_t phase = allowed_phases[phase_ctr];
         if (phase == op->phase) {
-            LOG (GNUNET_ERROR_TYPE_ERROR,
-                 "Found correct phase\n");
+            LOG (GNUNET_ERROR_TYPE_DEBUG,
+                 "Message received in valid phase\n");
             return GNUNET_YES;
         }
     }
@@ -982,6 +1001,92 @@ static int check_valid_phase(uint8_t allowed_phases[], size_t size_phases, struc
 }
 
 
+static int
+update_message_control_flow(struct GNUNET_CONTAINER_MultiHashMap *hash_map,
+                            enum MESSAGE_CONTROL_FLOW_STATE new_mcfs,
+                            struct GNUNET_HashCode *hash_code,
+                            enum MESSAGE_TYPE mt)
+{
+    struct message_control_flow_element *cfe = NULL;
+    enum MESSAGE_CONTROL_FLOW_STATE *mcfs;
+
+    LOG (GNUNET_ERROR_TYPE_ERROR,
+         "%u NEW_STATE %u\n", *hash_code->bits, new_mcfs);
+
+    cfe = GNUNET_CONTAINER_multihashmap_get(hash_map, hash_code);
+    if(NULL == cfe) {
+        cfe = (struct message_control_flow_element*) GNUNET_malloc(sizeof(struct message_control_flow_element));
+        LOG (GNUNET_ERROR_TYPE_ERROR,
+             "%u CREATE NEW!\n", *hash_code->bits);
+    }
+
+    LOG (GNUNET_ERROR_TYPE_ERROR,
+         "ID: %u OFFER: %u DEMAND: %u ELEMEMT: %u %u\n", *hash_code->bits, cfe->offer, cfe->demand, cfe->element);
+
+    if ( OFFER_MESSAGE == mt) {
+        mcfs = &cfe->offer;
+    } else if ( DEMAND_MESSAGE == mt ) {
+        mcfs = &cfe->demand;
+    } else if ( ELEMENT_MESSAGE == mt) {
+        mcfs = &cfe->element;
+        if(new_mcfs != MESSAGE_SENT && MESSAGE_RECEIVED != cfe->offer) {
+            LOG (GNUNET_ERROR_TYPE_ERROR, "Received an element without sent offer!\n");
+            return GNUNET_NO;
+        }
+        /* Check that only requested elements are received! */
+        if(new_mcfs != MESSAGE_SENT && cfe->demand != MESSAGE_SENT) {
+            LOG (GNUNET_ERROR_TYPE_ERROR, "Received an element that was not demanded\n");
+            return GNUNET_NO;
+        }
+    } else {
+        return GNUNET_SYSERR;
+    }
+
+    LOG (GNUNET_ERROR_TYPE_ERROR,
+         "%u VALUE %u < %u \n", *hash_code->bits , new_mcfs, *mcfs);
+
+    if(new_mcfs <= *mcfs) {
+        return GNUNET_NO;
+    }
+
+    *mcfs = new_mcfs;
+
+    LOG (GNUNET_ERROR_TYPE_ERROR,
+         "ID: %u OFFER: %u DEMAND: %u ELEMEMT: %u\n", *hash_code->bits, cfe->offer, cfe->demand, cfe->element);
+
+    GNUNET_CONTAINER_multihashmap_put(hash_map, hash_code,cfe,GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE);
+    return GNUNET_YES;
+}
+
+static int
+is_message_in_message_control_flow(struct GNUNET_CONTAINER_MultiHashMap *hash_map,
+                            struct GNUNET_HashCode *hash_code,
+                            enum MESSAGE_TYPE mt)
+{
+    struct message_control_flow_element *cfe = NULL;
+    enum MESSAGE_CONTROL_FLOW_STATE *mcfs;
+
+    cfe = GNUNET_CONTAINER_multihashmap_get(hash_map, hash_code);
+    if(NULL == cfe) {
+        cfe = (struct message_control_flow_element*) GNUNET_malloc(sizeof(struct message_control_flow_element));
+    }
+
+    if ( OFFER_MESSAGE == mt) {
+        mcfs = &cfe->offer;
+    } else if ( DEMAND_MESSAGE == mt ) {
+        mcfs = &cfe->demand;
+    } else if ( ELEMENT_MESSAGE == mt) {
+        mcfs = &cfe->element;
+    } else {
+        return GNUNET_SYSERR;
+    }
+    if(*mcfs != MESSAGE_EMPTY) {
+        return GNUNET_NO;
+    }
+    return GNUNET_YES;
+}
+
+
 /**
  * Iterator over hash map entries, called to
  * destroy the linked list of colliding ibf key entries.
@@ -1746,6 +1851,9 @@ handle_union_p2p_strata_estimator (void *cls,
   size_t len;
   int is_compressed;
 
+    LOG (GNUNET_ERROR_TYPE_ERROR,
+         "START OPERATION %u\n", op->peer_site);
+
   // Setting peer site to receiving peer
   op->peer_site = 1;
 
@@ -1923,6 +2031,52 @@ send_offers_iterator (void *cls,
   if (ke->ibf_key.key_val != sec->ibf_key.key_val)
     return GNUNET_YES;
 
+
+  /* Prevent implementation from sending a offer multible times in case of roll switch */
+  if (GNUNET_YES !=
+        is_message_in_message_control_flow(
+                op->message_control_flow,
+                &ke->element->element_hash,
+                OFFER_MESSAGE)
+                )
+  {
+      LOG (GNUNET_ERROR_TYPE_DEBUG,
+           "Skipping already sent processed element offer!\n");
+      return GNUNET_YES;
+  }
+
+  /* Save send offer message for message control */
+  if (GNUNET_YES !=
+      update_message_control_flow(
+                op->message_control_flow,
+                MESSAGE_SENT,
+                &ke->element->element_hash,
+                OFFER_MESSAGE)
+           )
+  {
+        LOG (GNUNET_ERROR_TYPE_ERROR,
+             "Double offer message sent found!\n");
+        GNUNET_break (0);
+        fail_union_operation (op);
+        return GNUNET_NO;
+  };
+
+  /* Mark element to be expected to received */
+  if (GNUNET_YES !=
+        update_message_control_flow(
+                op->message_control_flow,
+                MESSAGE_EXPECTED,
+                &ke->element->element_hash,
+                DEMAND_MESSAGE)
+        )
+  {
+      LOG (GNUNET_ERROR_TYPE_ERROR,
+           "Double demand received found!\n");
+      GNUNET_break (0);
+      fail_union_operation (op);
+      return GNUNET_NO;
+  };
+
   perf_rtt.offer.sent += 1;
   perf_rtt.offer.sent_var_bytes += sizeof(struct GNUNET_HashCode);
 
@@ -1947,7 +2101,6 @@ send_offers_iterator (void *cls,
  * @param op union operation
  * @param ibf_key IBF key of interest
  */
-static void
 send_offers_for_key (struct Operation *op,
                      struct IBF_Key ibf_key)
 {
@@ -2103,8 +2256,6 @@ decode_and_send (struct Operation *op)
     if (1 == side)
     {
       struct IBF_Key unsalted_key;
-
-
       unsalt_key (&key,
                   op->salt_receive,
                   &unsalted_key);
@@ -2118,6 +2269,22 @@ decode_and_send (struct Operation *op)
 
       perf_rtt.inquery.sent += 1;
       perf_rtt.inquery.sent_var_bytes += sizeof(struct IBF_Key);
+
+      /** Add sent inquiries to hashmap for flow control **/
+      struct GNUNET_HashContext *hashed_key_context = GNUNET_CRYPTO_hash_context_start ();
+      struct GNUNET_HashCode *hashed_key = (struct GNUNET_HashCode*) GNUNET_malloc(sizeof(struct GNUNET_HashCode));;
+      enum MESSAGE_CONTROL_FLOW_STATE mcfs = MESSAGE_SENT;
+      GNUNET_CRYPTO_hash_context_read (hashed_key_context,
+                                      &key,
+                                     sizeof(struct IBF_Key));
+      GNUNET_CRYPTO_hash_context_finish (hashed_key_context,
+                                         hashed_key);
+      GNUNET_CONTAINER_multihashmap_put(op->inquiries_sent,
+                                            hashed_key,
+                                            &mcfs,
+                                            GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE
+      );
+
       /* It may be nice to merge multiple requests, but with CADET's corking it is not worth
        * the effort additional complexity. */
       ev = GNUNET_MQ_msg_extra (msg,
@@ -2460,6 +2627,21 @@ handle_union_p2p_elements (void *cls,
     return;
   }
 
+  if ( GNUNET_OK !=
+        update_message_control_flow(
+                op->message_control_flow,
+                MESSAGE_RECEIVED,
+                &ee->element_hash,
+                ELEMENT_MESSAGE)
+        )
+  {
+      LOG (GNUNET_ERROR_TYPE_ERROR,
+           "An element has been received more than once!\n");
+      GNUNET_break (0);
+      fail_union_operation (op);
+      return;
+  }
+
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Got element (size %u, hash %s) from peer\n",
        (unsigned int) element_size,
@@ -2699,10 +2881,25 @@ handle_union_p2p_inquiry (void *cls,
   num_keys = (ntohs (msg->header.size) - sizeof(struct InquiryMessage))
              / sizeof(struct IBF_Key);
   ibf_key = (const struct IBF_Key *) &msg[1];
+
+  /** Add received inquiries to hashmap for flow control **/
+  struct GNUNET_HashContext *hashed_key_context = GNUNET_CRYPTO_hash_context_start ();
+  struct GNUNET_HashCode *hashed_key = (struct GNUNET_HashCode*) GNUNET_malloc(sizeof(struct GNUNET_HashCode));;
+  enum MESSAGE_CONTROL_FLOW_STATE mcfs = MESSAGE_RECEIVED;
+  GNUNET_CRYPTO_hash_context_read (hashed_key_context,
+                                   &ibf_key,
+                                   sizeof(struct IBF_Key));
+  GNUNET_CRYPTO_hash_context_finish (hashed_key_context,
+                                     hashed_key);
+  GNUNET_CONTAINER_multihashmap_put(op->inquiries_sent,
+                                    hashed_key,
+                                    &mcfs,
+                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_REPLACE
+  );
+
   while (0 != num_keys--)
   {
     struct IBF_Key unsalted_key;
-
     unsalt_key (ibf_key,
                 ntohl (msg->salt),
                 &unsalted_key);
@@ -2929,8 +3126,40 @@ handle_union_p2p_demand (void *cls,
        num_hashes > 0;
        hash++, num_hashes--)
   {
-    ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
-                                            hash);
+      ee = GNUNET_CONTAINER_multihashmap_get (op->set->content->elements,
+                                              hash);
+
+      /* Save send demand message for message control */
+      if (GNUNET_YES !=
+          update_message_control_flow(
+                  op->message_control_flow,
+                  MESSAGE_RECEIVED,
+                  &ee->element_hash,
+                  DEMAND_MESSAGE)
+              )
+      {
+          LOG (GNUNET_ERROR_TYPE_ERROR,
+               "Double demand message received found!\n");
+          GNUNET_break (0);
+          fail_union_operation (op);
+          return;
+      };
+
+      /* Mark element to be expected to received */
+      if (GNUNET_YES !=
+          update_message_control_flow(
+                  op->message_control_flow,
+                  MESSAGE_SENT,
+                  &ee->element_hash,
+                  ELEMENT_MESSAGE)
+              )
+      {
+          LOG (GNUNET_ERROR_TYPE_ERROR,
+               "Double element message sent found!\n");
+          GNUNET_break (0);
+          fail_union_operation (op);
+          return;
+      };
     if (NULL == ee)
     {
       /* Demand for non-existing element. */
@@ -3076,6 +3305,55 @@ handle_union_p2p_offer (void *cls,
     ev = GNUNET_MQ_msg_header_extra (demands,
                                      sizeof(struct GNUNET_HashCode),
                                      GNUNET_MESSAGE_TYPE_SETU_P2P_DEMAND);
+      /* Save send demand message for message control */
+      if (GNUNET_YES !=
+          update_message_control_flow(
+                  op->message_control_flow,
+                  MESSAGE_SENT,
+                  hash,
+                  DEMAND_MESSAGE)
+              )
+      {
+          LOG (GNUNET_ERROR_TYPE_ERROR,
+               "Double demand message sent found!\n");
+          GNUNET_break (0);
+          fail_union_operation (op);
+          return;
+      };
+
+      /* Mark offer as received received */
+      if (GNUNET_YES !=
+          update_message_control_flow(
+                  op->message_control_flow,
+                  MESSAGE_RECEIVED,
+                  hash,
+                  OFFER_MESSAGE)
+              )
+      {
+          LOG (GNUNET_ERROR_TYPE_ERROR,
+               "Double offer message received found!\n");
+          GNUNET_break (0);
+          fail_union_operation (op);
+          return;
+      };
+
+      /* Mark element to be expected to received */
+      if (GNUNET_YES !=
+          update_message_control_flow(
+                  op->message_control_flow,
+                  MESSAGE_EXPECTED,
+                  hash,
+                  ELEMENT_MESSAGE)
+              )
+      {
+          LOG (GNUNET_ERROR_TYPE_ERROR,
+               "Element already expected!\n");
+          GNUNET_break (0);
+          fail_union_operation (op);
+          return;
+      };
+
+
     GNUNET_memcpy (&demands[1],
                    hash,
                    sizeof(struct GNUNET_HashCode));
@@ -3917,8 +4195,12 @@ handle_client_evaluate (void *cls,
   op->symmetric = msg->symmetric;
   context = GNUNET_MQ_extract_nested_mh (msg);
 
-    /* load config */
-    load_config(op);
+  /* create hashmap for message control */
+  op->message_control_flow = GNUNET_CONTAINER_multihashmap_create(32,GNUNET_NO);
+  op->inquiries_sent = GNUNET_CONTAINER_multihashmap_create(32,GNUNET_NO);
+
+  /* load config */
+  load_config(op);
 
   /* Advance generation values, so that
      mutations won't interfer with the running operation. */
@@ -4109,6 +4391,10 @@ handle_client_accept (void *cls,
   op->force_delta = msg->force_delta;
   op->symmetric = msg->symmetric;
 
+  /* create hashmap for message control */
+  op->message_control_flow = GNUNET_CONTAINER_multihashmap_create(32,GNUNET_NO);
+  op->inquiries_sent = GNUNET_CONTAINER_multihashmap_create(32,GNUNET_NO);
+
   /* load config */
   load_config(op);
 

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]