gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[gnunet] 01/03: Transport: Added cleanup task to remove QueueEntry we go


From: gnunet
Subject: [gnunet] 01/03: Transport: Added cleanup task to remove QueueEntry we got no ACK for.
Date: Wed, 20 Dec 2023 09:04:14 +0100

This is an automated email from the git hooks/post-receive script.

t3sserakt pushed a commit to branch master
in repository gnunet.

commit 36a9952f00f7b9316d41ce2f970b0a46f3f2fb51
Author: t3sserakt <t3ss@posteo.de>
AuthorDate: Wed Dec 20 08:51:23 2023 +0100

    Transport: Added cleanup task to remove QueueEntry we got no ACK for.
---
 src/service/transport/gnunet-service-transport.c | 208 +++++++++++++++--------
 1 file changed, 140 insertions(+), 68 deletions(-)

diff --git a/src/service/transport/gnunet-service-transport.c 
b/src/service/transport/gnunet-service-transport.c
index d0d605465..b722e51d9 100644
--- a/src/service/transport/gnunet-service-transport.c
+++ b/src/service/transport/gnunet-service-transport.c
@@ -318,6 +318,12 @@
  */
 #define QUEUE_LENGTH_LIMIT 32
 
+/**
+ *
+ */
+#define QUEUE_ENTRY_TIMEOUT \
+        GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
+
 
 GNUNET_NETWORK_STRUCT_BEGIN
 
@@ -1820,6 +1826,11 @@ struct QueueEntry
    * Message ID used for this message with the queue used for transmission.
    */
   uint64_t mid;
+
+  /**
+   * Timestamp this QueueEntry was created.
+   */
+  struct GNUNET_TIME_Absolute creation_timestamp;
 };
 
 
@@ -2546,6 +2557,11 @@ struct TransportClient
        */
       unsigned int total_queue_length;
 
+      /**
+       * Task to check for timed out QueueEntry.
+       */
+      struct GNUNET_SCHEDULER_Task *free_queue_entry_task;
+
       /**
        * Characteristics of this communicator.
        */
@@ -4025,6 +4041,8 @@ client_disconnect_cb (void *cls,
       struct Queue *q;
       struct AddressListEntry *ale;
 
+      if (NULL != tc->details.communicator.free_queue_entry_task)
+        GNUNET_SCHEDULER_cancel 
(tc->details.communicator.free_queue_entry_task);
       while (NULL != (q = tc->details.communicator.queue_head))
         free_queue (q);
       while (NULL != (ale = tc->details.communicator.addr_head))
@@ -4482,6 +4500,38 @@ sign_ephemeral (struct DistanceVector *dv)
 }
 
 
+static void
+free_queue_entry (struct QueueEntry *qe,
+                  struct TransportClient *tc);
+
+
+static void
+free_timedout_queue_entry (void *cls)
+{
+  struct TransportClient *tc = cls;
+  struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get ();
+
+  for (struct Queue *queue = tc->details.communicator.queue_head; NULL != 
queue;
+       queue = queue->next_client)
+  {
+    for (struct QueueEntry *qep = queue->queue_head; NULL != qep;
+      qep = qep->next)
+    {
+      struct GNUNET_TIME_Relative diff = GNUNET_TIME_absolute_get_difference 
(qep->creation_timestamp, now);
+      if (GNUNET_TIME_relative_cmp (QUEUE_ENTRY_TIMEOUT, < , diff))
+      {
+        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Freeing timed out QueueEntry with MID %" PRIu64
+                " and QID %u\n",
+                qep->mid,
+                queue->qid);
+        free_queue_entry(qep, tc);
+      }
+    }
+  }
+}
+
+
 /**
  * Send the message @a payload on @a queue.
  *
@@ -4522,6 +4572,7 @@ queue_send_msg (struct Queue *queue,
     struct QueueEntry *qe;
 
     qe = GNUNET_new (struct QueueEntry);
+    qe->creation_timestamp = GNUNET_TIME_absolute_get ();
     qe->mid = queue->mid_gen;
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "Create QueueEntry with MID %" PRIu64
@@ -4552,11 +4603,14 @@ queue_send_msg (struct Queue *queue,
     {
       // Messages without FC or fragments can get here.
       if (NULL != pm)
+      {
         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                     "Message %" PRIu64
                     " (pm type %u) was not send because queue has no 
capacity.\n",
                     pm->logging_uuid,
                     pm->pmt);
+        pm->qe = NULL;
+      }
       GNUNET_free (env);
       GNUNET_free (qe);
       return;
@@ -4579,6 +4633,15 @@ queue_send_msg (struct Queue *queue,
     if (0 == queue->q_capacity)
       queue->idle = GNUNET_NO;
 
+    if (GNUNET_NO == queue->idle)
+    {
+      struct TransportClient *tc = queue->tc;
+
+      if (NULL == tc->details.communicator.free_queue_entry_task)
+        tc->details.communicator.free_queue_entry_task = 
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
+                                                                               
        &free_timedout_queue_entry,
+                                                                               
        tc);
+    }
     if (NULL != pm && NULL != (pa = pm->pa_head))
     {
       while (pm != pa->pm)
@@ -10368,77 +10431,12 @@ handle_del_queue_message (void *cls,
 }
 
 
-/**
- * Message was transmitted.  Process the request.
- *
- * @param cls the client
- * @param sma the send message that was sent
- */
 static void
-handle_send_message_ack (void *cls,
-                         const struct GNUNET_TRANSPORT_SendMessageToAck *sma)
+free_queue_entry (struct QueueEntry *qe,
+                  struct TransportClient *tc)
 {
-  struct TransportClient *tc = cls;
-  struct QueueEntry *qe;
   struct PendingMessage *pm;
 
-  if (CT_COMMUNICATOR != tc->type)
-  {
-    GNUNET_break (0);
-    GNUNET_SERVICE_client_drop (tc->client);
-    return;
-  }
-
-  /* find our queue entry matching the ACK */
-  qe = NULL;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Looking for queue for PID %s\n",
-              GNUNET_i2s (&sma->receiver));
-  for (struct Queue *queue = tc->details.communicator.queue_head; NULL != 
queue;
-       queue = queue->next_client)
-  {
-    if (0 != GNUNET_memcmp (&queue->neighbour->pid, &sma->receiver))
-      continue;
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Found PID %s\n",
-                GNUNET_i2s (&queue->neighbour->pid));
-
-
-    for (struct QueueEntry *qep = queue->queue_head; NULL != qep;
-         qep = qep->next)
-    {
-      if (qep->mid != GNUNET_ntohll (sma->mid) || queue->qid != ntohl (
-            sma->qid))
-        continue;
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "QueueEntry MID: %" PRIu64 " on queue QID: %u, Ack MID: %"
-                  PRIu64 " Ack QID %u\n",
-                  qep->mid,
-                  queue->qid,
-                  GNUNET_ntohll (sma->mid),
-                  ntohl (sma->qid));
-      qe = qep;
-      if ((NULL != qe->pm) && (qe->pm->qe != qe))
-        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                    "For pending message %" PRIu64 " we had 
retransmissions.\n",
-                    qe->pm->logging_uuid);
-      break;
-    }
-  }
-  if (NULL == qe)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "No QueueEntry found for Ack MID %" PRIu64 " QID: %u\n",
-                GNUNET_ntohll (sma->mid),
-                ntohl (sma->qid));
-    // TODO I guess this can happen, if the Ack from the peer comes before the 
Ack from the queue.
-    // Update: Maybe QueueEntry was accidentally freed during freeing 
PendingMessage.
-    /* this should never happen */
-    // GNUNET_break (0);
-    // GNUNET_SERVICE_client_drop (tc->client);
-    GNUNET_SERVICE_client_continue (tc->client);
-    return;
-  }
   GNUNET_CONTAINER_DLL_remove (qe->queue->queue_head,
                                qe->queue->queue_tail,
                                qe);
@@ -10451,7 +10449,6 @@ handle_send_message_ack (void *cls,
               GNUNET_i2s (&qe->queue->neighbour->pid),
               qe->queue->queue_length,
               tc->details.communicator.total_queue_length);
-  GNUNET_SERVICE_client_continue (tc->client);
 
   /* if applicable, resume transmissions that waited on ACK */
   if (COMMUNICATOR_TOTAL_QUEUE_LIMIT - 1 ==
@@ -10525,6 +10522,81 @@ handle_send_message_ack (void *cls,
 }
 
 
+/**
+ * Message was transmitted.  Process the request.
+ *
+ * @param cls the client
+ * @param sma the send message that was sent
+ */
+static void
+handle_send_message_ack (void *cls,
+                         const struct GNUNET_TRANSPORT_SendMessageToAck *sma)
+{
+  struct TransportClient *tc = cls;
+  struct QueueEntry *qe;
+
+  if (CT_COMMUNICATOR != tc->type)
+  {
+    GNUNET_break (0);
+    GNUNET_SERVICE_client_drop (tc->client);
+    return;
+  }
+
+  /* find our queue entry matching the ACK */
+  qe = NULL;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Looking for queue for PID %s\n",
+              GNUNET_i2s (&sma->receiver));
+  for (struct Queue *queue = tc->details.communicator.queue_head; NULL != 
queue;
+       queue = queue->next_client)
+  {
+    if (0 != GNUNET_memcmp (&queue->neighbour->pid, &sma->receiver))
+      continue;
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Found PID %s\n",
+                GNUNET_i2s (&queue->neighbour->pid));
+
+
+    for (struct QueueEntry *qep = queue->queue_head; NULL != qep;
+         qep = qep->next)
+    {
+      if (qep->mid != GNUNET_ntohll (sma->mid) || queue->qid != ntohl (
+            sma->qid))
+        continue;
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "QueueEntry MID: %" PRIu64 " on queue QID: %u, Ack MID: %"
+                  PRIu64 " Ack QID %u\n",
+                  qep->mid,
+                  queue->qid,
+                  GNUNET_ntohll (sma->mid),
+                  ntohl (sma->qid));
+      qe = qep;
+      if ((NULL != qe->pm) && (qe->pm->qe != qe))
+        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                    "For pending message %" PRIu64 " we had 
retransmissions.\n",
+                    qe->pm->logging_uuid);
+      break;
+    }
+  }
+  if (NULL == qe)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "No QueueEntry found for Ack MID %" PRIu64 " QID: %u\n",
+                GNUNET_ntohll (sma->mid),
+                ntohl (sma->qid));
+    // TODO I guess this can happen, if the Ack from the peer comes before the 
Ack from the queue.
+    // Update: Maybe QueueEntry was accidentally freed during freeing 
PendingMessage.
+    /* this should never happen */
+    // GNUNET_break (0);
+    // GNUNET_SERVICE_client_drop (tc->client);
+    GNUNET_SERVICE_client_continue (tc->client);
+    return;
+  }
+  free_queue_entry (qe, tc);
+  GNUNET_SERVICE_client_continue (tc->client);
+}
+
+
 /**
  * Iterator telling new MONITOR client about all existing
  * queues to peers.

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]