[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[gnunet] 01/02: TNG: Fix communicator test message ack handling.
From: |
gnunet |
Subject: |
[gnunet] 01/02: TNG: Fix communicator test message ack handling. |
Date: |
Sun, 30 Jul 2023 15:30:04 +0200 |
This is an automated email from the git hooks/post-receive script.
martin-schanzenbach pushed a commit to branch master
in repository gnunet.
commit f6dbfb2ada45aa0d8900177274ca5808c61c1319
Author: Martin Schanzenbach <schanzen@gnunet.org>
AuthorDate: Sun Jul 30 15:27:19 2023 +0200
TNG: Fix communicator test message ack handling.
---
src/transport/gnunet-communicator-udp.c | 131 +++++++++++----------
src/transport/test_communicator_basic.c | 38 +++---
.../test_communicator_udp_backchannel_peer1.conf | 2 +-
.../test_communicator_udp_backchannel_peer2.conf | 2 +-
.../test_communicator_udp_basic_peer1.conf | 1 +
src/transport/transport-testing-communicator.c | 25 +++-
src/transport/transport-testing-communicator.h | 10 ++
7 files changed, 128 insertions(+), 81 deletions(-)
diff --git a/src/transport/gnunet-communicator-udp.c
b/src/transport/gnunet-communicator-udp.c
index 75f732d6c..2b014f890 100644
--- a/src/transport/gnunet-communicator-udp.c
+++ b/src/transport/gnunet-communicator-udp.c
@@ -37,8 +37,8 @@
* - support NAT connection reversal method (#5529)
* - support other UDP-specific NAT traversal methods (#)
*/
-#include "gnunet_common.h"
#include "platform.h"
+#include "gnunet_common.h"
#include "gnunet_util_lib.h"
#include "gnunet_protocols.h"
#include "gnunet_signatures.h"
@@ -94,7 +94,7 @@
*/
#define GCM_TAG_SIZE (128 / 8)
-#define GENERATE_AT_ONCE 2
+#define GENERATE_AT_ONCE 16
/**
* If we fall below this number of available KCNs,
@@ -105,7 +105,7 @@
* arrive before the sender runs out. So really this
* should ideally be based on the RTT.
*/
-#define KCN_THRESHOLD 92
+#define KCN_THRESHOLD 96
/**
* How many KCNs do we keep around *after* we hit
@@ -484,20 +484,7 @@ struct SharedSecret
*/
int rekey_initiated;
- /**
- * ID of kce working queue task
- */
- struct GNUNET_SCHEDULER_Task *kce_task;
- /**
- * Is the kce_task finished?
- */
- int kce_task_finished;
-
- /**
- * When KCE finishes, send ACK if GNUNET_YES
- */
- int kce_send_ack_on_finish;
};
@@ -563,6 +550,20 @@ struct SenderAddress
*/
int sender_destroy_called;
+ /**
+ * ID of kce working queue task
+ */
+ struct GNUNET_SCHEDULER_Task *kce_task;
+
+ /**
+ * Is the kce_task finished?
+ */
+ int kce_task_finished;
+
+ /**
+ * When KCE finishes, send ACK if GNUNET_YES
+ */
+ int kce_send_ack_on_finish;
};
@@ -1030,10 +1031,10 @@ secret_destroy (struct SharedSecret *ss, int withoutKce)
"# KIDs active",
GNUNET_CONTAINER_multishortmap_size (key_cache),
GNUNET_NO);
- if (NULL != ss->kce_task)
+ if (NULL != ss->sender->kce_task)
{
- GNUNET_SCHEDULER_cancel (ss->kce_task);
- ss->kce_task = NULL;
+ GNUNET_SCHEDULER_cancel (ss->sender->kce_task);
+ ss->sender->kce_task = NULL;
}
GNUNET_free (ss);
return GNUNET_YES;
@@ -1452,7 +1453,7 @@ add_acks (struct SharedSecret *ss, int acks_to_add)
1);
}
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Tell transport we have %u more acks!\n",
acks_to_add);
@@ -1501,25 +1502,25 @@ handle_ack (void *cls, const struct GNUNET_PeerIdentity
*pid, void *value)
if (allowed <= ss->sequence_allowed)
{
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Ignoring ack, not giving us increased window\n.");
return GNUNET_NO;
}
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "New sequence allows until %u from %u. Acks available to us:
%u. For secret %s\n",
- allowed,
- ss->sequence_allowed,
- receiver->acks_available,
- GNUNET_h2s (&ss->master));
acks_to_add = (allowed - ss->sequence_allowed);
GNUNET_assert (0 != acks_to_add);
receiver->acks_available += (allowed - ss->sequence_allowed);
ss->sequence_allowed = allowed;
add_acks (ss, acks_to_add);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "New sequence allows until %u (+%u). Acks available to us:
%u. For secret %s\n",
+ allowed,
+ acks_to_add,
+ receiver->acks_available,
+ GNUNET_h2s (&ss->master));
return GNUNET_NO;
}
}
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Matching cmac not found for ack!\n");
return GNUNET_YES;
}
@@ -1556,7 +1557,7 @@ consider_ss_ack (struct SharedSecret *ss)
ack.header.size = htons (sizeof(ack));
ack.sequence_ack = htonl (ss->sequence_allowed);
ack.cmac = ss->cmac;
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Notifying transport with UDPAck %s, sequence %u and master
%s\n",
GNUNET_i2s_full (&ss->sender->target),
ss->sequence_allowed,
@@ -1572,30 +1573,40 @@ static void
kce_generate_cb (void *cls)
{
struct SharedSecret *ss = cls;
-
- ss->kce_task = NULL;
+ static uint64_t kce_last_available = 0;
+ ss->sender->kce_task = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Precomputing %u keys for master %s\n",
GENERATE_AT_ONCE,
GNUNET_h2s (&(ss->master)));
if (KCN_TARGET < ss->sender->acks_available)
+ {
+ ss->sender->kce_task = GNUNET_SCHEDULER_add_delayed (
+ WORKING_QUEUE_INTERVALL,
+ kce_generate_cb,
+ ss);
return;
+ }
for (int i = 0; i < GENERATE_AT_ONCE; i++)
kce_generate (ss, ++ss->sequence_allowed);
+ /**
+ * As long as we loose over 30% of max acks in reschedule,
+ * We keep generating acks for this ss.
+ */
if (KCN_TARGET > ss->sender->acks_available)
{
- ss->kce_task = GNUNET_SCHEDULER_add_delayed (
+ ss->sender->kce_task = GNUNET_SCHEDULER_add_delayed (
WORKING_QUEUE_INTERVALL,
kce_generate_cb,
ss);
return;
}
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "We have enough keys.\n");
- ss->kce_task_finished = GNUNET_YES;
- if (ss->kce_send_ack_on_finish == GNUNET_YES)
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "We have enough keys (ACKs: %u).\n", ss->sender->acks_available);
+ ss->sender->kce_task_finished = GNUNET_YES;
+ if (ss->sender->kce_send_ack_on_finish == GNUNET_YES)
consider_ss_ack (ss);
}
@@ -1660,8 +1671,9 @@ try_handle_plaintext (struct SenderAddress *sender,
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"We have %u acks available.\n",
ss_rekey->sender->acks_available);
- ss_rekey->kce_send_ack_on_finish = GNUNET_NO;
- ss_rekey->kce_task = GNUNET_SCHEDULER_add_delayed (
+ ss_rekey->sender->kce_send_ack_on_finish = GNUNET_NO;
+ // FIXME
+ ss_rekey->sender->kce_task = GNUNET_SCHEDULER_add_delayed (
WORKING_QUEUE_INTERVALL,
kce_generate_cb,
ss_rekey);
@@ -1718,7 +1730,7 @@ decrypt_box (const struct UDPBox *box,
sizeof(out_buf),
out_buf))
{
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed decryption.\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Failed decryption.\n");
GNUNET_STATISTICS_update (stats,
"# Decryption failures with valid KCE",
1,
@@ -1727,6 +1739,7 @@ decrypt_box (const struct UDPBox *box,
return;
}
kce_destroy (kce);
+ kce = NULL;
GNUNET_STATISTICS_update (stats,
"# bytes decrypted with BOX",
sizeof(out_buf),
@@ -1739,6 +1752,18 @@ decrypt_box (const struct UDPBox *box,
"decrypted UDPBox with kid %s\n",
GNUNET_sh2s (&box->kid));
try_handle_plaintext (ss->sender, out_buf, sizeof(out_buf));
+ if ((KCN_THRESHOLD > ss->sender->acks_available) &&
+ (NULL == ss->sender->kce_task) &&
+ (GNUNET_YES == ss->sender->kce_task_finished))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sender has %u ack left which is under threshold.\n",
+ ss->sender->acks_available);
+ ss->sender->kce_send_ack_on_finish = GNUNET_YES;
+ ss->sender->kce_task = GNUNET_SCHEDULER_add_now (
+ kce_generate_cb,
+ ss);
+ }
}
@@ -1960,19 +1985,6 @@ sock_read (void *cls)
"Found KCE with kid %s\n",
GNUNET_sh2s (&box->kid));
decrypt_box (box, (size_t) rcvd, kce);
- if ((NULL == kce->ss->kce_task) &&
- (GNUNET_YES == kce->ss->kce_task_finished) &&
- (kce->ss->sender->acks_available < KCN_THRESHOLD))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Sender has %u ack left which is under threshold.\n",
- kce->ss->sender->acks_available);
- kce->ss->kce_send_ack_on_finish = GNUNET_YES;
- kce->ss->kce_task = GNUNET_SCHEDULER_add_delayed (
- WORKING_QUEUE_INTERVALL,
- kce_generate_cb,
- kce->ss);
- }
continue;
}
}
@@ -2107,13 +2119,13 @@ sock_read (void *cls)
sender = setup_sender (&uc->sender, (const struct sockaddr *) &sa,
salen);
ss->sender = sender;
GNUNET_CONTAINER_DLL_insert (sender->ss_head, sender->ss_tail, ss);
- if ((NULL == ss->kce_task) && (GNUNET_NO ==
- ss->kce_task_finished))
+ if ((KCN_THRESHOLD > ss->sender->acks_available) &&
+ (NULL == ss->sender->kce_task) &&
+ (GNUNET_NO == ss->sender->kce_task_finished))
{
// TODO This task must be per sender! FIXME: This is a nice todo, but
I do not know what must be done here to fix.
- ss->kce_send_ack_on_finish = GNUNET_YES;
- ss->kce_task = GNUNET_SCHEDULER_add_delayed (
- WORKING_QUEUE_INTERVALL,
+ ss->sender->kce_send_ack_on_finish = GNUNET_YES;
+ ss->sender->kce_task = GNUNET_SCHEDULER_add_now (
kce_generate_cb,
ss);
}
@@ -2600,13 +2612,14 @@ mq_send_d (struct GNUNET_MQ_Handle *mq,
msize,
receiver->acks_available);
ss->bytes_sent += sizeof (dgram);
- GNUNET_MQ_impl_send_continue (mq);
receiver->acks_available--;
+ GNUNET_MQ_impl_send_continue (mq);
return;
}
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"No suitable ss found, sending as KX...\n");
send_msg_with_kx (msg, receiver);
+ GNUNET_MQ_impl_send_continue (mq);
}
diff --git a/src/transport/test_communicator_basic.c
b/src/transport/test_communicator_basic.c
index 975a0a837..bba8025af 100644
--- a/src/transport/test_communicator_basic.c
+++ b/src/transport/test_communicator_basic.c
@@ -91,7 +91,7 @@ static struct GNUNET_STATISTICS_GetHandle
*rekey_stats[NUM_PEERS];
#define ALLOWED_PACKET_LOSS 91
-#define BURST_PACKETS 15000
+#define BURST_PACKETS 5000
#define TOTAL_ITERATIONS 1
@@ -394,7 +394,8 @@ process_statistics (void *cls,
{
ret = 2;
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Not enough BOX messages!\n");
+ "Not enough BOX messages! (want: %u, have %llu)\n",
+ 9000, value);
GNUNET_SCHEDULER_shutdown ();
}
if ((0 == strcmp ("rekey", test_name)) &&
@@ -433,11 +434,13 @@ short_test_cb (void *cls)
tc_h);
payload = make_payload (SHORT_MESSAGE_SIZE);
num_sent_short[peer_nr]++;
+ if (burst_packets_short == num_sent_short[peer_nr])
+ tc_h->cont = NULL;
+ else
+ tc_h->cont = short_test;
+ tc_h->cont_cls = cls;
GNUNET_TRANSPORT_TESTING_transport_communicator_send (tc_h,
- (burst_packets_short ==
-
num_sent_short[peer_nr])
- ? NULL
- : &short_test,
+ NULL,
cls,
payload,
SHORT_MESSAGE_SIZE);
@@ -478,11 +481,13 @@ size_test (void *cls)
ack[peer_nr] += 10;
payload = make_payload (ack[peer_nr]);
num_sent_size[peer_nr]++;
+ if (ack[peer_nr] >= max_size)
+ tc_h->cont = NULL;
+ else
+ tc_h->cont = size_test;
+ tc_h->cont_cls = cls;
GNUNET_TRANSPORT_TESTING_transport_communicator_send (tc_h,
- (ack[peer_nr] <
- max_size)
- ? &size_test
- : NULL,
+ NULL,
cls,
payload,
ack[peer_nr]);
@@ -512,11 +517,13 @@ long_test_cb (void *cls)
(unsigned int) num_received_long[peer_nr]);
payload = make_payload (long_message_size);
num_sent_long[peer_nr]++;
+ if (burst_packets_long == num_sent_long[peer_nr])
+ tc_h->cont = NULL;
+ else
+ tc_h->cont = long_test;
+ tc_h->cont_cls = cls;
GNUNET_TRANSPORT_TESTING_transport_communicator_send (tc_h,
- (burst_packets_long ==
-
num_sent_long[peer_nr])
- ? NULL
- : &long_test,
+ NULL,
cls,
payload,
long_message_size);
@@ -936,7 +943,8 @@ incoming_message_cb (
if (long_message_size != payload_len)
{
LOG (GNUNET_ERROR_TYPE_WARNING,
- "Ignoring packet with wrong length\n");
+ "Ignoring packet with wrong length (have: %lu, want: %lu)\n",
+ payload_len, long_message_size);
return; // Ignore
}
num_received_long[peer_nr]++;
diff --git a/src/transport/test_communicator_udp_backchannel_peer1.conf
b/src/transport/test_communicator_udp_backchannel_peer1.conf
index 59e6d68e3..65f33bd6b 100644
--- a/src/transport/test_communicator_udp_backchannel_peer1.conf
+++ b/src/transport/test_communicator_udp_backchannel_peer1.conf
@@ -39,7 +39,7 @@ BINDTO = 60002
DISABLE_V6 = YES
[communicator-udp]
-#PREFIX = valgrind --leak-check=full --track-origins=yes
+# PREFIX = valgrind --leak-check=full --track-origins=yes
--log-file=/tmp/vg_comm1
BINDTO = 60002
DISABLE_V6 = YES
MAX_QUEUE_LENGTH=5000
diff --git a/src/transport/test_communicator_udp_backchannel_peer2.conf
b/src/transport/test_communicator_udp_backchannel_peer2.conf
index 3abf7999b..9875af724 100644
--- a/src/transport/test_communicator_udp_backchannel_peer2.conf
+++ b/src/transport/test_communicator_udp_backchannel_peer2.conf
@@ -39,7 +39,7 @@ BINDTO = 60003
DISABLE_V6 = YES
[communicator-udp]
-#PREFIX = valgrind --leak-check=full --track-origins=yes
+# PREFIX = valgrind --leak-check=full --track-origins=yes
--log-file=/tmp/vg_comm2
BINDTO = 60003
DISABLE_V6 = YES
MAX_QUEUE_LENGTH=5000
diff --git a/src/transport/test_communicator_udp_basic_peer1.conf
b/src/transport/test_communicator_udp_basic_peer1.conf
index c6ff024ee..ec8c19f62 100644
--- a/src/transport/test_communicator_udp_basic_peer1.conf
+++ b/src/transport/test_communicator_udp_basic_peer1.conf
@@ -36,3 +36,4 @@ UNIXPATH =
$GNUNET_RUNTIME_DIR/gnunet-service-resolver_test_1.sock
BINDTO = 60002
DISABLE_V6 = YES
MAX_QUEUE_LENGTH=5000
+
diff --git a/src/transport/transport-testing-communicator.c
b/src/transport/transport-testing-communicator.c
index d0e32a544..26895e457 100644
--- a/src/transport/transport-testing-communicator.c
+++ b/src/transport/transport-testing-communicator.c
@@ -260,7 +260,7 @@ handle_communicator_backchannel (void *cls,
}
/* Find client providing this communicator */
/* Finally, deliver backchannel message to communicator */
- LOG (GNUNET_ERROR_TYPE_ERROR,
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
"Delivering backchannel message of type %u to %s\n",
ntohs (msg->type),
target_communicator);
@@ -676,8 +676,13 @@ handle_send_message_ack (void *cls,
const struct GNUNET_TRANSPORT_SendMessageToAck *sma)
{
struct MyClient *client = cls;
+ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle *tc_h =
+ client->tc;
+ static int mtr = 0;
+ mtr++;
+ if (tc_h->cont != NULL)
+ tc_h->cont (tc_h->cont_cls);
GNUNET_SERVICE_client_continue (client->client);
- // NOP
}
@@ -1166,8 +1171,9 @@ GNUNET_TRANSPORT_TESTING_transport_communicator_send
size_t inbox_size;
struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue;
struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue *tc_queue_tmp;
-
+ static struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorQueue
*last_queue;
tc_queue = NULL;
+
for (tc_queue_tmp = tc_h->queue_head;
NULL != tc_queue_tmp;
tc_queue_tmp = tc_queue_tmp->next)
@@ -1194,13 +1200,22 @@ GNUNET_TRANSPORT_TESTING_transport_communicator_send
tc_queue = tc_queue_tmp;
}
}
+ if (last_queue != tc_queue)
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Selected sending queue changed to %u with length %lu and MTU
%u\n",
+ ntohl (tc_queue->qid), tc_queue->q_len, tc_queue->mtu);
GNUNET_assert (NULL != tc_queue);
+ last_queue = tc_queue;
// Uncomment this for alternativ 1 of backchannel functionality
if (tc_queue->q_len != GNUNET_TRANSPORT_QUEUE_LENGTH_UNLIMITED)
tc_queue->q_len--;
// Until here for alternativ 1
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Sending message\n");
+ static int msg_count = 0;
+ msg_count++;
+ if (msg_count % 100 == 0)
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending %u-th (%lu-th for queue) message on queue %u\n",
+ msg_count, tc_queue->mid, ntohl (tc_queue->qid));
inbox_size = sizeof (struct GNUNET_MessageHeader) + payload_size;
env = GNUNET_MQ_msg_extra (msg,
inbox_size,
diff --git a/src/transport/transport-testing-communicator.h
b/src/transport/transport-testing-communicator.h
index 1875258b4..122ac3efa 100644
--- a/src/transport/transport-testing-communicator.h
+++ b/src/transport/transport-testing-communicator.h
@@ -281,6 +281,16 @@ struct GNUNET_TRANSPORT_TESTING_TransportCommunicatorHandle
*/
void *cb_cls;
+ /**
+ * Callback to call when message ack received.
+ */
+ GNUNET_SCHEDULER_TaskCallback cont;
+
+ /**
+ * Closure for cont
+ */
+ void *cont_cls;
+
/**
* Backchannel supported
*/
--
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.