[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[gnunet] 01/03: Transport: Added cleanup task to remove QueueEntry we go
From: |
gnunet |
Subject: |
[gnunet] 01/03: Transport: Added cleanup task to remove QueueEntry we got no ACK for. |
Date: |
Wed, 20 Dec 2023 09:04:14 +0100 |
This is an automated email from the git hooks/post-receive script.
t3sserakt pushed a commit to branch master
in repository gnunet.
commit 36a9952f00f7b9316d41ce2f970b0a46f3f2fb51
Author: t3sserakt <t3ss@posteo.de>
AuthorDate: Wed Dec 20 08:51:23 2023 +0100
Transport: Added cleanup task to remove QueueEntry we got no ACK for.
---
src/service/transport/gnunet-service-transport.c | 208 +++++++++++++++--------
1 file changed, 140 insertions(+), 68 deletions(-)
diff --git a/src/service/transport/gnunet-service-transport.c
b/src/service/transport/gnunet-service-transport.c
index d0d605465..b722e51d9 100644
--- a/src/service/transport/gnunet-service-transport.c
+++ b/src/service/transport/gnunet-service-transport.c
@@ -318,6 +318,12 @@
*/
#define QUEUE_LENGTH_LIMIT 32
+/**
+ *
+ */
+#define QUEUE_ENTRY_TIMEOUT \
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
+
GNUNET_NETWORK_STRUCT_BEGIN
@@ -1820,6 +1826,11 @@ struct QueueEntry
* Message ID used for this message with the queue used for transmission.
*/
uint64_t mid;
+
+ /**
+ * Timestamp this QueueEntry was created.
+ */
+ struct GNUNET_TIME_Absolute creation_timestamp;
};
@@ -2546,6 +2557,11 @@ struct TransportClient
*/
unsigned int total_queue_length;
+ /**
+ * Task to check for timed out QueueEntry.
+ */
+ struct GNUNET_SCHEDULER_Task *free_queue_entry_task;
+
/**
* Characteristics of this communicator.
*/
@@ -4025,6 +4041,8 @@ client_disconnect_cb (void *cls,
struct Queue *q;
struct AddressListEntry *ale;
+ if (NULL != tc->details.communicator.free_queue_entry_task)
+ GNUNET_SCHEDULER_cancel
(tc->details.communicator.free_queue_entry_task);
while (NULL != (q = tc->details.communicator.queue_head))
free_queue (q);
while (NULL != (ale = tc->details.communicator.addr_head))
@@ -4482,6 +4500,38 @@ sign_ephemeral (struct DistanceVector *dv)
}
+static void
+free_queue_entry (struct QueueEntry *qe,
+ struct TransportClient *tc);
+
+
+static void
+free_timedout_queue_entry (void *cls)
+{
+ struct TransportClient *tc = cls;
+ struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get ();
+
+ for (struct Queue *queue = tc->details.communicator.queue_head; NULL !=
queue;
+ queue = queue->next_client)
+ {
+ for (struct QueueEntry *qep = queue->queue_head; NULL != qep;
+ qep = qep->next)
+ {
+ struct GNUNET_TIME_Relative diff = GNUNET_TIME_absolute_get_difference
(qep->creation_timestamp, now);
+ if (GNUNET_TIME_relative_cmp (QUEUE_ENTRY_TIMEOUT, < , diff))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Freeing timed out QueueEntry with MID %" PRIu64
+ " and QID %u\n",
+ qep->mid,
+ queue->qid);
+ free_queue_entry(qep, tc);
+ }
+ }
+ }
+}
+
+
/**
* Send the message @a payload on @a queue.
*
@@ -4522,6 +4572,7 @@ queue_send_msg (struct Queue *queue,
struct QueueEntry *qe;
qe = GNUNET_new (struct QueueEntry);
+ qe->creation_timestamp = GNUNET_TIME_absolute_get ();
qe->mid = queue->mid_gen;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Create QueueEntry with MID %" PRIu64
@@ -4552,11 +4603,14 @@ queue_send_msg (struct Queue *queue,
{
// Messages without FC or fragments can get here.
if (NULL != pm)
+ {
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Message %" PRIu64
" (pm type %u) was not send because queue has no
capacity.\n",
pm->logging_uuid,
pm->pmt);
+ pm->qe = NULL;
+ }
GNUNET_free (env);
GNUNET_free (qe);
return;
@@ -4579,6 +4633,15 @@ queue_send_msg (struct Queue *queue,
if (0 == queue->q_capacity)
queue->idle = GNUNET_NO;
+ if (GNUNET_NO == queue->idle)
+ {
+ struct TransportClient *tc = queue->tc;
+
+ if (NULL == tc->details.communicator.free_queue_entry_task)
+ tc->details.communicator.free_queue_entry_task =
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
+
&free_timedout_queue_entry,
+
tc);
+ }
if (NULL != pm && NULL != (pa = pm->pa_head))
{
while (pm != pa->pm)
@@ -10368,77 +10431,12 @@ handle_del_queue_message (void *cls,
}
-/**
- * Message was transmitted. Process the request.
- *
- * @param cls the client
- * @param sma the send message that was sent
- */
static void
-handle_send_message_ack (void *cls,
- const struct GNUNET_TRANSPORT_SendMessageToAck *sma)
+free_queue_entry (struct QueueEntry *qe,
+ struct TransportClient *tc)
{
- struct TransportClient *tc = cls;
- struct QueueEntry *qe;
struct PendingMessage *pm;
- if (CT_COMMUNICATOR != tc->type)
- {
- GNUNET_break (0);
- GNUNET_SERVICE_client_drop (tc->client);
- return;
- }
-
- /* find our queue entry matching the ACK */
- qe = NULL;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Looking for queue for PID %s\n",
- GNUNET_i2s (&sma->receiver));
- for (struct Queue *queue = tc->details.communicator.queue_head; NULL !=
queue;
- queue = queue->next_client)
- {
- if (0 != GNUNET_memcmp (&queue->neighbour->pid, &sma->receiver))
- continue;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Found PID %s\n",
- GNUNET_i2s (&queue->neighbour->pid));
-
-
- for (struct QueueEntry *qep = queue->queue_head; NULL != qep;
- qep = qep->next)
- {
- if (qep->mid != GNUNET_ntohll (sma->mid) || queue->qid != ntohl (
- sma->qid))
- continue;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "QueueEntry MID: %" PRIu64 " on queue QID: %u, Ack MID: %"
- PRIu64 " Ack QID %u\n",
- qep->mid,
- queue->qid,
- GNUNET_ntohll (sma->mid),
- ntohl (sma->qid));
- qe = qep;
- if ((NULL != qe->pm) && (qe->pm->qe != qe))
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "For pending message %" PRIu64 " we had
retransmissions.\n",
- qe->pm->logging_uuid);
- break;
- }
- }
- if (NULL == qe)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "No QueueEntry found for Ack MID %" PRIu64 " QID: %u\n",
- GNUNET_ntohll (sma->mid),
- ntohl (sma->qid));
- // TODO I guess this can happen, if the Ack from the peer comes before the
Ack from the queue.
- // Update: Maybe QueueEntry was accidentally freed during freeing
PendingMessage.
- /* this should never happen */
- // GNUNET_break (0);
- // GNUNET_SERVICE_client_drop (tc->client);
- GNUNET_SERVICE_client_continue (tc->client);
- return;
- }
GNUNET_CONTAINER_DLL_remove (qe->queue->queue_head,
qe->queue->queue_tail,
qe);
@@ -10451,7 +10449,6 @@ handle_send_message_ack (void *cls,
GNUNET_i2s (&qe->queue->neighbour->pid),
qe->queue->queue_length,
tc->details.communicator.total_queue_length);
- GNUNET_SERVICE_client_continue (tc->client);
/* if applicable, resume transmissions that waited on ACK */
if (COMMUNICATOR_TOTAL_QUEUE_LIMIT - 1 ==
@@ -10525,6 +10522,81 @@ handle_send_message_ack (void *cls,
}
+/**
+ * Message was transmitted. Process the request.
+ *
+ * @param cls the client
+ * @param sma the send message that was sent
+ */
+static void
+handle_send_message_ack (void *cls,
+ const struct GNUNET_TRANSPORT_SendMessageToAck *sma)
+{
+ struct TransportClient *tc = cls;
+ struct QueueEntry *qe;
+
+ if (CT_COMMUNICATOR != tc->type)
+ {
+ GNUNET_break (0);
+ GNUNET_SERVICE_client_drop (tc->client);
+ return;
+ }
+
+ /* find our queue entry matching the ACK */
+ qe = NULL;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Looking for queue for PID %s\n",
+ GNUNET_i2s (&sma->receiver));
+ for (struct Queue *queue = tc->details.communicator.queue_head; NULL !=
queue;
+ queue = queue->next_client)
+ {
+ if (0 != GNUNET_memcmp (&queue->neighbour->pid, &sma->receiver))
+ continue;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Found PID %s\n",
+ GNUNET_i2s (&queue->neighbour->pid));
+
+
+ for (struct QueueEntry *qep = queue->queue_head; NULL != qep;
+ qep = qep->next)
+ {
+ if (qep->mid != GNUNET_ntohll (sma->mid) || queue->qid != ntohl (
+ sma->qid))
+ continue;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "QueueEntry MID: %" PRIu64 " on queue QID: %u, Ack MID: %"
+ PRIu64 " Ack QID %u\n",
+ qep->mid,
+ queue->qid,
+ GNUNET_ntohll (sma->mid),
+ ntohl (sma->qid));
+ qe = qep;
+ if ((NULL != qe->pm) && (qe->pm->qe != qe))
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "For pending message %" PRIu64 " we had
retransmissions.\n",
+ qe->pm->logging_uuid);
+ break;
+ }
+ }
+ if (NULL == qe)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "No QueueEntry found for Ack MID %" PRIu64 " QID: %u\n",
+ GNUNET_ntohll (sma->mid),
+ ntohl (sma->qid));
+ // TODO I guess this can happen, if the Ack from the peer comes before the
Ack from the queue.
+ // Update: Maybe QueueEntry was accidentally freed during freeing
PendingMessage.
+ /* this should never happen */
+ // GNUNET_break (0);
+ // GNUNET_SERVICE_client_drop (tc->client);
+ GNUNET_SERVICE_client_continue (tc->client);
+ return;
+ }
+ free_queue_entry (qe, tc);
+ GNUNET_SERVICE_client_continue (tc->client);
+}
+
+
/**
* Iterator telling new MONITOR client about all existing
* queues to peers.
--
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.