gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[gnunet] branch master updated: -improve job queue handling


From: gnunet
Subject: [gnunet] branch master updated: -improve job queue handling
Date: Wed, 26 Oct 2022 05:14:03 +0200

This is an automated email from the git hooks/post-receive script.

martin-schanzenbach pushed a commit to branch master
in repository gnunet.

The following commit(s) were added to refs/heads/master by this push:
     new 85b690809 -improve job queue handling
85b690809 is described below

commit 85b690809e76aaddb83fd9be96a69426706ec617
Author: Martin Schanzenbach <schanzen@gnunet.org>
AuthorDate: Wed Oct 26 12:13:56 2022 +0900

    -improve job queue handling
---
 src/zonemaster/gnunet-service-zonemaster.c | 146 ++++++++++++++++-------------
 1 file changed, 79 insertions(+), 67 deletions(-)

diff --git a/src/zonemaster/gnunet-service-zonemaster.c b/src/zonemaster/gnunet-service-zonemaster.c
index 09053a676..863716a44 100644
--- a/src/zonemaster/gnunet-service-zonemaster.c
+++ b/src/zonemaster/gnunet-service-zonemaster.c
@@ -57,15 +57,9 @@
 #define NS_BLOCK_SIZE 1000
 
 /**
- * How many pending DHT operations do we allow at most?
+ * How many open jobs (and with it maximum amount of pending DHT operations) do we allow at most
  */
-#define DHT_QUEUE_LIMIT 5000
-
-/**
- * How many events may the namestore give us before it has to wait
- * for us to keep up?
- */
-#define NAMESTORE_QUEUE_LIMIT 50
+#define JOB_QUEUE_LIMIT 5000
 
 /**
  * How many events may the namestore give us before it has to wait
@@ -122,6 +116,16 @@ static pthread_cond_t empty_jobs;
  */
 static int in_shutdown = GNUNET_NO;
 
+/**
+ * Iterator halted?
+ */
+static int iterator_halted = GNUNET_NO;
+
+/**
+ * Monitor halted?
+ */
+static int monitor_halted = GNUNET_NO;
+
 /**
  * Our notification pipe
  */
@@ -274,9 +278,9 @@ static struct DhtPutActivity *it_head;
 static struct DhtPutActivity *it_tail;
 
 /**
- * Number of entries in the DHT queue #it_head.
+ * Number of entries in the job queue #jobs_head.
  */
-static unsigned int dht_queue_length;
+static unsigned int job_queue_length;
 
 /**
  * Useful for zone update for DHT put
@@ -419,6 +423,7 @@ shutdown_task (void *cls)
     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                 "Removing incomplete jobs\n");
     GNUNET_CONTAINER_DLL_remove (jobs_head, jobs_tail, job);
+    job_queue_length--;
     free_job (job);
   }
   GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock));
@@ -439,7 +444,6 @@ shutdown_task (void *cls)
     GNUNET_CONTAINER_DLL_remove (it_head,
                                  it_tail,
                                  ma);
-    dht_queue_length--;
     GNUNET_free (ma);
   }
   if (NULL != statistics)
@@ -706,8 +710,8 @@ update_velocity (unsigned int cnt)
     }
   }
   GNUNET_STATISTICS_set (statistics,
-                         "# size of the DHT queue (it)",
-                         dht_queue_length,
+                         "# dispatched jobs",
+                         job_queue_length,
                          GNUNET_NO);
   GNUNET_STATISTICS_set (statistics,
                          "% speed increase needed for target velocity",
@@ -769,14 +773,21 @@ dht_put_continuation (void *cls)
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "PUT complete\n");
   /* When we just fall under the limit, trigger monitor/iterator again
-   * creating a race condition, but we may actually have finished more
-   * PUTs by the time they come back and both can carry on */
-  if (dht_queue_length == DHT_QUEUE_LIMIT)
+   * if halted. We can only safely trigger one, prefer iterator. */
+  if (job_queue_length <= JOB_QUEUE_LIMIT)
   {
-    GNUNET_NAMESTORE_zone_monitor_next (zmon, 1);
-    GNUNET_NAMESTORE_zone_iterator_next (namestore_iter, 1);
+    if (GNUNET_YES == iterator_halted)
+    {
+      GNUNET_NAMESTORE_zone_iterator_next (namestore_iter, 1);
+      iterator_halted = GNUNET_NO;
+    }
+    else if (GNUNET_YES == monitor_halted)
+    {
+      GNUNET_NAMESTORE_zone_monitor_next (zmon, 1);
+      monitor_halted = GNUNET_NO;
+    }
   }
-  dht_queue_length--;
+  job_queue_length--;
   GNUNET_CONTAINER_DLL_remove (it_head,
                                it_tail,
                                ma);
@@ -795,12 +806,12 @@ dht_put_continuation (void *cls)
  * @return DHT PUT handle, NULL on error
  */
 static void
-perform_dht_put (const struct GNUNET_IDENTITY_PrivateKey *key,
-                 const char *label,
-                 const struct GNUNET_GNSRECORD_Data *rd,
-                 unsigned int rd_count,
-                 const struct GNUNET_TIME_Absolute expire,
-                 struct DhtPutActivity *ma)
+dispatch_job (const struct GNUNET_IDENTITY_PrivateKey *key,
+              const char *label,
+              const struct GNUNET_GNSRECORD_Data *rd,
+              unsigned int rd_count,
+              const struct GNUNET_TIME_Absolute expire,
+              struct DhtPutActivity *ma)
 {
   struct GNUNET_GNSRECORD_Data rd_public[rd_count];
   struct GNUNET_GNSRECORD_Block *block;
@@ -1024,12 +1035,12 @@ zone_iteration_finished (void *cls)
  * @param rd the record data
  */
 static void
-put_gns_record (void *cls,
-                const struct GNUNET_IDENTITY_PrivateKey *key,
-                const char *label,
-                unsigned int rd_count,
-                const struct GNUNET_GNSRECORD_Data *rd,
-                struct GNUNET_TIME_Absolute expire)
+handle_record (void *cls,
+               const struct GNUNET_IDENTITY_PrivateKey *key,
+               const char *label,
+               unsigned int rd_count,
+               const struct GNUNET_GNSRECORD_Data *rd,
+               struct GNUNET_TIME_Absolute expire)
 {
   struct DhtPutActivity *ma;
 
@@ -1061,26 +1072,26 @@ put_gns_record (void *cls,
   put_cnt++;
   if (0 == put_cnt % DELTA_INTERVAL)
     update_velocity (DELTA_INTERVAL);
-  if (dht_queue_length >= DHT_QUEUE_LIMIT)
+  ma = GNUNET_new (struct DhtPutActivity);
+  dispatch_job (key,
+                label,
+                rd,
+                rd_count,
+                expire,
+                ma);
+  GNUNET_CONTAINER_DLL_insert_tail (it_head,
+                                    it_tail,
+                                    ma);
+  job_queue_length++;
+  if (job_queue_length >= JOB_QUEUE_LIMIT)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                "DHT PUT queue length exceeded (%u), aborting PUT\n",
-                DHT_QUEUE_LIMIT);
+                "Job queue length exceeded (%u). Halting namestore iteration.\n",
+                JOB_QUEUE_LIMIT);
+    iterator_halted = GNUNET_YES;
     return;
   }
   check_zone_namestore_next ();
-
-  ma = GNUNET_new (struct DhtPutActivity);
-  perform_dht_put (key,
-                   label,
-                   rd,
-                   rd_count,
-                   expire,
-                   ma);
-  dht_queue_length++;
-  GNUNET_CONTAINER_DLL_insert_tail (it_head,
-                                    it_tail,
-                                    ma);
 }
 
 
@@ -1109,7 +1120,7 @@ publish_zone_dht_start (void *cls)
                                               NULL, /* All zones */
                                               &zone_iteration_error,
                                               NULL,
-                                              &put_gns_record,
+                                              &handle_record,
                                               NULL,
                                               &zone_iteration_finished,
                                               NULL,
@@ -1129,12 +1140,12 @@ publish_zone_dht_start (void *cls)
  * @return DHT PUT handle, NULL on error
  */
 static void
-perform_dht_put_monitor (const struct GNUNET_IDENTITY_PrivateKey *key,
-                         const char *label,
-                         const struct GNUNET_GNSRECORD_Data *rd,
-                         unsigned int rd_count,
-                         struct GNUNET_TIME_Absolute expire,
-                         struct DhtPutActivity *ma)
+dispatch_job_monitor (const struct GNUNET_IDENTITY_PrivateKey *key,
+                      const char *label,
+                      const struct GNUNET_GNSRECORD_Data *rd,
+                      unsigned int rd_count,
+                      struct GNUNET_TIME_Absolute expire,
+                      struct DhtPutActivity *ma)
 {
   struct GNUNET_GNSRECORD_Data rd_public[rd_count];
   struct GNUNET_GNSRECORD_Block *block;
@@ -1232,26 +1243,27 @@ handle_monitor_event (void *cls,
                                         1);
     return;   /* nothing to do */
   }
-  if (dht_queue_length >= DHT_QUEUE_LIMIT)
+  ma = GNUNET_new (struct DhtPutActivity);
+  dispatch_job_monitor (zone,
+                        label,
+                        rd,
+                        rd_count,
+                        expire,
+                        ma);
+  GNUNET_CONTAINER_DLL_insert_tail (it_head,
+                                    it_tail,
+                                    ma);
+  job_queue_length++;
+  if (job_queue_length >= JOB_QUEUE_LIMIT)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                "DHT PUT queue length exceeded (%u), aborting PUT\n",
-                DHT_QUEUE_LIMIT);
+                "Job queue length exceeded (%u). Halting monitor.\n",
+                JOB_QUEUE_LIMIT);
+    monitor_halted = GNUNET_YES;
     return;
   }
-  ma = GNUNET_new (struct DhtPutActivity);
-  perform_dht_put_monitor (zone,
-                           label,
-                           rd,
-                           rd_count,
-                           expire,
-                           ma);
   GNUNET_NAMESTORE_zone_monitor_next (zmon,
                                       1);
-  GNUNET_CONTAINER_DLL_insert_tail (it_head,
-                                    it_tail,
-                                    ma);
-  dht_queue_length++;
 }
 
 

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]