gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[gnunet] branch master updated: NAMESTORE: Bulk insert API now properly


From: gnunet
Subject: [gnunet] branch master updated: NAMESTORE: Bulk insert API now properly handles message length restriction
Date: Thu, 06 Oct 2022 09:35:34 +0200

This is an automated email from the git hooks/post-receive script.

martin-schanzenbach pushed a commit to branch master
in repository gnunet.

The following commit(s) were added to refs/heads/master by this push:
     new 1e4cb697e NAMESTORE: Bulk insert API now properly handles message 
length restriction
1e4cb697e is described below

commit 1e4cb697efcd7289da981eb44cca40bccc421c72
Author: Martin Schanzenbach <schanzen@gnunet.org>
AuthorDate: Thu Oct 6 16:35:26 2022 +0900

    NAMESTORE: Bulk insert API now properly handles message length restriction
---
 src/include/gnunet_namestore_service.h    |   2 +
 src/namestore/gnunet-service-namestore.c  |   4 +-
 src/namestore/namestore_api.c             |  25 +++++--
 src/namestore/perf_namestore_api_import.c | 110 ++++++++++++------------------
 src/namestore/plugin_rest_namestore.c     |  78 ++++++++++++++++-----
 5 files changed, 130 insertions(+), 89 deletions(-)

diff --git a/src/include/gnunet_namestore_service.h 
b/src/include/gnunet_namestore_service.h
index 6183fb8d0..998eb19d0 100644
--- a/src/include/gnunet_namestore_service.h
+++ b/src/include/gnunet_namestore_service.h
@@ -179,6 +179,7 @@ GNUNET_NAMESTORE_records_store (struct 
GNUNET_NAMESTORE_Handle *h,
  * @param rd_set_count the number of record sets
  * @param record_info the records to add containing @a rd_set_count records
  * @param cont continuation to call when done
+ * @param rds_sent set to how many record sets could actually be sent
  * @param cont_cls closure for @a cont
  * @return handle to abort the request
  */
@@ -188,6 +189,7 @@ GNUNET_NAMESTORE_records_store2 (
   const struct GNUNET_IDENTITY_PrivateKey *pkey,
   unsigned int rd_set_count,
   const struct GNUNET_NAMESTORE_RecordInfo *record_info,
+  unsigned int *rds_sent,
   GNUNET_NAMESTORE_ContinuationWithStatus cont,
   void *cont_cls);
 
diff --git a/src/namestore/gnunet-service-namestore.c 
b/src/namestore/gnunet-service-namestore.c
index 57818ac63..64d3ec51b 100644
--- a/src/namestore/gnunet-service-namestore.c
+++ b/src/namestore/gnunet-service-namestore.c
@@ -953,7 +953,7 @@ continue_store_activity (struct StoreActivity *sa,
         GNUNET_GNSRECORD_records_deserialize (rd_ser_len, rd_ser, rd_count,
                                               rd));
 
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                   "Checking monitors watching for `%s'\n",
                   conv_name);
       for (struct ZoneMonitor *zm = sa->zm_pos; NULL != zm; zm = sa->zm_pos)
@@ -974,7 +974,7 @@ continue_store_activity (struct StoreActivity *sa,
             GNUNET_SCHEDULER_add_delayed (MONITOR_STALL_WARN_DELAY,
                                           &warn_monitor_slow,
                                           zm);
-          GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+          GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                       "Monitor is blocking client for `%s'\n",
                       conv_name);
           GNUNET_free (conv_name);
diff --git a/src/namestore/namestore_api.c b/src/namestore/namestore_api.c
index 901ad9053..cf2247d18 100644
--- a/src/namestore/namestore_api.c
+++ b/src/namestore/namestore_api.c
@@ -1096,10 +1096,11 @@ GNUNET_NAMESTORE_records_store (
   void *cont_cls)
 {
   struct GNUNET_NAMESTORE_RecordInfo ri;
+  unsigned int rds_sent;
   ri.a_label = label;
   ri.a_rd_count = rd_count;
   ri.a_rd = (struct GNUNET_GNSRECORD_Data *) rd;
-  return GNUNET_NAMESTORE_records_store2 (h, pkey, 1, &ri,
+  return GNUNET_NAMESTORE_records_store2 (h, pkey, 1, &ri, &rds_sent,
                                           cont, cont_cls);
 }
 
@@ -1109,6 +1110,7 @@ GNUNET_NAMESTORE_records_store2 (
   const struct GNUNET_IDENTITY_PrivateKey *pkey,
   unsigned int rd_set_count,
   const struct GNUNET_NAMESTORE_RecordInfo *record_info,
+  unsigned int *rds_sent,
   GNUNET_NAMESTORE_ContinuationWithStatus cont,
   void *cont_cls)
 {
@@ -1127,7 +1129,9 @@ GNUNET_NAMESTORE_records_store2 (
   ssize_t sret;
   int i;
   size_t rd_set_len = 0;
+  size_t max_len = UINT16_MAX - sizeof (struct RecordStoreMessage);
 
+  *rds_sent = 0;
   for (i = 0; i < rd_set_count; i++)
   {
     label = record_info[i].a_label;
@@ -1137,21 +1141,30 @@ GNUNET_NAMESTORE_records_store2 (
     if (name_len > MAX_NAME_LEN)
     {
       GNUNET_break (0);
+      *rds_sent = 0;
       return NULL;
     }
     rd_ser_len[i] = GNUNET_GNSRECORD_records_get_size (rd_count, rd);
     if (rd_ser_len[i] < 0)
     {
       GNUNET_break (0);
+      *rds_sent = 0;
       return NULL;
     }
-    if (rd_ser_len[i] > UINT16_MAX)
+    if (rd_ser_len[i] > max_len)
     {
       GNUNET_break (0);
+      *rds_sent = 0;
       return NULL;
     }
+    if ((rd_set_len + sizeof (struct RecordSet) + name_len + rd_ser_len[i]) >
+        max_len)
+      break;
     rd_set_len += sizeof (struct RecordSet) + name_len + rd_ser_len[i];
   }
+  *rds_sent = i;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Sending %u of %u records!\n", *rds_sent, rd_count);
   rid = get_op_id (h);
   qe = GNUNET_new (struct GNUNET_NAMESTORE_QueueEntry);
   qe->h = h;
@@ -1164,11 +1177,13 @@ GNUNET_NAMESTORE_records_store2 (
   env = GNUNET_MQ_msg_extra (msg,
                              rd_set_len,
                              GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_STORE);
+  GNUNET_assert (NULL != msg);
+  GNUNET_assert (NULL != env);
   msg->gns_header.r_id = htonl (rid);
-  msg->rd_set_count = htons (rd_set_count);
+  msg->rd_set_count = htons ((uint16_t) (*rds_sent));
   msg->private_key = *pkey;
   rd_set = (struct RecordSet*) &msg[1];
-  for (int i = 0; i < rd_set_count; i++)
+  for (int i = 0; i < *rds_sent; i++)
   {
     label = record_info[i].a_label;
     rd = record_info[i].a_rd;
@@ -1193,7 +1208,7 @@ GNUNET_NAMESTORE_records_store2 (
   }
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Sending NAMESTORE_RECORD_STORE message for name %u record sets\n",
-       rd_set_count);
+       *rds_sent);
   qe->timeout_task =
     GNUNET_SCHEDULER_add_delayed (NAMESTORE_DELAY_TOLERANCE, &warn_delay, qe);
   if (NULL == h->mq)
diff --git a/src/namestore/perf_namestore_api_import.c 
b/src/namestore/perf_namestore_api_import.c
index 369b29600..698a558b7 100644
--- a/src/namestore/perf_namestore_api_import.c
+++ b/src/namestore/perf_namestore_api_import.c
@@ -30,11 +30,7 @@
 
 #define TEST_RECORD_TYPE GNUNET_DNSPARSER_TYPE_TXT
 
-#define TEST_BATCH_COUNT 3
-
-#define TEST_BATCH_SIZE 500
-
-#define TEST_RECORD_COUNT TEST_BATCH_COUNT * TEST_BATCH_SIZE
+#define TEST_RECORD_COUNT 10000
 
 /**
  * A #BENCHMARK_SIZE of 1000 takes less than a minute on a reasonably
@@ -191,6 +187,10 @@ commit_cont (void *cls,
   GNUNET_SCHEDULER_shutdown ();
 }
 
+static void
+publish_records_bulk_tx (void *cls);
+
+
 static void
 put_cont_bulk_tx (void *cls,
                   int32_t success,
@@ -203,44 +203,29 @@ put_cont_bulk_tx (void *cls,
     GNUNET_SCHEDULER_shutdown ();
     return;
   }
-  qe = GNUNET_NAMESTORE_transaction_commit (nsh, commit_cont, NULL);
-}
-
-
-static void
-publish_records_bulk_tx (void *cls);
-
-static void
-reput_cont_bulk_tx (void *cls,
-                    int32_t success,
-                    const char *emsg)
-{
-  (void) cls;
-  qe = NULL;
-  if (GNUNET_OK != success)
+  if (bulk_count == TEST_RECORD_COUNT)
   {
-    GNUNET_break (0);
-    GNUNET_SCHEDULER_shutdown ();
+    qe = GNUNET_NAMESTORE_transaction_commit (nsh, commit_cont, NULL);
     return;
   }
   t = GNUNET_SCHEDULER_add_now (&publish_records_bulk_tx, NULL);
-
-
 }
 
+
 static void
 publish_records_bulk_tx (void *cls)
 {
+  unsigned int sent_rds;
   t = NULL;
   qe = GNUNET_NAMESTORE_records_store2 (nsh,
                                         &privkey,
-                                        TEST_BATCH_SIZE,
-                                        &ri[bulk_count * TEST_BATCH_SIZE],
-                                        (bulk_count == TEST_BATCH_COUNT - 1) ? 
&put_cont_bulk_tx :
-                                        &reput_cont_bulk_tx,
+                                        TEST_RECORD_COUNT - bulk_count,
+                                        &ri[bulk_count],
+                                        &sent_rds,
+                                        &put_cont_bulk_tx,
                                         NULL);
-  bulk_count++;
-
+  bulk_count += sent_rds;
+  GNUNET_assert (sent_rds != 0);
 }
 
 
@@ -249,17 +234,21 @@ begin_cont (void *cls,
             int32_t success,
             const char *emsg)
 {
+  unsigned int sent_rds;
   qe = GNUNET_NAMESTORE_records_store2 (nsh,
                                         &privkey,
-                                        TEST_BATCH_SIZE,
-                                        &ri[bulk_count * TEST_BATCH_SIZE],
-                                        (bulk_count == TEST_BATCH_COUNT - 1) ? 
&put_cont_bulk_tx :
-                                        &reput_cont_bulk_tx,
+                                        TEST_RECORD_COUNT - bulk_count,
+                                        &ri[bulk_count],
+                                        &sent_rds,
+                                        &put_cont_bulk_tx,
                                         NULL);
-  bulk_count++;
-
+  bulk_count += sent_rds;
+  GNUNET_assert (sent_rds != 0);
 }
 
+static void
+publish_records_bulk (void *cls);
+
 static void
 put_cont_bulk (void *cls,
                int32_t success,
@@ -276,28 +265,19 @@ put_cont_bulk (void *cls,
     return;
   }
 
-  delay = GNUNET_TIME_absolute_get_duration (start);
-  fprintf (stdout,
-           "BULK: Publishing %u records took %s\n",
-           TEST_RECORD_COUNT,
-           GNUNET_STRINGS_relative_time_to_string (delay,
-                                                   GNUNET_YES));
-  start = GNUNET_TIME_absolute_get ();
-  bulk_count = 0;
-  qe = GNUNET_NAMESTORE_transaction_begin (nsh, begin_cont, NULL);
-
-}
-
-static void
-publish_records_bulk (void *cls);
-
-static void
-reput_cont_bulk (void *cls,
-                 int32_t success,
-                 const char *emsg)
-{
-  struct GNUNET_TIME_Relative delay;
-
+  if (bulk_count == TEST_RECORD_COUNT)
+  {
+    delay = GNUNET_TIME_absolute_get_duration (start);
+    fprintf (stdout,
+             "BULK: Publishing %u records took %s\n",
+             TEST_RECORD_COUNT,
+             GNUNET_STRINGS_relative_time_to_string (delay,
+                                                     GNUNET_YES));
+    start = GNUNET_TIME_absolute_get ();
+    bulk_count = 0;
+    qe = GNUNET_NAMESTORE_transaction_begin (nsh, begin_cont, NULL);
+    return;
+  }
   (void) cls;
   qe = NULL;
   if (GNUNET_OK != success)
@@ -307,23 +287,23 @@ reput_cont_bulk (void *cls,
     return;
   }
   t = GNUNET_SCHEDULER_add_now (&publish_records_bulk, NULL);
-
 }
 
-
 static void
 publish_records_bulk (void *cls)
 {
+  static unsigned int sent_rds = 0;
   (void) cls;
   t = NULL;
   qe = GNUNET_NAMESTORE_records_store2 (nsh,
                                         &privkey,
-                                        TEST_BATCH_SIZE,
-                                        &ri[bulk_count * TEST_BATCH_SIZE],
-                                        (bulk_count == TEST_BATCH_COUNT - 1) ? 
&put_cont_bulk :
-                                        &reput_cont_bulk,
+                                        TEST_RECORD_COUNT - bulk_count,
+                                        &ri[bulk_count],
+                                        &sent_rds,
+                                        &put_cont_bulk,
                                         NULL);
-  bulk_count++;
+  bulk_count += sent_rds;
+  GNUNET_assert (sent_rds != 0);
 }
 
 
diff --git a/src/namestore/plugin_rest_namestore.c 
b/src/namestore/plugin_rest_namestore.c
index a9b49e19a..7a5a70fca 100644
--- a/src/namestore/plugin_rest_namestore.c
+++ b/src/namestore/plugin_rest_namestore.c
@@ -200,6 +200,21 @@ struct RequestHandle
    */
   unsigned int rd_count;
 
+  /**
+   * RecordInfo array
+   */
+  struct GNUNET_NAMESTORE_RecordInfo *ri;
+
+  /**
+   * Size of record info
+   */
+  unsigned int rd_set_count;
+
+  /**
+   * Position of record info
+   */
+  unsigned int rd_set_pos;
+
   /**
    * NAMESTORE Operation
    */
@@ -768,7 +783,7 @@ bulk_tx_commit_cb (void *cls, int32_t success, const char 
*emsg)
  * @param emsg the error message (can be NULL)
  */
 static void
-import_finished_cb (void *cls, int32_t success, const char *emsg)
+import_next_cb (void *cls, int32_t success, const char *emsg)
 {
   struct RequestHandle *handle = cls;
 
@@ -787,9 +802,32 @@ import_finished_cb (void *cls, int32_t success, const char 
*emsg)
     GNUNET_SCHEDULER_add_now (&do_error, handle);
     return;
   }
-  handle->ns_qe = GNUNET_NAMESTORE_transaction_commit (handle->nc,
-                                                       &bulk_tx_commit_cb,
-                                                       handle);
+  unsigned int remaining = handle->rd_set_count - handle->rd_set_pos;
+  if (0 == remaining)
+  {
+    handle->ns_qe = GNUNET_NAMESTORE_transaction_commit (handle->nc,
+                                                         &bulk_tx_commit_cb,
+                                                         handle);
+    return;
+  }
+  unsigned int sent_rds = 0;
+  // Find the smallest set of records we can send with our message size
+  // restriction of 16 bit
+  handle->ns_qe = GNUNET_NAMESTORE_records_store2 (handle->nc,
+                                                   handle->zone_pkey,
+                                                   remaining,
+                                                   &handle->ri[handle->
+                                                               rd_set_pos],
+                                                   &sent_rds,
+                                                   &import_next_cb,
+                                                   handle);
+  if ((NULL == handle->ns_qe) && (0 == sent_rds))
+  {
+    handle->emsg = GNUNET_strdup (GNUNET_REST_NAMESTORE_FAILED);
+    GNUNET_SCHEDULER_add_now (&do_error, handle);
+    return;
+  }
+  handle->rd_set_pos += sent_rds;
 }
 
 static void
@@ -827,11 +865,11 @@ bulk_tx_start (void *cls, int32_t success, const char 
*emsg)
     json_decref (data_js);
     return;
   }
-  size_t rd_set_count = json_array_size (data_js);
-  struct GNUNET_NAMESTORE_RecordInfo ri[rd_set_count];
+  handle->rd_set_count = json_array_size (data_js);
+  handle->ri = GNUNET_malloc (handle->rd_set_count
+                              * sizeof (struct GNUNET_NAMESTORE_RecordInfo));
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Got record set of size %d\n", rd_set_count);
-  const struct GNUNET_GNSRECORD_Data *a_rd[rd_set_count];
+              "Got record set of size %d\n", handle->rd_set_count);
   char *albl;
   size_t index;
   json_t *value;
@@ -839,7 +877,8 @@ bulk_tx_start (void *cls, int32_t success, const char *emsg)
     {
       struct GNUNET_GNSRECORD_Data *rd;
       struct GNUNET_JSON_Specification gnsspec[] =
-      { GNUNET_GNSRECORD_JSON_spec_gnsrecord (&rd, &ri[index].a_rd_count,
+      { GNUNET_GNSRECORD_JSON_spec_gnsrecord (&rd,
+                                              &handle->ri[index].a_rd_count,
                                               &albl),
         GNUNET_JSON_spec_end () };
       if (GNUNET_OK != GNUNET_JSON_parse (value, gnsspec, NULL, NULL))
@@ -849,30 +888,35 @@ bulk_tx_start (void *cls, int32_t success, const char 
*emsg)
         json_decref (data_js);
         return;
       }
-      ri[index].a_rd = rd;
-      ri[index].a_label = albl;
+      handle->ri[index].a_rd = rd;
+      handle->ri[index].a_label = albl;
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Parsed record set for name %s\n", ri[index].a_label);
+                  "Parsed record set for name %s\n",
+                  handle->ri[index].a_label);
     }
   }
   // json_decref (data_js);
 
+  unsigned int sent_rds = 0;
+  // Find the smallest set of records we can send with our message size
+  // restriction of 16 bit
   handle->ns_qe = GNUNET_NAMESTORE_records_store2 (handle->nc,
                                                    handle->zone_pkey,
-                                                   rd_set_count,
-                                                   ri,
-                                                   &import_finished_cb,
+                                                   handle->rd_set_count,
+                                                   handle->ri,
+                                                   &sent_rds,
+                                                   &import_next_cb,
                                                    handle);
-  if (NULL == handle->ns_qe)
+  if ((NULL == handle->ns_qe) && (0 == sent_rds))
   {
     handle->emsg = GNUNET_strdup (GNUNET_REST_NAMESTORE_FAILED);
     GNUNET_SCHEDULER_add_now (&do_error, handle);
     return;
   }
+  handle->rd_set_pos += sent_rds;
 }
 
 
-
 /**
  * Handle namestore POST import
  *

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]