[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[gnunet] branch master updated: -improve job queue handling
From: |
gnunet |
Subject: |
[gnunet] branch master updated: -improve job queue handling |
Date: |
Wed, 26 Oct 2022 05:14:03 +0200 |
This is an automated email from the git hooks/post-receive script.
martin-schanzenbach pushed a commit to branch master
in repository gnunet.
The following commit(s) were added to refs/heads/master by this push:
new 85b690809 -improve job queue handling
85b690809 is described below
commit 85b690809e76aaddb83fd9be96a69426706ec617
Author: Martin Schanzenbach <schanzen@gnunet.org>
AuthorDate: Wed Oct 26 12:13:56 2022 +0900
-improve job queue handling
---
src/zonemaster/gnunet-service-zonemaster.c | 146 ++++++++++++++++-------------
1 file changed, 79 insertions(+), 67 deletions(-)
diff --git a/src/zonemaster/gnunet-service-zonemaster.c
b/src/zonemaster/gnunet-service-zonemaster.c
index 09053a676..863716a44 100644
--- a/src/zonemaster/gnunet-service-zonemaster.c
+++ b/src/zonemaster/gnunet-service-zonemaster.c
@@ -57,15 +57,9 @@
#define NS_BLOCK_SIZE 1000
/**
- * How many pending DHT operations do we allow at most?
+ * How many open jobs (and with it maximum amount of pending DHT operations)
do we allow at most
*/
-#define DHT_QUEUE_LIMIT 5000
-
-/**
- * How many events may the namestore give us before it has to wait
- * for us to keep up?
- */
-#define NAMESTORE_QUEUE_LIMIT 50
+#define JOB_QUEUE_LIMIT 5000
/**
* How many events may the namestore give us before it has to wait
@@ -122,6 +116,16 @@ static pthread_cond_t empty_jobs;
*/
static int in_shutdown = GNUNET_NO;
+/**
+ * Iterator halted?
+ */
+static int iterator_halted = GNUNET_NO;
+
+/**
+ * Monitor halted?
+ */
+static int monitor_halted = GNUNET_NO;
+
/**
* Our notification pipe
*/
@@ -274,9 +278,9 @@ static struct DhtPutActivity *it_head;
static struct DhtPutActivity *it_tail;
/**
- * Number of entries in the DHT queue #it_head.
+ * Number of entries in the job queue #jobs_head.
*/
-static unsigned int dht_queue_length;
+static unsigned int job_queue_length;
/**
* Useful for zone update for DHT put
@@ -419,6 +423,7 @@ shutdown_task (void *cls)
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Removing incomplete jobs\n");
GNUNET_CONTAINER_DLL_remove (jobs_head, jobs_tail, job);
+ job_queue_length--;
free_job (job);
}
GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock));
@@ -439,7 +444,6 @@ shutdown_task (void *cls)
GNUNET_CONTAINER_DLL_remove (it_head,
it_tail,
ma);
- dht_queue_length--;
GNUNET_free (ma);
}
if (NULL != statistics)
@@ -706,8 +710,8 @@ update_velocity (unsigned int cnt)
}
}
GNUNET_STATISTICS_set (statistics,
- "# size of the DHT queue (it)",
- dht_queue_length,
+ "# dispatched jobs",
+ job_queue_length,
GNUNET_NO);
GNUNET_STATISTICS_set (statistics,
"% speed increase needed for target velocity",
@@ -769,14 +773,21 @@ dht_put_continuation (void *cls)
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"PUT complete\n");
/* When we just fall under the limit, trigger monitor/iterator again
- * creating a race condition, but we may actually have finished more
- * PUTs by the time they come back and both can carry on */
- if (dht_queue_length == DHT_QUEUE_LIMIT)
+ * if halted. We can only safely trigger one, prefer iterator. */
+ if (job_queue_length <= JOB_QUEUE_LIMIT)
{
- GNUNET_NAMESTORE_zone_monitor_next (zmon, 1);
- GNUNET_NAMESTORE_zone_iterator_next (namestore_iter, 1);
+ if (GNUNET_YES == iterator_halted)
+ {
+ GNUNET_NAMESTORE_zone_iterator_next (namestore_iter, 1);
+ iterator_halted = GNUNET_NO;
+ }
+ else if (GNUNET_YES == monitor_halted)
+ {
+ GNUNET_NAMESTORE_zone_monitor_next (zmon, 1);
+ monitor_halted = GNUNET_NO;
+ }
}
- dht_queue_length--;
+ job_queue_length--;
GNUNET_CONTAINER_DLL_remove (it_head,
it_tail,
ma);
@@ -795,12 +806,12 @@ dht_put_continuation (void *cls)
* @return DHT PUT handle, NULL on error
*/
static void
-perform_dht_put (const struct GNUNET_IDENTITY_PrivateKey *key,
- const char *label,
- const struct GNUNET_GNSRECORD_Data *rd,
- unsigned int rd_count,
- const struct GNUNET_TIME_Absolute expire,
- struct DhtPutActivity *ma)
+dispatch_job (const struct GNUNET_IDENTITY_PrivateKey *key,
+ const char *label,
+ const struct GNUNET_GNSRECORD_Data *rd,
+ unsigned int rd_count,
+ const struct GNUNET_TIME_Absolute expire,
+ struct DhtPutActivity *ma)
{
struct GNUNET_GNSRECORD_Data rd_public[rd_count];
struct GNUNET_GNSRECORD_Block *block;
@@ -1024,12 +1035,12 @@ zone_iteration_finished (void *cls)
* @param rd the record data
*/
static void
-put_gns_record (void *cls,
- const struct GNUNET_IDENTITY_PrivateKey *key,
- const char *label,
- unsigned int rd_count,
- const struct GNUNET_GNSRECORD_Data *rd,
- struct GNUNET_TIME_Absolute expire)
+handle_record (void *cls,
+ const struct GNUNET_IDENTITY_PrivateKey *key,
+ const char *label,
+ unsigned int rd_count,
+ const struct GNUNET_GNSRECORD_Data *rd,
+ struct GNUNET_TIME_Absolute expire)
{
struct DhtPutActivity *ma;
@@ -1061,26 +1072,26 @@ put_gns_record (void *cls,
put_cnt++;
if (0 == put_cnt % DELTA_INTERVAL)
update_velocity (DELTA_INTERVAL);
- if (dht_queue_length >= DHT_QUEUE_LIMIT)
+ ma = GNUNET_new (struct DhtPutActivity);
+ dispatch_job (key,
+ label,
+ rd,
+ rd_count,
+ expire,
+ ma);
+ GNUNET_CONTAINER_DLL_insert_tail (it_head,
+ it_tail,
+ ma);
+ job_queue_length++;
+ if (job_queue_length >= JOB_QUEUE_LIMIT)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "DHT PUT queue length exceeded (%u), aborting PUT\n",
- DHT_QUEUE_LIMIT);
+ "Job queue length exceeded (%u). Halting namestore
iteration.\n",
+ JOB_QUEUE_LIMIT);
+ iterator_halted = GNUNET_YES;
return;
}
check_zone_namestore_next ();
-
- ma = GNUNET_new (struct DhtPutActivity);
- perform_dht_put (key,
- label,
- rd,
- rd_count,
- expire,
- ma);
- dht_queue_length++;
- GNUNET_CONTAINER_DLL_insert_tail (it_head,
- it_tail,
- ma);
}
@@ -1109,7 +1120,7 @@ publish_zone_dht_start (void *cls)
NULL, /* All zones */
&zone_iteration_error,
NULL,
- &put_gns_record,
+ &handle_record,
NULL,
&zone_iteration_finished,
NULL,
@@ -1129,12 +1140,12 @@ publish_zone_dht_start (void *cls)
* @return DHT PUT handle, NULL on error
*/
static void
-perform_dht_put_monitor (const struct GNUNET_IDENTITY_PrivateKey *key,
- const char *label,
- const struct GNUNET_GNSRECORD_Data *rd,
- unsigned int rd_count,
- struct GNUNET_TIME_Absolute expire,
- struct DhtPutActivity *ma)
+dispatch_job_monitor (const struct GNUNET_IDENTITY_PrivateKey *key,
+ const char *label,
+ const struct GNUNET_GNSRECORD_Data *rd,
+ unsigned int rd_count,
+ struct GNUNET_TIME_Absolute expire,
+ struct DhtPutActivity *ma)
{
struct GNUNET_GNSRECORD_Data rd_public[rd_count];
struct GNUNET_GNSRECORD_Block *block;
@@ -1232,26 +1243,27 @@ handle_monitor_event (void *cls,
1);
return; /* nothing to do */
}
- if (dht_queue_length >= DHT_QUEUE_LIMIT)
+ ma = GNUNET_new (struct DhtPutActivity);
+ dispatch_job_monitor (zone,
+ label,
+ rd,
+ rd_count,
+ expire,
+ ma);
+ GNUNET_CONTAINER_DLL_insert_tail (it_head,
+ it_tail,
+ ma);
+ job_queue_length++;
+ if (job_queue_length >= JOB_QUEUE_LIMIT)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "DHT PUT queue length exceeded (%u), aborting PUT\n",
- DHT_QUEUE_LIMIT);
+ "Job queue length exceeded (%u). Halting monitor.\n",
+ JOB_QUEUE_LIMIT);
+ monitor_halted = GNUNET_YES;
return;
}
- ma = GNUNET_new (struct DhtPutActivity);
- perform_dht_put_monitor (zone,
- label,
- rd,
- rd_count,
- expire,
- ma);
GNUNET_NAMESTORE_zone_monitor_next (zmon,
1);
- GNUNET_CONTAINER_DLL_insert_tail (it_head,
- it_tail,
- ma);
- dht_queue_length++;
}
--
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [gnunet] branch master updated: -improve job queue handling,
gnunet <=