gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[gnunet] branch master updated: ZONEMASTER: Use parallel worker thread for GNS block signing


From: gnunet
Subject: [gnunet] branch master updated: ZONEMASTER: Use parallel worker thread for GNS block signing
Date: Thu, 20 Oct 2022 10:01:55 +0200

This is an automated email from the git hooks/post-receive script.

martin-schanzenbach pushed a commit to branch master
in repository gnunet.

The following commit(s) were added to refs/heads/master by this push:
     new 64aefd7b6 ZONEMASTER: Use parallel worker thread for GNS block signing
64aefd7b6 is described below

commit 64aefd7b6fb27b8625af12783201f3c87da41f47
Author: Martin Schanzenbach <schanzen@gnunet.org>
AuthorDate: Thu Oct 20 17:01:48 2022 +0900

    ZONEMASTER: Use parallel worker thread for GNS block signing
---
 src/gnsrecord/gnsrecord_crypto.c           | 233 +++++++++++++++------
 src/include/gnunet_gnsrecord_lib.h         |  40 +++-
 src/zonemaster/gnunet-service-zonemaster.c | 311 ++++++++++++++++++++++++-----
 src/zonemaster/zonemaster.conf.in          |   1 +
 4 files changed, 472 insertions(+), 113 deletions(-)

diff --git a/src/gnsrecord/gnsrecord_crypto.c b/src/gnsrecord/gnsrecord_crypto.c
index 6c1bc6045..d794c9cb4 100644
--- a/src/gnsrecord/gnsrecord_crypto.c
+++ b/src/gnsrecord/gnsrecord_crypto.c
@@ -95,7 +95,8 @@ eddsa_symmetric_decrypt (
   if (ctlen < 0)
     return GNUNET_SYSERR;
   if (0 != crypto_secretbox_open_detached (result,
-                                           ((unsigned char*) block) + crypto_secretbox_MACBYTES, // Ciphertext
+                                           ((unsigned char*) block)
+                                           + crypto_secretbox_MACBYTES,                          // Ciphertext
                                            block, // Tag
                                            ctlen,
                                            nonce, key))
@@ -193,6 +194,116 @@ block_get_size_ecdsa (const struct GNUNET_GNSRECORD_Data *rd,
   return len;
 }
 
+enum GNUNET_GenericReturnValue
+block_sign_ecdsa (const struct
+                  GNUNET_CRYPTO_EcdsaPrivateKey *key,
+                  const struct
+                  GNUNET_CRYPTO_EcdsaPublicKey *pkey,
+                  const char *label,
+                  struct GNUNET_GNSRECORD_Block *block)
+{
+  struct GNRBlockPS *gnr_block;
+  struct GNUNET_GNSRECORD_EcdsaBlock *ecblock;
+  size_t size = ntohl (block->size) - sizeof (*block) + sizeof (*gnr_block);
+
+  gnr_block = GNUNET_malloc (size);
+  ecblock = &(block)->ecdsa_block;
+  gnr_block->purpose.size = htonl (size);
+  gnr_block->purpose.purpose =
+    htonl (GNUNET_SIGNATURE_PURPOSE_GNS_RECORD_SIGN);
+  gnr_block->expiration_time = ecblock->expiration_time;
+  /* encrypt and sign */
+  GNUNET_memcpy (&gnr_block[1], &ecblock[1],
+                 size - sizeof (*gnr_block));
+  GNUNET_CRYPTO_ecdsa_public_key_derive (pkey,
+                                         label,
+                                         "gns",
+                                         &ecblock->derived_key);
+  if (GNUNET_OK !=
+      GNUNET_CRYPTO_ecdsa_sign_derived (key,
+                                        label,
+                                        "gns",
+                                        &gnr_block->purpose,
+                                        &ecblock->signature))
+  {
+    GNUNET_break (0);
+    GNUNET_free (gnr_block);
+    return GNUNET_SYSERR;
+  }
+  GNUNET_free (gnr_block);
+  return GNUNET_OK;
+}
+
+
+enum GNUNET_GenericReturnValue
+block_sign_eddsa (const struct
+                  GNUNET_CRYPTO_EddsaPrivateKey *key,
+                  const struct
+                  GNUNET_CRYPTO_EddsaPublicKey *pkey,
+                  const char *label,
+                  struct GNUNET_GNSRECORD_Block *block)
+{
+  struct GNRBlockPS *gnr_block;
+  struct GNUNET_GNSRECORD_EddsaBlock *edblock;
+  size_t size = ntohl (block->size) - sizeof (*block) + sizeof (*gnr_block);
+  gnr_block = GNUNET_malloc (size);
+  edblock = &(block)->eddsa_block;
+  gnr_block->purpose.size = htonl (size);
+  gnr_block->purpose.purpose =
+    htonl (GNUNET_SIGNATURE_PURPOSE_GNS_RECORD_SIGN);
+  gnr_block->expiration_time = edblock->expiration_time;
+  GNUNET_memcpy (&gnr_block[1], &edblock[1],
+                 size - sizeof (*gnr_block));
+  /* encrypt and sign */
+  GNUNET_CRYPTO_eddsa_public_key_derive (pkey,
+                                         label,
+                                         "gns",
+                                         &edblock->derived_key);
+  GNUNET_CRYPTO_eddsa_sign_derived (key,
+                                    label,
+                                    "gns",
+                                    &gnr_block->purpose,
+                                    &edblock->signature);
+  GNUNET_free (gnr_block);
+  return GNUNET_OK;
+}
+
+
+enum GNUNET_GenericReturnValue
+GNUNET_GNSRECORD_block_sign (const struct
+                             GNUNET_IDENTITY_PrivateKey *key,
+                             const char *label,
+                             struct GNUNET_GNSRECORD_Block *block)
+{
+  struct GNUNET_IDENTITY_PublicKey pkey;
+  enum GNUNET_GenericReturnValue res = GNUNET_SYSERR;
+  char *norm_label;
+
+  GNUNET_IDENTITY_key_get_public (key,
+                                  &pkey);
+  norm_label = GNUNET_GNSRECORD_string_normalize (label);
+
+  switch (ntohl (key->type))
+  {
+  case GNUNET_GNSRECORD_TYPE_PKEY:
+    res = block_sign_ecdsa (&key->ecdsa_key,
+                            &pkey.ecdsa_key,
+                            norm_label,
+                            block);
+    break;
+  case GNUNET_GNSRECORD_TYPE_EDKEY:
+    res = block_sign_eddsa (&key->eddsa_key,
+                            &pkey.eddsa_key,
+                            norm_label,
+                            block);
+    break;
+  default:
+    GNUNET_assert (0);
+  }
+  GNUNET_free (norm_label);
+  return res;
+}
+
 
 /**
  * Sign name and records
@@ -204,6 +315,7 @@ block_get_size_ecdsa (const struct GNUNET_GNSRECORD_Data *rd,
  * @param rd record data
  * @param rd_count number of records
  * @param block the block result. Must be allocated sufficiently.
+ * @param sign GNUNET_YES to sign the block, GNUNET_NO if the block will be signed later.
  * @return GNUNET_SYSERR on error (otherwise GNUNET_OK)
  */
 static enum GNUNET_GenericReturnValue
@@ -213,12 +325,12 @@ block_create_ecdsa (const struct GNUNET_CRYPTO_EcdsaPrivateKey *key,
                     const char *label,
                     const struct GNUNET_GNSRECORD_Data *rd,
                     unsigned int rd_count,
-                    struct GNUNET_GNSRECORD_Block **block)
+                    struct GNUNET_GNSRECORD_Block **block,
+                    int sign)
 {
   ssize_t payload_len = GNUNET_GNSRECORD_records_get_size (rd_count,
                                                            rd);
   struct GNUNET_GNSRECORD_EcdsaBlock *ecblock;
-  struct GNRBlockPS *gnr_block;
   unsigned char ctr[GNUNET_CRYPTO_AES_KEY_LENGTH / 2];
   unsigned char skey[GNUNET_CRYPTO_AES_KEY_LENGTH];
   struct GNUNET_GNSRECORD_Data rdc[GNUNET_NZL (rd_count)];
@@ -251,7 +363,7 @@ block_create_ecdsa (const struct GNUNET_CRYPTO_EcdsaPrivateKey *key,
   }
   /* serialize */
   *block = GNUNET_malloc (sizeof (struct GNUNET_GNSRECORD_Block) + payload_len);
-  (*block)->size = htonl(sizeof (struct GNUNET_GNSRECORD_Block) + payload_len);
+  (*block)->size = htonl (sizeof (struct GNUNET_GNSRECORD_Block) + payload_len);
   {
     char payload[payload_len];
 
@@ -260,19 +372,9 @@ block_create_ecdsa (const struct GNUNET_CRYPTO_EcdsaPrivateKey *key,
                                                        rdc,
                                                        payload_len,
                                                        payload));
-    gnr_block = GNUNET_malloc (sizeof (struct GNRBlockPS) + payload_len);
     ecblock = &(*block)->ecdsa_block;
     (*block)->type = htonl (GNUNET_GNSRECORD_TYPE_PKEY);
-    gnr_block->purpose.size = htonl (sizeof(struct GNRBlockPS) + payload_len);
-    gnr_block->purpose.purpose =
-      htonl (GNUNET_SIGNATURE_PURPOSE_GNS_RECORD_SIGN);
-    gnr_block->expiration_time = GNUNET_TIME_absolute_hton (expire);
-    ecblock->expiration_time = gnr_block->expiration_time;
-    /* encrypt and sign */
-    GNUNET_CRYPTO_ecdsa_public_key_derive (pkey,
-                                           label,
-                                           "gns",
-                                           &ecblock->derived_key);
+    ecblock->expiration_time = GNUNET_TIME_absolute_hton (expire);
     GNR_derive_block_aes_key (ctr,
                               skey,
                               label,
@@ -284,21 +386,16 @@ block_create_ecdsa (const struct GNUNET_CRYPTO_EcdsaPrivateKey *key,
                                             skey,
                                             ctr,
                                             &ecblock[1]));
-    GNUNET_memcpy (&gnr_block[1], &ecblock[1], payload_len);
   }
+  if (GNUNET_YES != sign)
+    return GNUNET_OK;
   if (GNUNET_OK !=
-      GNUNET_CRYPTO_ecdsa_sign_derived (key,
-                                        label,
-                                        "gns",
-                                        &gnr_block->purpose,
-                                        &ecblock->signature))
+      block_sign_ecdsa (key, pkey, label, *block))
   {
     GNUNET_break (0);
     GNUNET_free (*block);
-    GNUNET_free (gnr_block);
     return GNUNET_SYSERR;
   }
-  GNUNET_free (gnr_block);
   return GNUNET_OK;
 }
 
@@ -327,6 +424,7 @@ block_get_size_eddsa (const struct GNUNET_GNSRECORD_Data *rd,
  * @param rd record data
  * @param rd_count number of records
  * @param block where to store the block. Must be allocated sufficiently.
+ * @param sign GNUNET_YES if block shall be signed as well
  * @return GNUNET_SYSERR on error (otherwise GNUNET_OK)
  */
 enum GNUNET_GenericReturnValue
@@ -336,12 +434,12 @@ block_create_eddsa (const struct GNUNET_CRYPTO_EddsaPrivateKey *key,
                     const char *label,
                     const struct GNUNET_GNSRECORD_Data *rd,
                     unsigned int rd_count,
-                    struct GNUNET_GNSRECORD_Block **block)
+                    struct GNUNET_GNSRECORD_Block **block,
+                    int sign)
 {
   ssize_t payload_len = GNUNET_GNSRECORD_records_get_size (rd_count,
                                                            rd);
   struct GNUNET_GNSRECORD_EddsaBlock *edblock;
-  struct GNRBlockPS *gnr_block;
   unsigned char nonce[crypto_secretbox_NONCEBYTES];
   unsigned char skey[crypto_secretbox_KEYBYTES];
   struct GNUNET_GNSRECORD_Data rdc[GNUNET_NZL (rd_count)];
@@ -375,8 +473,8 @@ block_create_eddsa (const struct GNUNET_CRYPTO_EddsaPrivateKey *key,
   /* serialize */
   *block = GNUNET_malloc (sizeof (struct GNUNET_GNSRECORD_Block)
                           + payload_len + crypto_secretbox_MACBYTES);
-  (*block)->size = htonl(sizeof (struct GNUNET_GNSRECORD_Block)
-                 + payload_len + crypto_secretbox_MACBYTES);
+  (*block)->size = htonl (sizeof (struct GNUNET_GNSRECORD_Block)
+                          + payload_len + crypto_secretbox_MACBYTES);
   {
     char payload[payload_len];
 
@@ -385,24 +483,9 @@ block_create_eddsa (const struct GNUNET_CRYPTO_EddsaPrivateKey *key,
                                                        rdc,
                                                        payload_len,
                                                        payload));
-    gnr_block = GNUNET_malloc (sizeof (struct GNRBlockPS)
-                               + payload_len
-                               + crypto_secretbox_MACBYTES);
     edblock = &(*block)->eddsa_block;
     (*block)->type = htonl (GNUNET_GNSRECORD_TYPE_EDKEY);
-    gnr_block->purpose.size =
-      htonl (sizeof(struct GNRBlockPS)
-             + payload_len
-             + crypto_secretbox_MACBYTES);
-    gnr_block->purpose.purpose =
-      htonl (GNUNET_SIGNATURE_PURPOSE_GNS_RECORD_SIGN);
-    gnr_block->expiration_time = GNUNET_TIME_absolute_hton (expire);
-    edblock->expiration_time = gnr_block->expiration_time;
-    /* encrypt and sign */
-    GNUNET_CRYPTO_eddsa_public_key_derive (pkey,
-                                           label,
-                                           "gns",
-                                           &edblock->derived_key);
+    edblock->expiration_time = GNUNET_TIME_absolute_hton (expire);
     GNR_derive_block_xsalsa_key (nonce,
                                  skey,
                                  label,
@@ -414,14 +497,9 @@ block_create_eddsa (const struct GNUNET_CRYPTO_EddsaPrivateKey *key,
                                             skey,
                                             nonce,
                                             &edblock[1]));
-    GNUNET_memcpy (&gnr_block[1], &edblock[1],
-                   payload_len + crypto_secretbox_MACBYTES);
-
-    GNUNET_CRYPTO_eddsa_sign_derived (key,
-                                      label,
-                                      "gns",
-                                      &gnr_block->purpose,
-                                      &edblock->signature);
+    if (GNUNET_YES != sign)
+      return GNUNET_OK;
+    block_sign_eddsa (key, pkey, label, *block);
   }
   return GNUNET_OK;
 }
@@ -477,7 +555,8 @@ GNUNET_GNSRECORD_block_create (const struct GNUNET_IDENTITY_PrivateKey *key,
                               norm_label,
                               rd,
                               rd_count,
-                              result);
+                              result,
+                              GNUNET_YES);
     break;
   case GNUNET_GNSRECORD_TYPE_EDKEY:
     res = block_create_eddsa (&key->eddsa_key,
@@ -486,7 +565,8 @@ GNUNET_GNSRECORD_block_create (const struct GNUNET_IDENTITY_PrivateKey *key,
                               norm_label,
                               rd,
                               rd_count,
-                              result);
+                              result,
+                              GNUNET_YES);
     break;
   default:
     GNUNET_assert (0);
@@ -513,13 +593,14 @@ struct KeyCacheLine
 };
 
 
-enum GNUNET_GenericReturnValue
-GNUNET_GNSRECORD_block_create2 (const struct GNUNET_IDENTITY_PrivateKey *pkey,
-                                struct GNUNET_TIME_Absolute expire,
-                                const char *label,
-                                const struct GNUNET_GNSRECORD_Data *rd,
-                                unsigned int rd_count,
-                                struct GNUNET_GNSRECORD_Block **result)
+static enum GNUNET_GenericReturnValue
+block_create2 (const struct GNUNET_IDENTITY_PrivateKey *pkey,
+               struct GNUNET_TIME_Absolute expire,
+               const char *label,
+               const struct GNUNET_GNSRECORD_Data *rd,
+               unsigned int rd_count,
+               struct GNUNET_GNSRECORD_Block **result,
+               int sign)
 {
   const struct GNUNET_CRYPTO_EcdsaPrivateKey *key;
   struct GNUNET_CRYPTO_EddsaPublicKey edpubkey;
@@ -552,7 +633,8 @@ GNUNET_GNSRECORD_block_create2 (const struct GNUNET_IDENTITY_PrivateKey *pkey,
                               norm_label,
                               rd,
                               rd_count,
-                              result);
+                              result,
+                              sign);
   }
   else if (GNUNET_IDENTITY_TYPE_EDDSA == ntohl (pkey->type))
   {
@@ -564,13 +646,40 @@ GNUNET_GNSRECORD_block_create2 (const struct GNUNET_IDENTITY_PrivateKey *pkey,
                               norm_label,
                               rd,
                               rd_count,
-                              result);
+                              result,
+                              sign);
   }
   GNUNET_free (norm_label);
   return res;
 }
 
 
+
+enum GNUNET_GenericReturnValue
+GNUNET_GNSRECORD_block_create_unsigned (const struct
+                                        GNUNET_IDENTITY_PrivateKey *pkey,
+                                        struct GNUNET_TIME_Absolute expire,
+                                        const char *label,
+                                        const struct GNUNET_GNSRECORD_Data *rd,
+                                        unsigned int rd_count,
+                                        struct GNUNET_GNSRECORD_Block **result)
+{
+  return block_create2 (pkey, expire, label, rd, rd_count, result, GNUNET_NO);
+}
+
+
+
+enum GNUNET_GenericReturnValue
+GNUNET_GNSRECORD_block_create2 (const struct GNUNET_IDENTITY_PrivateKey *pkey,
+                                struct GNUNET_TIME_Absolute expire,
+                                const char *label,
+                                const struct GNUNET_GNSRECORD_Data *rd,
+                                unsigned int rd_count,
+                                struct GNUNET_GNSRECORD_Block **result)
+{
+  return block_create2 (pkey, expire, label, rd, rd_count, result, GNUNET_YES);
+}
+
 /**
  * Check if a signature is valid.  This API is used by the GNS Block
  * to validate signatures received from the network.
diff --git a/src/include/gnunet_gnsrecord_lib.h b/src/include/gnunet_gnsrecord_lib.h
index 357f87587..85a42d459 100644
--- a/src/include/gnunet_gnsrecord_lib.h
+++ b/src/include/gnunet_gnsrecord_lib.h
@@ -150,7 +150,7 @@ enum GNUNET_GNSRECORD_Filter
    * Filter public records.
    * FIXME: Not implemented
    */
-  //GNUNET_NAMESTORE_FILTER_OMIT_PUBLIC = 4,
+  // GNUNET_NAMESTORE_FILTER_OMIT_PUBLIC = 4,
 };
 
 
@@ -554,6 +554,19 @@ GNUNET_GNSRECORD_block_calculate_size (const struct
                                        const struct GNUNET_GNSRECORD_Data *rd,
                                        unsigned int rd_count);
 
+/**
+ * Sign a block created with #GNUNET_GNSRECORD_block_create_unsigned
+ *
+ * @param key the private key
+ * @param label the label of the block
+ * @param block the unsigned block
+ * @return GNUNET_OK on success
+ */
+enum GNUNET_GenericReturnValue
+GNUNET_GNSRECORD_block_sign (const struct
+                             GNUNET_IDENTITY_PrivateKey *key,
+                             const char *label,
+                             struct GNUNET_GNSRECORD_Block *block);
 
 /**
  * Sign name and records
@@ -575,6 +588,31 @@ GNUNET_GNSRECORD_block_create (const struct GNUNET_IDENTITY_PrivateKey *key,
                                struct GNUNET_GNSRECORD_Block **block);
 
 
+/**
+ * Create name and records but do not sign!
+ * Sign later with #GNUNET_GNSRECORD_block_sign().
+ * Cache derived public key (also keeps the
+ * private key in static memory, so do not use this function if
+ * keeping the private key in the process's RAM is a major issue).
+ *
+ * @param key the private key
+ * @param expire block expiration
+ * @param label the name for the records
+ * @param rd record data
+ * @param rd_count number of records in @a rd
+ * @param result the block buffer. Will be allocated.
+ * @return GNUNET_OK on success.
+ */
+enum GNUNET_GenericReturnValue
+GNUNET_GNSRECORD_block_create_unsigned (const struct
+                                        GNUNET_IDENTITY_PrivateKey *key,
+                                        struct GNUNET_TIME_Absolute expire,
+                                        const char *label,
+                                        const struct GNUNET_GNSRECORD_Data *rd,
+                                        unsigned int rd_count,
+                                        struct GNUNET_GNSRECORD_Block **result);
+
+
 /**
  * Sign name and records, cache derived public key (also keeps the
  * private key in static memory, so do not use this function if
diff --git a/src/zonemaster/gnunet-service-zonemaster.c b/src/zonemaster/gnunet-service-zonemaster.c
index f5c1d781b..42b3abf91 100644
--- a/src/zonemaster/gnunet-service-zonemaster.c
+++ b/src/zonemaster/gnunet-service-zonemaster.c
@@ -96,6 +96,81 @@
  */
 #define DHT_GNS_REPLICATION_LEVEL 5
 
+/**
+ * Our workers
+ */
+static pthread_t * worker;
+
+/**
+ * Lock for the open jobs queue.
+ */
+static pthread_mutex_t jobs_lock;
+
+/**
+ * Lock for the finished results queue.
+ */
+static pthread_mutex_t results_lock;
+
+/**
+ * For threads to know we are shutting down
+ */
+static int in_shutdown = GNUNET_NO;
+
+/**
+ * Our notification pipe
+ */
+static struct GNUNET_DISK_PipeHandle *notification_pipe;
+
+/**
+ * Pipe read task
+ */
+static struct GNUNET_SCHEDULER_Task *pipe_read_task;
+
+struct OpenSignJob
+{
+
+  struct OpenSignJob *next;
+
+  struct OpenSignJob *prev;
+
+  struct GNUNET_IDENTITY_PrivateKey zone;
+
+  struct GNUNET_GNSRECORD_Block *block;
+
+  struct GNUNET_GNSRECORD_Block *block_priv;
+
+  struct DhtPutActivity *ma;
+
+  size_t block_size;
+
+  struct GNUNET_TIME_Absolute expire_pub;
+
+  char *label;
+
+};
+
+
+/**
+ * DLL
+ */
+static struct OpenSignJob *jobs_head;
+
+/**
+ * DLL
+ */
+static struct OpenSignJob *jobs_tail;
+
+/**
+ * DLL
+ */
+static struct OpenSignJob *results_head;
+
+/**
+ * DLL
+ */
+static struct OpenSignJob *results_tail;
+
+
 /**
  * Handle for DHT PUT activity triggered from the namestore monitor.
  */
@@ -319,8 +394,13 @@ shutdown_task (void *cls)
   struct CacheOperation *cop;
 
   (void) cls;
+  in_shutdown == GNUNET_YES;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Shutting down!\n");
+  if (NULL != notification_pipe)
+    GNUNET_DISK_pipe_close (notification_pipe);
+  if (NULL != pipe_read_task)
+    GNUNET_SCHEDULER_cancel (pipe_read_task);
   while (NULL != (cop = cop_head))
   {
     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
@@ -332,7 +412,8 @@ shutdown_task (void *cls)
 
   while (NULL != (ma = it_head))
   {
-    GNUNET_DHT_put_cancel (ma->ph);
+    if (NULL != ma->ph)
+      GNUNET_DHT_put_cancel (ma->ph);
     dht_queue_length--;
     GNUNET_CONTAINER_DLL_remove (it_head,
                                  it_tail,
@@ -682,6 +763,16 @@ dht_put_continuation (void *cls)
   GNUNET_free (ma);
 }
 
+static void
+free_job (struct OpenSignJob *job)
+{
+  if (job->block != job->block_priv)
+    GNUNET_free (job->block_priv);
+  GNUNET_free (job->block);
+  if (NULL != job->label)
+    GNUNET_free (job->label);
+  GNUNET_free (job);
+}
 
 
 /**
@@ -760,35 +851,86 @@ perform_dht_put (const struct GNUNET_IDENTITY_PrivateKey *key,
   else
     block_priv = block;
   block_size = GNUNET_GNSRECORD_block_get_size (block);
-  GNUNET_GNSRECORD_query_from_private_key (key,
-                                           label,
+  GNUNET_assert (0 == pthread_mutex_lock (&jobs_lock));
+  struct OpenSignJob *job = GNUNET_new (struct OpenSignJob);
+  job->block = GNUNET_malloc (block_size); // FIXME this does not need to be copied, can be freed by worker
+  memcpy (job->block, block, block_size);
+  job->block_size = block_size;
+  job->block_priv = block_priv;
+  job->zone = *key;
+  job->ma = ma;
+  job->label = GNUNET_strdup (label);
+  job->expire_pub = expire_pub;
+  GNUNET_CONTAINER_DLL_insert (jobs_head, jobs_tail, job);
+  GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock));
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Storing %u record(s) for label `%s' in DHT with expiration `%s'\n",
+              rd_public_count,
+              label,
+              GNUNET_STRINGS_absolute_time_to_string (expire));
+  num_public_records++;
+}
+
+static void
+notification_pipe_cb (void *cls);
+
+static void
+initiate_put_from_pipe_trigger (void *cls)
+{
+  struct GNUNET_HashCode query;
+  struct OpenSignJob *job;
+
+  pipe_read_task = NULL;
+  GNUNET_assert (0 == pthread_mutex_lock (&results_lock));
+  job = results_head;
+  if (NULL == job)
+  {
+    GNUNET_assert (0 == pthread_mutex_unlock (&results_lock));
+    const struct GNUNET_DISK_FileHandle *np_fh = GNUNET_DISK_pipe_handle (
+      notification_pipe,
+      GNUNET_DISK_PIPE_END_READ);
+    pipe_read_task =
+      GNUNET_SCHEDULER_add_read_file (
+        GNUNET_TIME_UNIT_FOREVER_REL,
+        np_fh,
+        notification_pipe_cb,
+        NULL);
+    return;
+  }
+  GNUNET_CONTAINER_DLL_remove (results_head, results_tail, job);
+  GNUNET_assert (0 == pthread_mutex_unlock (&results_lock));
+  GNUNET_GNSRECORD_query_from_private_key (&job->zone,
+                                           job->label,
                                            &query);
   GNUNET_STATISTICS_update (statistics,
                             "DHT put operations initiated",
                             1,
                             GNUNET_NO);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Storing %u record(s) for label `%s' in DHT with expiration `%s' under key %s\n",
-              rd_public_count,
-              label,
-              GNUNET_STRINGS_absolute_time_to_string (expire),
+              "Storing record(s) for label `%s' in DHT under key %s\n",
+              job->label,
               GNUNET_h2s (&query));
-  num_public_records++;
-  ret = GNUNET_DHT_put (dht_handle,
-                        &query,
-                        DHT_GNS_REPLICATION_LEVEL,
-                        GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE,
-                        GNUNET_BLOCK_TYPE_GNS_NAMERECORD,
-                        block_size,
-                        block,
-                        expire_pub,
-                        &dht_put_continuation,
-                        ma);
-  refresh_block (block_priv);
-  if (block != block_priv)
-    GNUNET_free (block_priv);
-  GNUNET_free (block);
-  return ret;
+  job->ma->ph = GNUNET_DHT_put (dht_handle,
+                                &query,
+                                DHT_GNS_REPLICATION_LEVEL,
+                                GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE,
+                                GNUNET_BLOCK_TYPE_GNS_NAMERECORD,
+                                job->block_size,
+                                job->block,
+                                job->expire_pub,
+                                &dht_put_continuation,
+                                job->ma);
+  if (NULL == job->ma->ph)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                "Could not perform DHT PUT, is the DHT running?\n");
+    GNUNET_free (job->ma);
+    free_job (job);
+    return;
+  }
+  refresh_block (job->block_priv);
+  free_job (job);
+  return;
 }
 
 
@@ -907,45 +1049,29 @@ put_gns_record (void *cls,
   /* We got a set of records to publish */
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Starting DHT PUT\n");
-
-  ma = GNUNET_new (struct DhtPutActivity);
-  ma->start_date = GNUNET_TIME_absolute_get ();
-  ma->ph = perform_dht_put (key,
-                            label,
-                            rd,
-                            rd_count,
-                            expire,
-                            ma);
   put_cnt++;
   if (0 == put_cnt % DELTA_INTERVAL)
     update_velocity (DELTA_INTERVAL);
   check_zone_namestore_next ();
-  if (NULL == ma->ph)
+  if (dht_queue_length >= DHT_QUEUE_LIMIT)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                "Could not perform DHT PUT, is the DHT running?\n");
-    GNUNET_free (ma);
+                "DHT PUT queue length exceeded (%u), aborting PUT\n",
+                DHT_QUEUE_LIMIT);
     return;
   }
+
+  ma = GNUNET_new (struct DhtPutActivity);
+  perform_dht_put (key,
+                   label,
+                   rd,
+                   rd_count,
+                   expire,
+                   ma);
   dht_queue_length++;
   GNUNET_CONTAINER_DLL_insert_tail (it_head,
                                     it_tail,
                                     ma);
-  if (dht_queue_length > DHT_QUEUE_LIMIT)
-  {
-    ma = it_head;
-    GNUNET_CONTAINER_DLL_remove (it_head,
-                                 it_tail,
-                                 ma);
-    GNUNET_DHT_put_cancel (ma->ph);
-    dht_queue_length--;
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                "DHT PUT unconfirmed after %s, aborting PUT\n",
-                GNUNET_STRINGS_relative_time_to_string (
-                  GNUNET_TIME_absolute_get_duration (ma->start_date),
-                  GNUNET_YES));
-    GNUNET_free (ma);
-  }
 }
 
 /**
@@ -1075,9 +1201,18 @@ perform_dht_put_monitor (const struct GNUNET_IDENTITY_PrivateKey *key,
   else
     block_priv = block;
   block_size = GNUNET_GNSRECORD_block_get_size (block);
+  GNUNET_assert (0 == pthread_mutex_lock (&jobs_lock));
+  struct OpenSignJob *job = GNUNET_new (struct OpenSignJob);
+  job->block = GNUNET_malloc (block_size); // FIXME this does not need to be copied, can be freed by worker
+  memcpy (job->block, block, block_size);
+  job->zone = *key;
+  job->label = GNUNET_strdup (label);
+  GNUNET_CONTAINER_DLL_insert (jobs_head, jobs_tail, job);
+  GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock));
   GNUNET_GNSRECORD_query_from_private_key (key,
                                            label,
                                            &query);
+  GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock));
   GNUNET_STATISTICS_update (statistics,
                             "DHT put operations initiated",
                             1,
@@ -1196,6 +1331,48 @@ handle_monitor_error (void *cls)
                             GNUNET_NO);
 }
 
+static void*
+sign_worker (void *)
+{
+  struct OpenSignJob *job;
+  const struct GNUNET_DISK_FileHandle *fh;
+
+  fh = GNUNET_DISK_pipe_handle (notification_pipe, GNUNET_DISK_PIPE_END_WRITE);
+  while (GNUNET_YES != in_shutdown)
+  {
+    GNUNET_assert (0 == pthread_mutex_lock (&jobs_lock));
+    if (NULL != jobs_head)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Taking on Job for %s\n", jobs_head->label);
+      job = jobs_head;
+      GNUNET_CONTAINER_DLL_remove (jobs_head, jobs_tail, job);
+    }
+    GNUNET_assert (0 == pthread_mutex_unlock (&jobs_lock));
+    if (NULL != job)
+    {
+      GNUNET_assert (0 == pthread_mutex_lock (&results_lock));
+      GNUNET_CONTAINER_DLL_insert (results_head, results_tail, job);
+      GNUNET_assert (0 == pthread_mutex_unlock (&results_lock));
+      job = NULL;
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Done, notifying main thread throug pipe!\n");
+      GNUNET_DISK_file_write (fh, "!", 1);
+    }
+    else {
+      sleep (1);
+    }
+  }
+  return NULL;
+}
+
+static void
+notification_pipe_cb (void *cls)
+{
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Received wake up notification through pipe, checking results\n");
+  GNUNET_SCHEDULER_add_now (&initiate_put_from_pipe_trigger, NULL);
+}
 
 /**
  * Perform zonemaster duties: watch namestore, publish records.
@@ -1305,6 +1482,40 @@ run (void *cls,
 
   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
                                  NULL);
+
+  notification_pipe = GNUNET_DISK_pipe (GNUNET_DISK_PF_NONE);
+  const struct GNUNET_DISK_FileHandle *np_fh = GNUNET_DISK_pipe_handle (
+    notification_pipe,
+    GNUNET_DISK_PIPE_END_READ);
+  pipe_read_task = GNUNET_SCHEDULER_add_read_file (GNUNET_TIME_UNIT_FOREVER_REL,
+                                                   np_fh,
+                                                   notification_pipe_cb, NULL);
+
+  long long unsigned int worker_count = 1;
+  if (GNUNET_OK !=
+      GNUNET_CONFIGURATION_get_value_number (c,
+                                             "zonemaster",
+                                             "WORKER_COUNT",
+                                             &worker_count))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Number of workers not defined falling back to 1\n");
+  }
+  worker = GNUNET_malloc (sizeof (pthread_t) * worker_count);
+  /** Start worker */
+  for (int i = 0; i < worker_count; i++)
+  {
+    if (0 !=
+        pthread_create (&worker[i],
+                        NULL,
+                        &sign_worker,
+                        NULL))
+    {
+      GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
+                           "pthread_create");
+      GNUNET_SCHEDULER_shutdown ();
+    }
+  }
 }
 
 
diff --git a/src/zonemaster/zonemaster.conf.in b/src/zonemaster/zonemaster.conf.in
index 560239944..9c920c476 100644
--- a/src/zonemaster/zonemaster.conf.in
+++ b/src/zonemaster/zonemaster.conf.in
@@ -6,6 +6,7 @@ HOSTNAME = localhost
 BINARY = gnunet-service-zonemaster
 UNIXPATH = $GNUNET_USER_RUNTIME_DIR/gnunet-service-zonemaster.sock
 @JAVAPORT@PORT = 2123
+WORKER_COUNT = 10
 
 # Do we require users that want to access GNS to run this process
 # (usually not a good idea)

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]