gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[gnunet] branch master updated: -fix properly emptying queue; add parallelization to monitor


From: gnunet
Subject: [gnunet] branch master updated: -fix properly emptying queue; add parallelization to monitor
Date: Thu, 20 Oct 2022 10:31:54 +0200

This is an automated email from the git hooks/post-receive script.

martin-schanzenbach pushed a commit to branch master
in repository gnunet.

The following commit(s) were added to refs/heads/master by this push:
     new 01b5953cb -fix properly emptying queue; add parallelization to monitor
01b5953cb is described below

commit 01b5953cb3d3c7f072115ffa7b72884c3614cbae
Author: Martin Schanzenbach <schanzen@gnunet.org>
AuthorDate: Thu Oct 20 17:31:48 2022 +0900

    -fix properly emptying queue; add parallelization to monitor
---
 src/zonemaster/gnunet-service-zonemaster.c | 185 +++++++++++------------------
 1 file changed, 68 insertions(+), 117 deletions(-)

diff --git a/src/zonemaster/gnunet-service-zonemaster.c b/src/zonemaster/gnunet-service-zonemaster.c
index 42b3abf91..fb55fd718 100644
--- a/src/zonemaster/gnunet-service-zonemaster.c
+++ b/src/zonemaster/gnunet-service-zonemaster.c
@@ -423,7 +423,8 @@ shutdown_task (void *cls)
   }
   while (NULL != (ma = ma_head))
   {
-    GNUNET_DHT_put_cancel (ma->ph);
+    if (NULL != ma->ph)
+      GNUNET_DHT_put_cancel (ma->ph);
     ma_queue_length--;
     GNUNET_CONTAINER_DLL_remove (ma_head,
                                  ma_tail,
@@ -818,36 +819,25 @@ perform_dht_put (const struct GNUNET_IDENTITY_PrivateKey *key,
     GNUNET_free (emsg);
   }
 
-  if (cache_keys)
-  {
-    GNUNET_assert (GNUNET_OK == GNUNET_GNSRECORD_block_create2 (key,
-                                                                expire_pub,
-                                                                label,
-                                                                rd_public,
-                                                                
rd_public_count,
-                                                                &block));
-  }
-  else
-  {
-    GNUNET_assert (GNUNET_OK == GNUNET_GNSRECORD_block_create (key,
-                                                               expire_pub,
-                                                               label,
-                                                               rd_public,
-                                                               rd_public_count,
-                                                               &block));
-  }
+  GNUNET_assert (GNUNET_OK == GNUNET_GNSRECORD_block_create_unsigned (key,
+                                                                      
expire_pub,
+                                                                      label,
+                                                                      
rd_public,
+                                                                      
rd_public_count,
+                                                                      &block));
   if (NULL == block)
   {
     GNUNET_break (0);
     return NULL;   /* whoops */
   }
   if (rd_count != rd_public_count)
-    GNUNET_assert (GNUNET_OK ==  GNUNET_GNSRECORD_block_create (key,
-                                                                expire,
-                                                                label,
-                                                                rd,
-                                                                rd_count,
-                                                                &block_priv));
+    GNUNET_assert (GNUNET_OK ==  GNUNET_GNSRECORD_block_create_unsigned (key,
+                                                                         
expire,
+                                                                         label,
+                                                                         rd,
+                                                                         
rd_count,
+                                                                         &
+                                                                         
block_priv));
   else
     block_priv = block;
   block_size = GNUNET_GNSRECORD_block_get_size (block);
@@ -879,22 +869,31 @@ initiate_put_from_pipe_trigger (void *cls)
 {
   struct GNUNET_HashCode query;
   struct OpenSignJob *job;
+  const struct GNUNET_DISK_FileHandle *np_fh;
+  char buf[100];
+  ssize_t nf_count;
 
   pipe_read_task = NULL;
   GNUNET_assert (0 == pthread_mutex_lock (&results_lock));
   job = results_head;
+  np_fh = GNUNET_DISK_pipe_handle (notification_pipe,
+                                   GNUNET_DISK_PIPE_END_READ);
+  pipe_read_task =
+    GNUNET_SCHEDULER_add_read_file (
+      GNUNET_TIME_UNIT_FOREVER_REL,
+      np_fh,
+      notification_pipe_cb,
+      NULL);
+  /* empty queue */
+  while (GNUNET_SYSERR !=
+         (nf_count = GNUNET_DISK_file_read (np_fh, buf, sizeof (buf))))
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Read %lld notifications from pipe\n",
+                nf_count);
   if (NULL == job)
   {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Hmm... no results. Back to sleep.\n");
     GNUNET_assert (0 == pthread_mutex_unlock (&results_lock));
-    const struct GNUNET_DISK_FileHandle *np_fh = GNUNET_DISK_pipe_handle (
-      notification_pipe,
-      GNUNET_DISK_PIPE_END_READ);
-    pipe_read_task =
-      GNUNET_SCHEDULER_add_read_file (
-        GNUNET_TIME_UNIT_FOREVER_REL,
-        np_fh,
-        notification_pipe_cb,
-        NULL);
     return;
   }
   GNUNET_CONTAINER_DLL_remove (results_head, results_tail, job);
@@ -1119,8 +1118,6 @@ dht_put_monitor_continuation (void *cls)
 {
   struct DhtPutActivity *ma = cls;
 
-  GNUNET_NAMESTORE_zone_monitor_next (zmon,
-                                      1);
   ma_queue_length--;
   GNUNET_CONTAINER_DLL_remove (ma_head,
                                ma_tail,
@@ -1172,73 +1169,39 @@ perform_dht_put_monitor (const struct GNUNET_IDENTITY_PrivateKey *key,
     GNUNET_free (emsg);
   }
 
-  if (cache_keys)
-    GNUNET_assert (GNUNET_OK == GNUNET_GNSRECORD_block_create2 (key,
-                                                                expire_pub,
-                                                                label,
-                                                                rd_public,
-                                                                
rd_public_count,
-                                                                &block));
-  else
-    GNUNET_assert (GNUNET_OK == GNUNET_GNSRECORD_block_create (key,
-                                                               expire_pub,
-                                                               label,
-                                                               rd_public,
-                                                               rd_public_count,
-                                                               &block));
+  GNUNET_assert (GNUNET_OK == GNUNET_GNSRECORD_block_create_unsigned (key,
+                                                                      
expire_pub,
+                                                                      label,
+                                                                      
rd_public,
+                                                                      
rd_public_count,
+                                                                      &block));
   if (NULL == block)
   {
     GNUNET_break (0);
     return NULL;   /* whoops */
   }
   if (rd_count != rd_public_count)
-    GNUNET_assert (GNUNET_OK ==  GNUNET_GNSRECORD_block_create (key,
-                                                                expire,
-                                                                label,
-                                                                rd,
-                                                                rd_count,
-                                                                &block_priv));
+    GNUNET_assert (GNUNET_OK ==  GNUNET_GNSRECORD_block_create_unsigned (key,
+                                                                         
expire,
+                                                                         label,
+                                                                         rd,
+                                                                         
rd_count,
+                                                                         &
+                                                                         
block_priv));
   else
     block_priv = block;
   block_size = GNUNET_GNSRECORD_block_get_size (block);
   GNUNET_assert (0 == pthread_mutex_lock (&jobs_lock));
   struct OpenSignJob *job = GNUNET_new (struct OpenSignJob);
-  job->block = GNUNET_malloc (block_size); // FIXME this does not need to be 
copied, can be freed by worker
-  memcpy (job->block, block, block_size);
+  job->block = block;
+  job->block_size = block_size;
+  job->block_priv = block_priv;
   job->zone = *key;
+  job->ma = ma;
   job->label = GNUNET_strdup (label);
+  job->expire_pub = expire_pub;
   GNUNET_CONTAINER_DLL_insert (jobs_head, jobs_tail, job);
   GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock));
-  GNUNET_GNSRECORD_query_from_private_key (key,
-                                           label,
-                                           &query);
-  GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock));
-  GNUNET_STATISTICS_update (statistics,
-                            "DHT put operations initiated",
-                            1,
-                            GNUNET_NO);
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Storing %u public of %u record(s) for label `%s' in DHT with 
expiration `%s' under key %s\n",
-              rd_public_count,
-              rd_count,
-              label,
-              GNUNET_STRINGS_absolute_time_to_string (expire),
-              GNUNET_h2s (&query));
-  ret = GNUNET_DHT_put (dht_handle,
-                        &query,
-                        DHT_GNS_REPLICATION_LEVEL,
-                        GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE,
-                        GNUNET_BLOCK_TYPE_GNS_NAMERECORD,
-                        block_size,
-                        block,
-                        expire_pub,
-                        &dht_put_monitor_continuation,
-                        ma);
-  refresh_block (block_priv);
-  if (block != block_priv)
-    GNUNET_free (block_priv);
-  GNUNET_free (block);
-  return ret;
 }
 
 /**
@@ -1277,41 +1240,26 @@ handle_monitor_event (void *cls,
                                         1);
     return;   /* nothing to do */
   }
-  ma = GNUNET_new (struct DhtPutActivity);
-  ma->start_date = GNUNET_TIME_absolute_get ();
-  ma->ph = perform_dht_put_monitor (zone,
-                                    label,
-                                    rd,
-                                    rd_count,
-                                    expire,
-                                    ma);
-  if (NULL == ma->ph)
+  if (dht_queue_length >= DHT_QUEUE_LIMIT)
   {
-    /* PUT failed, do not remember operation */
-    GNUNET_free (ma);
-    GNUNET_NAMESTORE_zone_monitor_next (zmon,
-                                        1);
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                "DHT PUT queue length exceeded (%u), aborting PUT\n",
+                DHT_QUEUE_LIMIT);
     return;
   }
+  ma = GNUNET_new (struct DhtPutActivity);
+  perform_dht_put_monitor (zone,
+                           label,
+                           rd,
+                           rd_count,
+                           expire,
+                           ma);
+  GNUNET_NAMESTORE_zone_monitor_next (zmon,
+                                      1);
   GNUNET_CONTAINER_DLL_insert_tail (ma_head,
                                     ma_tail,
                                     ma);
   ma_queue_length++;
-  if (ma_queue_length > DHT_QUEUE_LIMIT)
-  {
-    ma = ma_head;
-    GNUNET_CONTAINER_DLL_remove (ma_head,
-                                 ma_tail,
-                                 ma);
-    GNUNET_DHT_put_cancel (ma->ph);
-    ma_queue_length--;
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                "DHT PUT unconfirmed after %s, aborting PUT\n",
-                GNUNET_STRINGS_relative_time_to_string (
-                  GNUNET_TIME_absolute_get_duration (ma->start_date),
-                  GNUNET_YES));
-    GNUNET_free (ma);
-  }
 }
 
 
@@ -1351,12 +1299,15 @@ sign_worker (void *)
     GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock));
     if (NULL != job)
     {
+      GNUNET_GNSRECORD_block_sign (&job->zone, job->label, job->block);
+      if (job->block != job->block_priv)
+        GNUNET_GNSRECORD_block_sign (&job->zone, job->label, job->block_priv);
       GNUNET_assert (0 == pthread_mutex_lock (&results_lock));
       GNUNET_CONTAINER_DLL_insert (results_head, results_tail, job);
       GNUNET_assert (0 == pthread_mutex_unlock (&results_lock));
       job = NULL;
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Done, notifying main thread throug pipe!\n");
+                  "Done, notifying main thread through pipe!\n");
       GNUNET_DISK_file_write (fh, "!", 1);
     }
     else {

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]