[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[gnunet] branch master updated: -cleanup
From: |
gnunet |
Subject: |
[gnunet] branch master updated: -cleanup |
Date: |
Wed, 26 Oct 2022 06:22:52 +0200 |
This is an automated email from the git hooks/post-receive script.
martin-schanzenbach pushed a commit to branch master
in repository gnunet.
The following commit(s) were added to refs/heads/master by this push:
new 27929f3f8 -cleanup
27929f3f8 is described below
commit 27929f3f87f6c94e74d88d3ad49602bb546a0e4c
Author: Martin Schanzenbach <schanzen@gnunet.org>
AuthorDate: Wed Oct 26 13:22:45 2022 +0900
-cleanup
---
src/zonemaster/gnunet-service-zonemaster.c | 281 ++++++++++++++---------------
1 file changed, 132 insertions(+), 149 deletions(-)
diff --git a/src/zonemaster/gnunet-service-zonemaster.c
b/src/zonemaster/gnunet-service-zonemaster.c
index 863716a44..8e5d157fd 100644
--- a/src/zonemaster/gnunet-service-zonemaster.c
+++ b/src/zonemaster/gnunet-service-zonemaster.c
@@ -97,19 +97,19 @@
static pthread_t * worker;
/**
- * Lock for the open jobs queue.
+ * Lock for the sign jobs queue.
*/
-static pthread_mutex_t jobs_lock;
+static pthread_mutex_t sign_jobs_lock;
/**
- * Lock for the finished results queue.
+ * Lock for the DHT put jobs queue.
*/
-static pthread_mutex_t results_lock;
+static pthread_mutex_t sign_results_lock;
/**
- * Wait condition on new jobs
+ * Wait condition on new sign jobs
*/
-static pthread_cond_t empty_jobs;
+static pthread_cond_t sign_jobs_cond;
/**
* For threads to know we are shutting down
@@ -136,77 +136,94 @@ static struct GNUNET_DISK_PipeHandle *notification_pipe;
*/
static struct GNUNET_SCHEDULER_Task *pipe_read_task;
-struct OpenSignJob
+struct RecordPublicationJob
{
- struct OpenSignJob *next;
+ /**
+ * DLL
+ */
+ struct RecordPublicationJob *next;
- struct OpenSignJob *prev;
+ /**
+ * DLL
+ */
+ struct RecordPublicationJob *prev;
+ /**
+ * The zone key to sign the block with
+ */
struct GNUNET_IDENTITY_PrivateKey zone;
+ /**
+ * The block to sign
+ */
struct GNUNET_GNSRECORD_Block *block;
+ /**
+ * The private block to sign, may point to block in case
+ * the public and private blocks are the same.
+ */
struct GNUNET_GNSRECORD_Block *block_priv;
- struct DhtPutActivity *ma;
-
+ /**
+ * The size of the public block for the DHT put.
+ */
size_t block_size;
+ /**
+ * The expiration time of the public block for the DHT put.
+ */
struct GNUNET_TIME_Absolute expire_pub;
+ /**
+ * The label of the block needed for signing
+ */
char *label;
+ /**
+ * Handle for the DHT PUT operation.
+ */
+ struct GNUNET_DHT_PutHandle *ph;
+
+ /**
+ * When was this PUT initiated?
+ */
+ struct GNUNET_TIME_Absolute start_date;
};
/**
- * DLL
+ * The DLL for workers to retrieve open jobs that require
+ * signing of blocks.
*/
-static struct OpenSignJob *jobs_head;
+static struct RecordPublicationJob *sign_jobs_head;
/**
- * DLL
+ * See above
*/
-static struct OpenSignJob *jobs_tail;
+static struct RecordPublicationJob *sign_jobs_tail;
/**
- * DLL
+ * The DLL for workers to place jobs that are signed.
*/
-static struct OpenSignJob *results_head;
+static struct RecordPublicationJob *sign_results_head;
/**
- * DLL
+ * See above
*/
-static struct OpenSignJob *results_tail;
+static struct RecordPublicationJob *sign_results_tail;
/**
- * Handle for DHT PUT activity triggered from the namestore monitor.
+ * The DLL for jobs currently in the process of being dispatched into the DHT.
*/
-struct DhtPutActivity
-{
- /**
- * Kept in a DLL.
- */
- struct DhtPutActivity *next;
+static struct RecordPublicationJob *dht_jobs_head;
- /**
- * Kept in a DLL.
- */
- struct DhtPutActivity *prev;
-
- /**
- * Handle for the DHT PUT operation.
- */
- struct GNUNET_DHT_PutHandle *ph;
-
- /**
- * When was this PUT initiated?
- */
- struct GNUNET_TIME_Absolute start_date;
+/**
+ * See above
+ */
+static struct RecordPublicationJob *dht_jobs_tail;
-};
/**
* Pending operation on the namecache.
@@ -267,16 +284,6 @@ static int disable_namecache;
*/
static struct GNUNET_NAMESTORE_ZoneIterator *namestore_iter;
-/**
- * Head of iteration put activities; kept in a DLL.
- */
-static struct DhtPutActivity *it_head;
-
-/**
- * Tail of iteration put activities; kept in a DLL.
- */
-static struct DhtPutActivity *it_tail;
-
/**
* Number of entries in the job queue #jobs_head.
*/
@@ -378,7 +385,7 @@ static struct CacheOperation *cop_tail;
static void
-free_job (struct OpenSignJob *job)
+free_job (struct RecordPublicationJob *job)
{
if (job->block != job->block_priv)
GNUNET_free (job->block_priv);
@@ -397,9 +404,8 @@ free_job (struct OpenSignJob *job)
static void
shutdown_task (void *cls)
{
- struct DhtPutActivity *ma;
struct CacheOperation *cop;
- struct OpenSignJob *job;
+ struct RecordPublicationJob *job;
(void) cls;
in_shutdown = GNUNET_YES;
@@ -417,36 +423,35 @@ shutdown_task (void *cls)
GNUNET_CONTAINER_DLL_remove (cop_head, cop_tail, cop);
GNUNET_free (cop);
}
- GNUNET_assert (0 == pthread_mutex_lock (&jobs_lock));
- while (NULL != (job = jobs_head))
+ GNUNET_assert (0 == pthread_mutex_lock (&sign_jobs_lock));
+ while (NULL != (job = sign_jobs_head))
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Removing incomplete jobs\n");
- GNUNET_CONTAINER_DLL_remove (jobs_head, jobs_tail, job);
+ GNUNET_CONTAINER_DLL_remove (sign_jobs_head, sign_jobs_tail, job);
job_queue_length--;
free_job (job);
}
- GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock));
- GNUNET_assert (0 == pthread_mutex_lock (&results_lock));
- while (NULL != (job = results_head))
+ GNUNET_assert (0 == pthread_mutex_unlock (&sign_jobs_lock));
+ GNUNET_assert (0 == pthread_mutex_lock (&sign_results_lock));
+ while (NULL != (job = sign_results_head))
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Removing incomplete jobs\n");
- GNUNET_CONTAINER_DLL_remove (results_head, results_tail, job);
+ GNUNET_CONTAINER_DLL_remove (sign_results_head, sign_results_tail, job);
free_job (job);
}
- GNUNET_assert (0 == pthread_mutex_unlock (&results_lock));
-
- while (NULL != (ma = it_head))
+ GNUNET_assert (0 == pthread_mutex_unlock (&sign_results_lock));
+ while (NULL != (job = dht_jobs_head))
{
- if (NULL != ma->ph)
- GNUNET_DHT_put_cancel (ma->ph);
- GNUNET_CONTAINER_DLL_remove (it_head,
- it_tail,
- ma);
- GNUNET_free (ma);
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Removing incomplete jobs\n");
+ GNUNET_CONTAINER_DLL_remove (dht_jobs_head, dht_jobs_tail, job);
+ if (NULL != job->ph)
+ GNUNET_DHT_put_cancel (job->ph);
+ free_job (job);
}
- if (NULL != statistics)
+if (NULL != statistics)
{
GNUNET_STATISTICS_destroy (statistics,
GNUNET_NO);
@@ -763,12 +768,11 @@ check_zone_namestore_next ()
/**
* Continuation called from DHT once the PUT operation is done.
*
- * @param cls a `struct DhtPutActivity`
*/
static void
dht_put_continuation (void *cls)
{
- struct DhtPutActivity *ma = cls;
+ struct RecordPublicationJob *job = cls;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"PUT complete\n");
@@ -788,10 +792,10 @@ dht_put_continuation (void *cls)
}
}
job_queue_length--;
- GNUNET_CONTAINER_DLL_remove (it_head,
- it_tail,
- ma);
- GNUNET_free (ma);
+ GNUNET_CONTAINER_DLL_remove (dht_jobs_head,
+ dht_jobs_tail,
+ job);
+ free_job (job);
}
@@ -810,8 +814,7 @@ dispatch_job (const struct GNUNET_IDENTITY_PrivateKey *key,
const char *label,
const struct GNUNET_GNSRECORD_Data *rd,
unsigned int rd_count,
- const struct GNUNET_TIME_Absolute expire,
- struct DhtPutActivity *ma)
+ const struct GNUNET_TIME_Absolute expire)
{
struct GNUNET_GNSRECORD_Data rd_public[rd_count];
struct GNUNET_GNSRECORD_Block *block;
@@ -858,18 +861,17 @@ dispatch_job (const struct GNUNET_IDENTITY_PrivateKey
*key,
else
block_priv = block;
block_size = GNUNET_GNSRECORD_block_get_size (block);
- GNUNET_assert (0 == pthread_mutex_lock (&jobs_lock));
- struct OpenSignJob *job = GNUNET_new (struct OpenSignJob);
+ GNUNET_assert (0 == pthread_mutex_lock (&sign_jobs_lock));
+ struct RecordPublicationJob *job = GNUNET_new (struct RecordPublicationJob);
job->block = block;
job->block_size = block_size;
job->block_priv = block_priv;
job->zone = *key;
- job->ma = ma;
job->label = GNUNET_strdup (label);
job->expire_pub = expire_pub;
- GNUNET_CONTAINER_DLL_insert (jobs_head, jobs_tail, job);
- GNUNET_assert (0 == pthread_cond_signal (&empty_jobs));
- GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock));
+ GNUNET_CONTAINER_DLL_insert (sign_jobs_head, sign_jobs_tail, job);
+ GNUNET_assert (0 == pthread_cond_signal (&sign_jobs_cond));
+ GNUNET_assert (0 == pthread_mutex_unlock (&sign_jobs_lock));
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Creating job with %u record(s) for label `%s', expiration
`%s'\n",
rd_public_count,
@@ -887,7 +889,7 @@ static void
initiate_put_from_pipe_trigger (void *cls)
{
struct GNUNET_HashCode query;
- struct OpenSignJob *job;
+ struct RecordPublicationJob *job;
const struct GNUNET_DISK_FileHandle *np_fh;
char buf[100];
ssize_t nf_count;
@@ -907,20 +909,37 @@ initiate_put_from_pipe_trigger (void *cls)
(long long) nf_count);
while (true)
{
- GNUNET_assert (0 == pthread_mutex_lock (&results_lock));
- if (NULL == results_head)
+ GNUNET_assert (0 == pthread_mutex_lock (&sign_results_lock));
+ if (NULL == sign_results_head)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"No more results. Back to sleep.\n");
- GNUNET_assert (0 == pthread_mutex_unlock (&results_lock));
+ GNUNET_assert (0 == pthread_mutex_unlock (&sign_results_lock));
return;
}
- job = results_head;
- GNUNET_CONTAINER_DLL_remove (results_head, results_tail, job);
- GNUNET_assert (0 == pthread_mutex_unlock (&results_lock));
+ job = sign_results_head;
+ GNUNET_CONTAINER_DLL_remove (sign_results_head, sign_results_tail, job);
+ GNUNET_assert (0 == pthread_mutex_unlock (&sign_results_lock));
GNUNET_GNSRECORD_query_from_private_key (&job->zone,
job->label,
&query);
+ job->ph = GNUNET_DHT_put (dht_handle,
+ &query,
+ DHT_GNS_REPLICATION_LEVEL,
+ GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE,
+ GNUNET_BLOCK_TYPE_GNS_NAMERECORD,
+ job->block_size,
+ job->block,
+ job->expire_pub,
+ &dht_put_continuation,
+ job);
+ if (NULL == job->ph)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Could not perform DHT PUT, is the DHT running?\n");
+ free_job (job);
+ return;
+ }
GNUNET_STATISTICS_update (statistics,
"DHT put operations initiated",
1,
@@ -929,26 +948,8 @@ initiate_put_from_pipe_trigger (void *cls)
"Storing record(s) for label `%s' in DHT under key %s\n",
job->label,
GNUNET_h2s (&query));
- job->ma->ph = GNUNET_DHT_put (dht_handle,
- &query,
- DHT_GNS_REPLICATION_LEVEL,
- GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE,
- GNUNET_BLOCK_TYPE_GNS_NAMERECORD,
- job->block_size,
- job->block,
- job->expire_pub,
- &dht_put_continuation,
- job->ma);
- if (NULL == job->ma->ph)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Could not perform DHT PUT, is the DHT running?\n");
- GNUNET_free (job->ma);
- free_job (job);
- return;
- }
refresh_block (job->block_priv);
- free_job (job);
+ GNUNET_CONTAINER_DLL_insert (dht_jobs_head, dht_jobs_tail, job);
}
}
@@ -1042,8 +1043,6 @@ handle_record (void *cls,
const struct GNUNET_GNSRECORD_Data *rd,
struct GNUNET_TIME_Absolute expire)
{
- struct DhtPutActivity *ma;
-
(void) cls;
ns_iteration_left--;
if (0 == rd_count)
@@ -1072,16 +1071,11 @@ handle_record (void *cls,
put_cnt++;
if (0 == put_cnt % DELTA_INTERVAL)
update_velocity (DELTA_INTERVAL);
- ma = GNUNET_new (struct DhtPutActivity);
dispatch_job (key,
label,
rd,
rd_count,
- expire,
- ma);
- GNUNET_CONTAINER_DLL_insert_tail (it_head,
- it_tail,
- ma);
+ expire);
job_queue_length++;
if (job_queue_length >= JOB_QUEUE_LIMIT)
{
@@ -1136,16 +1130,13 @@ publish_zone_dht_start (void *cls)
* @param label label to store under
* @param rd_public public record data
* @param rd_public_count number of records in @a rd_public
- * @param ma handle for the PUT operation
- * @return DHT PUT handle, NULL on error
*/
static void
dispatch_job_monitor (const struct GNUNET_IDENTITY_PrivateKey *key,
const char *label,
const struct GNUNET_GNSRECORD_Data *rd,
unsigned int rd_count,
- struct GNUNET_TIME_Absolute expire,
- struct DhtPutActivity *ma)
+ struct GNUNET_TIME_Absolute expire)
{
struct GNUNET_GNSRECORD_Data rd_public[rd_count];
struct GNUNET_GNSRECORD_Block *block;
@@ -1192,18 +1183,17 @@ dispatch_job_monitor (const struct
GNUNET_IDENTITY_PrivateKey *key,
else
block_priv = block;
block_size = GNUNET_GNSRECORD_block_get_size (block);
- GNUNET_assert (0 == pthread_mutex_lock (&jobs_lock));
- struct OpenSignJob *job = GNUNET_new (struct OpenSignJob);
+ GNUNET_assert (0 == pthread_mutex_lock (&sign_jobs_lock));
+ struct RecordPublicationJob *job = GNUNET_new (struct RecordPublicationJob);
job->block = block;
job->block_size = block_size;
job->block_priv = block_priv;
job->zone = *key;
- job->ma = ma;
job->label = GNUNET_strdup (label);
job->expire_pub = expire_pub;
- GNUNET_CONTAINER_DLL_insert (jobs_head, jobs_tail, job);
- GNUNET_assert (0 == pthread_cond_signal (&empty_jobs));
- GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock));
+ GNUNET_CONTAINER_DLL_insert (sign_jobs_head, sign_jobs_tail, job);
+ GNUNET_assert (0 == pthread_cond_signal (&sign_jobs_cond));
+ GNUNET_assert (0 == pthread_mutex_unlock (&sign_jobs_lock));
}
@@ -1226,8 +1216,6 @@ handle_monitor_event (void *cls,
const struct GNUNET_GNSRECORD_Data *rd,
struct GNUNET_TIME_Absolute expire)
{
- struct DhtPutActivity *ma;
-
(void) cls;
GNUNET_STATISTICS_update (statistics,
"Namestore monitor events received",
@@ -1243,16 +1231,11 @@ handle_monitor_event (void *cls,
1);
return; /* nothing to do */
}
- ma = GNUNET_new (struct DhtPutActivity);
dispatch_job_monitor (zone,
label,
rd,
rd_count,
- expire,
- ma);
- GNUNET_CONTAINER_DLL_insert_tail (it_head,
- it_tail,
- ma);
+ expire);
job_queue_length++;
if (job_queue_length >= JOB_QUEUE_LIMIT)
{
@@ -1287,31 +1270,31 @@ handle_monitor_error (void *cls)
static void*
sign_worker (void *cls)
{
- struct OpenSignJob *job;
+ struct RecordPublicationJob *job;
const struct GNUNET_DISK_FileHandle *fh;
fh = GNUNET_DISK_pipe_handle (notification_pipe, GNUNET_DISK_PIPE_END_WRITE);
while (GNUNET_YES != in_shutdown)
{
- GNUNET_assert (0 == pthread_mutex_lock (&jobs_lock));
- while (NULL == jobs_head)
- GNUNET_assert (0 == pthread_cond_wait (&empty_jobs, &jobs_lock));
+ GNUNET_assert (0 == pthread_mutex_lock (&sign_jobs_lock));
+ while (NULL == sign_jobs_head)
+ GNUNET_assert (0 == pthread_cond_wait (&sign_jobs_cond,
&sign_jobs_lock));
if (GNUNET_YES == in_shutdown)
{
- GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock));
+ GNUNET_assert (0 == pthread_mutex_unlock (&sign_jobs_lock));
return NULL;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Taking on Job for %s\n", jobs_head->label);
- job = jobs_head;
- GNUNET_CONTAINER_DLL_remove (jobs_head, jobs_tail, job);
- GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock));
+ "Taking on Job for %s\n", sign_jobs_head->label);
+ job = sign_jobs_head;
+ GNUNET_CONTAINER_DLL_remove (sign_jobs_head, sign_jobs_tail, job);
+ GNUNET_assert (0 == pthread_mutex_unlock (&sign_jobs_lock));
GNUNET_GNSRECORD_block_sign (&job->zone, job->label, job->block);
if (job->block != job->block_priv)
GNUNET_GNSRECORD_block_sign (&job->zone, job->label, job->block_priv);
- GNUNET_assert (0 == pthread_mutex_lock (&results_lock));
- GNUNET_CONTAINER_DLL_insert (results_head, results_tail, job);
- GNUNET_assert (0 == pthread_mutex_unlock (&results_lock));
+ GNUNET_assert (0 == pthread_mutex_lock (&sign_results_lock));
+ GNUNET_CONTAINER_DLL_insert (sign_results_head, sign_results_tail, job);
+ GNUNET_assert (0 == pthread_mutex_unlock (&sign_results_lock));
job = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Done, notifying main thread through pipe!\n");
--
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [gnunet] branch master updated: -cleanup,
gnunet <=