gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[gnunet] 01/02: TNG: Fix communicator test message ack handling.


From: gnunet
Subject: [gnunet] 01/02: TNG: Fix communicator test message ack handling.
Date: Sun, 30 Jul 2023 15:30:04 +0200

This is an automated email from the git hooks/post-receive script.

martin-schanzenbach pushed a commit to branch master
in repository gnunet.

commit f6dbfb2ada45aa0d8900177274ca5808c61c1319
Author: Martin Schanzenbach <schanzen@gnunet.org>
AuthorDate: Sun Jul 30 15:27:19 2023 +0200

    TNG: Fix communicator test message ack handling.
---
 src/transport/gnunet-communicator-udp.c            | 131 +++++++++++----------
 src/transport/test_communicator_basic.c            |  38 +++---
 .../test_communicator_udp_backchannel_peer1.conf   |   2 +-
 .../test_communicator_udp_backchannel_peer2.conf   |   2 +-
 .../test_communicator_udp_basic_peer1.conf         |   1 +
 src/transport/transport-testing-communicator.c     |  25 +++-
 src/transport/transport-testing-communicator.h     |  10 ++
 7 files changed, 128 insertions(+), 81 deletions(-)

diff --git a/src/transport/gnunet-communicator-udp.c b/src/transport/gnunet-communicator-udp.c
index 75f732d6c..2b014f890 100644
--- a/src/transport/gnunet-communicator-udp.c
+++ b/src/transport/gnunet-communicator-udp.c
@@ -37,8 +37,8 @@
  * - support NAT connection reversal method (#5529)
  * - support other UDP-specific NAT traversal methods (#)
  */
-#include "gnunet_common.h"
 #include "platform.h"
+#include "gnunet_common.h"
 #include "gnunet_util_lib.h"
 #include "gnunet_protocols.h"
 #include "gnunet_signatures.h"
@@ -94,7 +94,7 @@
  */
 #define GCM_TAG_SIZE (128 / 8)
 
-#define GENERATE_AT_ONCE 2
+#define GENERATE_AT_ONCE 16
 
 /**
  * If we fall below this number of available KCNs,
@@ -105,7 +105,7 @@
  * arrive before the sender runs out. So really this
  * should ideally be based on the RTT.
  */
-#define KCN_THRESHOLD 92
+#define KCN_THRESHOLD 96
 
 /**
  * How many KCNs do we keep around *after* we hit
@@ -484,20 +484,7 @@ struct SharedSecret
    */
   int rekey_initiated;
 
-  /**
-   * ID of kce working queue task
-   */
-  struct GNUNET_SCHEDULER_Task *kce_task;
 
-  /**
-   * Is the kce_task finished?
-   */
-  int kce_task_finished;
-
-  /**
-   * When KCE finishes, send ACK if GNUNET_YES
-   */
-  int kce_send_ack_on_finish;
 };
 
 
@@ -563,6 +550,20 @@ struct SenderAddress
    */
   int sender_destroy_called;
 
+  /**
+   * ID of kce working queue task
+   */
+  struct GNUNET_SCHEDULER_Task *kce_task;
+
+  /**
+   * Is the kce_task finished?
+   */
+  int kce_task_finished;
+
+  /**
+   * When KCE finishes, send ACK if GNUNET_YES
+   */
+  int kce_send_ack_on_finish;
 };
 
 
@@ -1030,10 +1031,10 @@ secret_destroy (struct SharedSecret *ss, int withoutKce)
                          "# KIDs active",
                          GNUNET_CONTAINER_multishortmap_size (key_cache),
                          GNUNET_NO);
-  if (NULL != ss->kce_task)
+  if (NULL != ss->sender->kce_task)
   {
-    GNUNET_SCHEDULER_cancel (ss->kce_task);
-    ss->kce_task = NULL;
+    GNUNET_SCHEDULER_cancel (ss->sender->kce_task);
+    ss->sender->kce_task = NULL;
   }
   GNUNET_free (ss);
   return GNUNET_YES;
@@ -1452,7 +1453,7 @@ add_acks (struct SharedSecret *ss, int acks_to_add)
                                              1);
   }
 
-  GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Tell transport we have %u more acks!\n",
               acks_to_add);
 
@@ -1501,25 +1502,25 @@ handle_ack (void *cls, const struct GNUNET_PeerIdentity *pid, void *value)
 
       if (allowed <= ss->sequence_allowed)
       {
-        GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                     "Ignoring ack, not giving us increased window\n.");
         return GNUNET_NO;
       }
-      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                  "New sequence allows until %u from %u. Acks available to us: 
%u. For secret %s\n",
-                  allowed,
-                  ss->sequence_allowed,
-                  receiver->acks_available,
-                  GNUNET_h2s (&ss->master));
       acks_to_add = (allowed - ss->sequence_allowed);
       GNUNET_assert (0 != acks_to_add);
       receiver->acks_available += (allowed - ss->sequence_allowed);
       ss->sequence_allowed = allowed;
       add_acks (ss, acks_to_add);
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "New sequence allows until %u (+%u). Acks available to us: 
%u. For secret %s\n",
+                  allowed,
+                  acks_to_add,
+                  receiver->acks_available,
+                  GNUNET_h2s (&ss->master));
       return GNUNET_NO;
     }
   }
-  GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Matching cmac not found for ack!\n");
   return GNUNET_YES;
 }
@@ -1556,7 +1557,7 @@ consider_ss_ack (struct SharedSecret *ss)
   ack.header.size = htons (sizeof(ack));
   ack.sequence_ack = htonl (ss->sequence_allowed);
   ack.cmac = ss->cmac;
-  GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Notifying transport with UDPAck %s, sequence %u and master 
%s\n",
               GNUNET_i2s_full (&ss->sender->target),
               ss->sequence_allowed,
@@ -1572,30 +1573,40 @@ static void
 kce_generate_cb (void *cls)
 {
   struct SharedSecret *ss = cls;
-
-  ss->kce_task = NULL;
+  static uint64_t kce_last_available = 0;
+  ss->sender->kce_task = NULL;
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Precomputing %u keys for master %s\n",
               GENERATE_AT_ONCE,
               GNUNET_h2s (&(ss->master)));
   if (KCN_TARGET < ss->sender->acks_available)
+  {
+    ss->sender->kce_task = GNUNET_SCHEDULER_add_delayed (
+      WORKING_QUEUE_INTERVALL,
+      kce_generate_cb,
+      ss);
     return;
+  }
   for (int i = 0; i < GENERATE_AT_ONCE; i++)
     kce_generate (ss, ++ss->sequence_allowed);
 
+  /**
+   * As long as we lose over 30% of max acks in reschedule,
+   * we keep generating acks for this ss.
+   */
   if (KCN_TARGET > ss->sender->acks_available)
   {
-    ss->kce_task = GNUNET_SCHEDULER_add_delayed (
+    ss->sender->kce_task = GNUNET_SCHEDULER_add_delayed (
       WORKING_QUEUE_INTERVALL,
       kce_generate_cb,
       ss);
     return;
   }
-  GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-              "We have enough keys.\n");
-  ss->kce_task_finished = GNUNET_YES;
-  if (ss->kce_send_ack_on_finish == GNUNET_YES)
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "We have enough keys (ACKs: %u).\n", ss->sender->acks_available);
+  ss->sender->kce_task_finished = GNUNET_YES;
+  if (ss->sender->kce_send_ack_on_finish == GNUNET_YES)
     consider_ss_ack (ss);
 }
 
@@ -1660,8 +1671,9 @@ try_handle_plaintext (struct SenderAddress *sender,
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "We have %u acks available.\n",
                 ss_rekey->sender->acks_available);
-    ss_rekey->kce_send_ack_on_finish = GNUNET_NO;
-    ss_rekey->kce_task = GNUNET_SCHEDULER_add_delayed (
+    ss_rekey->sender->kce_send_ack_on_finish = GNUNET_NO;
+    // FIXME
+    ss_rekey->sender->kce_task = GNUNET_SCHEDULER_add_delayed (
       WORKING_QUEUE_INTERVALL,
       kce_generate_cb,
       ss_rekey);
@@ -1718,7 +1730,7 @@ decrypt_box (const struct UDPBox *box,
                                 sizeof(out_buf),
                                 out_buf))
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed decryption.\n");
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Failed decryption.\n");
     GNUNET_STATISTICS_update (stats,
                               "# Decryption failures with valid KCE",
                               1,
@@ -1727,6 +1739,7 @@ decrypt_box (const struct UDPBox *box,
     return;
   }
   kce_destroy (kce);
+  kce = NULL;
   GNUNET_STATISTICS_update (stats,
                             "# bytes decrypted with BOX",
                             sizeof(out_buf),
@@ -1739,6 +1752,18 @@ decrypt_box (const struct UDPBox *box,
               "decrypted UDPBox with kid %s\n",
               GNUNET_sh2s (&box->kid));
   try_handle_plaintext (ss->sender, out_buf, sizeof(out_buf));
+  if ((KCN_THRESHOLD > ss->sender->acks_available) &&
+      (NULL == ss->sender->kce_task) &&
+      (GNUNET_YES == ss->sender->kce_task_finished))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Sender has %u ack left which is under threshold.\n",
+                ss->sender->acks_available);
+    ss->sender->kce_send_ack_on_finish = GNUNET_YES;
+    ss->sender->kce_task = GNUNET_SCHEDULER_add_now (
+      kce_generate_cb,
+      ss);
+  }
 }
 
 
@@ -1960,19 +1985,6 @@ sock_read (void *cls)
                     "Found KCE with kid %s\n",
                     GNUNET_sh2s (&box->kid));
         decrypt_box (box, (size_t) rcvd, kce);
-        if ((NULL == kce->ss->kce_task) &&
-            (GNUNET_YES == kce->ss->kce_task_finished) &&
-            (kce->ss->sender->acks_available < KCN_THRESHOLD))
-        {
-          GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                      "Sender has %u ack left which is under threshold.\n",
-                      kce->ss->sender->acks_available);
-          kce->ss->kce_send_ack_on_finish = GNUNET_YES;
-          kce->ss->kce_task = GNUNET_SCHEDULER_add_delayed (
-            WORKING_QUEUE_INTERVALL,
-            kce_generate_cb,
-            kce->ss);
-        }
         continue;
       }
     }
@@ -2107,13 +2119,13 @@ sock_read (void *cls)
       sender = setup_sender (&uc->sender, (const struct sockaddr *) &sa, 
salen);
       ss->sender = sender;
       GNUNET_CONTAINER_DLL_insert (sender->ss_head, sender->ss_tail, ss);
-      if ((NULL == ss->kce_task) && (GNUNET_NO ==
-                                     ss->kce_task_finished))
+      if ((KCN_THRESHOLD > ss->sender->acks_available) &&
+          (NULL == ss->sender->kce_task) &&
+          (GNUNET_NO == ss->sender->kce_task_finished))
       {
         // TODO This task must be per sender! FIXME: This is a nice todo, but 
I do not know what must be done here to fix.
-        ss->kce_send_ack_on_finish = GNUNET_YES;
-        ss->kce_task = GNUNET_SCHEDULER_add_delayed (
-          WORKING_QUEUE_INTERVALL,
+        ss->sender->kce_send_ack_on_finish = GNUNET_YES;
+        ss->sender->kce_task = GNUNET_SCHEDULER_add_now (
           kce_generate_cb,
           ss);
       }
@@ -2600,13 +2612,14 @@ mq_send_d (struct GNUNET_MQ_Handle *mq,
                 msize,
                 receiver->acks_available);
     ss->bytes_sent += sizeof (dgram);
-    GNUNET_MQ_impl_send_continue (mq);
     receiver->acks_available--;
+    GNUNET_MQ_impl_send_continue (mq);
     return;
   }
-  GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "No suitable ss found, sending as KX...\n");
   send_msg_with_kx (msg, receiver);
+  GNUNET_MQ_impl_send_continue (mq);
 }
 
 
diff --git a/src/transport/test_communicator_basic.c b/src/transport/test_communicator_basic.c
index 975a0a837..bba8025af 100644
--- a/src/transport/test_communicator_basic.c
+++ b/src/transport/test_communicator_basic.c
@@ -91,7 +91,7 @@ static struct GNUNET_STATISTICS_GetHandle *rekey_stats[NUM_PEERS];
 
 #define ALLOWED_PACKET_LOSS 91
 
-#define BURST_PACKETS 15000
+#define BURST_PACKETS 5000
 
 #define TOTAL_ITERATIONS 1
 
@@ -394,7 +394,8 @@ process_statistics (void *cls,
   {
     ret = 2;
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                "Not enough BOX messages!\n");
+                "Not enough BOX messages! (want: %u, have %llu)\n",
+                9000, value);
     GNUNET_SCHEDULER_shutdown ();
   }
   if ((0 == strcmp ("rekey", test_name)) &&
@@ -433,11 +434,13 @@ short_test_cb (void *cls)
        tc_h);
   payload = make_payload (SHORT_MESSAGE_SIZE);
   num_sent_short[peer_nr]++;
+  if (burst_packets_short == num_sent_short[peer_nr])
+    tc_h->cont = NULL;
+  else
+    tc_h->cont = short_test;
+  tc_h->cont_cls = cls;
   GNUNET_TRANSPORT_TESTING_transport_communicator_send (tc_h,
-                                                        (burst_packets_short ==
-                                                         
num_sent_short[peer_nr])
-                                                        ? NULL
-                                                        : &short_test,
+                                                        NULL,
                                                         cls,
                                                         payload,
                                                         SHORT_MESSAGE_SIZE);
@@ -478,11 +481,13 @@ size_test (void *cls)
   ack[peer_nr] += 10;
   payload = make_payload (ack[peer_nr]);
   num_sent_size[peer_nr]++;
+  if (ack[peer_nr] >= max_size)
+    tc_h->cont = NULL;
+  else
+    tc_h->cont = size_test;
+  tc_h->cont_cls = cls;
   GNUNET_TRANSPORT_TESTING_transport_communicator_send (tc_h,
-                                                        (ack[peer_nr] <
-                                                         max_size)
-                                                        ? &size_test
-                                                        : NULL,
+                                                        NULL,
                                                         cls,
                                                         payload,
                                                         ack[peer_nr]);
@@ -512,11 +517,13 @@ long_test_cb (void *cls)
        (unsigned int) num_received_long[peer_nr]);
   payload = make_payload (long_message_size);
   num_sent_long[peer_nr]++;
+  if (burst_packets_long == num_sent_long[peer_nr])
+    tc_h->cont = NULL;
+  else
+    tc_h->cont = long_test;
+  tc_h->cont_cls = cls;
   GNUNET_TRANSPORT_TESTING_transport_communicator_send (tc_h,
-                                                        (burst_packets_long ==
-                                                         
num_sent_long[peer_nr])
-                                                        ? NULL
-                                                        : &long_test,
+                                                        NULL,
                                                         cls,
                                                         payload,
                                                         long_message_size);
@@ -936,7 +943,8 @@ incoming_message_cb (
       if (long_message_size != payload_len)
       {
         LOG (GNUNET_ERROR_TYPE_WARNING,
-             "Ignoring packet with wrong length\n");
+             "Ignoring packet with wrong length (have: %lu, want: %lu)\n",
+             payload_len, long_message_size);
         return;   // Ignore
       }
       num_received_long[peer_nr]++;
diff --git a/src/transport/test_communicator_udp_backchannel_peer1.conf b/src/transport/test_communicator_udp_backchannel_peer1.conf
index 59e6d68e3..65f33bd6b 100644
--- a/src/transport/test_communicator_udp_backchannel_peer1.conf
+++ b/src/transport/test_communicator_udp_backchannel_peer1.conf
@@ -39,7 +39,7 @@ BINDTO = 60002
 DISABLE_V6 = YES
 
 [communicator-udp]
-#PREFIX = valgrind --leak-check=full --track-origins=yes
+# PREFIX = valgrind --leak-check=full --track-origins=yes --log-file=/tmp/vg_comm1
 BINDTO = 60002
 DISABLE_V6 = YES
 MAX_QUEUE_LENGTH=5000
diff --git a/src/transport/test_communicator_udp_backchannel_peer2.conf b/src/transport/test_communicator_udp_backchannel_peer2.conf
index 3abf7999b..9875af724 100644
--- a/src/transport/test_communicator_udp_backchannel_peer2.conf
+++ b/src/transport/test_communicator_udp_backchannel_peer2.conf
@@ -39,7 +39,7 @@ BINDTO = 60003
 DISABLE_V6 = YES
 
 [communicator-udp]
-#PREFIX = valgrind --leak-check=full --track-origins=yes
+# PREFIX = valgrind --leak-check=full --track-origins=yes --log-file=/tmp/vg_comm2
 BINDTO = 60003
 DISABLE_V6 = YES
 MAX_QUEUE_LENGTH=5000
diff --git a/src/transport/test_communicator_udp_basic_peer1.conf b/src/transport/test_communicator_udp_basic_peer1.conf
index c6ff024ee..ec8c19f62 100644
--- a/src/transport/test_communicator_udp_basic_peer1.conf
+++ b/src/transport/test_communicator_udp_basic_peer1.conf
@@ -36,3 +36,4 @@ UNIXPATH = $GNUNET_RUNTIME_DIR/gnunet-service-resolver_test_1.sock
 BINDTO = 60002
 DISABLE_V6 = YES
 MAX_QUEUE_LENGTH=5000
+
diff --git a/src/transport/transport-testing-communicator.c b/src/transport/transport-testing-communicator.c
index d0e32a544..26895e457 100644
--- a/src/transport/transport-testing-communicator.c
+++ b/src/transport/transport-testing-communicator.c
@@ -260,7 +260,7 @@ handle_communicator_backchannel (void *cls,
   }
   /* Find client providing this communicator */
   /* Finally, deliver backchannel message to communicator */
-  LOG (GNUNET_ERROR_TYPE_ERROR,
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Delivering backchannel message of type %u to %s\n",
        ntohs (msg->type),
        target_communicator);
@@ -676,8 +676,13 @@ handle_send_message_ack (void *cls,
                          const struct GNUNET_TRANSPORT_SendMessageToAck *sma)
 {
   struct MyClient *client = cls;
+  struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h =
+    client->tc;
+  static int mtr = 0;
+  mtr++;
+  if (tc_h->cont != NULL)
+    tc_h->cont (tc_h->cont_cls);
   GNUNET_SERVICE_client_continue (client->client);
-  // NOP
 }
 
 
@@ -1166,8 +1171,9 @@ GNUNET_TRANSPORT_TESTING_transport_communicator_send
   size_t inbox_size;
   struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue;
   struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue_tmp;
-
+  static struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue 
*last_queue;
   tc_queue = NULL;
+
   for (tc_queue_tmp = tc_h->queue_head;
        NULL != tc_queue_tmp;
        tc_queue_tmp = tc_queue_tmp->next)
@@ -1194,13 +1200,22 @@ GNUNET_TRANSPORT_TESTING_transport_communicator_send
       tc_queue = tc_queue_tmp;
     }
   }
+  if (last_queue != tc_queue)
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Selected sending queue changed to %u with length %lu and MTU 
%u\n",
+                ntohl (tc_queue->qid), tc_queue->q_len, tc_queue->mtu);
   GNUNET_assert (NULL != tc_queue);
+  last_queue = tc_queue;
   // Uncomment this for alternativ 1 of backchannel functionality
   if (tc_queue->q_len != GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED)
     tc_queue->q_len--;
   // Until here for alternativ 1
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Sending message\n");
+  static int msg_count = 0;
+  msg_count++;
+  if (msg_count % 100 == 0)
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Sending %u-th (%lu-th for queue) message on queue %u\n",
+                msg_count, tc_queue->mid, ntohl (tc_queue->qid));
   inbox_size = sizeof (struct GNUNET_MessageHeader) + payload_size;
   env = GNUNET_MQ_msg_extra (msg,
                              inbox_size,
diff --git a/src/transport/transport-testing-communicator.h b/src/transport/transport-testing-communicator.h
index 1875258b4..122ac3efa 100644
--- a/src/transport/transport-testing-communicator.h
+++ b/src/transport/transport-testing-communicator.h
@@ -281,6 +281,16 @@ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle
    */
   void *cb_cls;
 
+  /**
+   * Callback to call when message ack received.
+   */
+  GNUNET_SCHEDULER_TaskCallback cont;
+
+  /**
+   * Closure for cont
+   */
+  void *cont_cls;
+
   /**
    * Backchannel supported
    */

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]