gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[taler-exchange] branch master updated: implement taler-exchange-transfe


From: gnunet
Subject: [taler-exchange] branch master updated: implement taler-exchange-transfer DB sharding logic
Date: Sun, 05 Sep 2021 15:26:02 +0200

This is an automated email from the git hooks/post-receive script.

grothoff pushed a commit to branch master
in repository exchange.

The following commit(s) were added to refs/heads/master by this push:
     new ae8d481e implement taler-exchange-transfer DB sharding logic
ae8d481e is described below

commit ae8d481e1ce9f694a42619809d2c9b6e6acf3497
Author: Christian Grothoff <christian@grothoff.org>
AuthorDate: Sun Sep 5 15:25:46 2021 +0200

    implement taler-exchange-transfer DB sharding logic
---
 src/benchmark/benchmark.conf                |   2 +-
 src/exchange/exchange.conf                  |  11 +-
 src/exchange/taler-exchange-aggregator.c    |  17 +-
 src/exchange/taler-exchange-closer.c        |  16 +-
 src/exchange/taler-exchange-transfer.c      | 405 ++++++++++++++++++++++------
 src/exchange/taler-exchange-wirewatch.c     |   1 +
 src/exchangedb/plugin_exchangedb_postgres.c | 123 +++++++--
 src/exchangedb/test_exchangedb.c            |   6 +
 src/include/taler_exchangedb_plugin.h       |   8 +-
 src/testing/testing_api_cmd_exec_transfer.c |   3 +
 10 files changed, 468 insertions(+), 124 deletions(-)

diff --git a/src/benchmark/benchmark.conf b/src/benchmark/benchmark.conf
index 844106cf..c38981dd 100644
--- a/src/benchmark/benchmark.conf
+++ b/src/benchmark/benchmark.conf
@@ -24,7 +24,7 @@ DB = postgres
 # exchange (or the twister) is actually listening.
 BASE_URL = "http://localhost:8081/";
 
-AGGREGATOR_SHARD_SIZE = 268435456
+AGGREGATOR_SHARD_SIZE = 67108864
 #AGGREGATOR_SHARD_SIZE = 2147483648
 
 
diff --git a/src/exchange/exchange.conf b/src/exchange/exchange.conf
index 68c1556d..4b7f5f5a 100644
--- a/src/exchange/exchange.conf
+++ b/src/exchange/exchange.conf
@@ -41,10 +41,17 @@ PORT = 8081
 BASE_URL = http://localhost:8081/
 
 
-# How long should the aggregator (and closer, and transfer)
-# sleep if it has nothing to do?
+# How long should the aggregator sleep if it has nothing to do?
 AGGREGATOR_IDLE_SLEEP_INTERVAL = 60 s
 
+# How long should the transfer tool
+# sleep if it has nothing to do?
+TRANSFER_IDLE_SLEEP_INTERVAL = 60 s
+
+# How long should the closer tool
+# sleep if it has nothing to do?
+CLOSER_IDLE_SLEEP_INTERVAL = 60 s
+
 # Values of 0 or above 2^31 disable sharding, which
 # is a sane default for most use-cases.
 # When changing this value, you MUST stop all
diff --git a/src/exchange/taler-exchange-aggregator.c 
b/src/exchange/taler-exchange-aggregator.c
index 893fa79f..caa4528d 100644
--- a/src/exchange/taler-exchange-aggregator.c
+++ b/src/exchange/taler-exchange-aggregator.c
@@ -1034,9 +1034,22 @@ run_shard (void *cls)
                                          &s->shard_end);
   if (0 >= qs)
   {
+    if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
+    {
+      static struct GNUNET_TIME_Relative delay;
+
+      GNUNET_free (s);
+      delay = GNUNET_TIME_randomized_backoff (delay,
+                                              GNUNET_TIME_UNIT_SECONDS);
+      task = GNUNET_SCHEDULER_add_delayed (delay,
+                                           &run_shard,
+                                           NULL);
+      return;
+    }
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                "Failed to begin shard!\n");
-    GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR != qs);
+                "Failed to begin shard (%d)!\n",
+                qs);
+    GNUNET_break (GNUNET_DB_STATUS_HARD_ERROR != qs);
     global_ret = EXIT_FAILURE;
     GNUNET_SCHEDULER_shutdown ();
     return;
diff --git a/src/exchange/taler-exchange-closer.c 
b/src/exchange/taler-exchange-closer.c
index 926c9355..19cc06c7 100644
--- a/src/exchange/taler-exchange-closer.c
+++ b/src/exchange/taler-exchange-closer.c
@@ -60,7 +60,7 @@ static struct GNUNET_SCHEDULER_Task *task;
 /**
  * How long should we sleep when idle before trying to find more work?
  */
-static struct GNUNET_TIME_Relative aggregator_idle_sleep_interval;
+static struct GNUNET_TIME_Relative closer_idle_sleep_interval;
 
 /**
  * Value to return from main(). 0 on success, non-zero
@@ -112,8 +112,8 @@ shutdown_task (void *cls)
  *
  * @return #GNUNET_OK on success
  */
-static int
-parse_wirewatch_config (void)
+static enum GNUNET_GenericReturnValue
+parse_closer_config (void)
 {
   if (GNUNET_OK !=
       GNUNET_CONFIGURATION_get_value_string (cfg,
@@ -129,12 +129,12 @@ parse_wirewatch_config (void)
   if (GNUNET_OK !=
       GNUNET_CONFIGURATION_get_value_time (cfg,
                                            "exchange",
-                                           "AGGREGATOR_IDLE_SLEEP_INTERVAL",
-                                           &aggregator_idle_sleep_interval))
+                                           "CLOSER_IDLE_SLEEP_INTERVAL",
+                                           &closer_idle_sleep_interval))
   {
     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
                                "exchange",
-                               "AGGREGATOR_IDLE_SLEEP_INTERVAL");
+                               "CLOSER_IDLE_SLEEP_INTERVAL");
     return GNUNET_SYSERR;
   }
   if ( (GNUNET_OK !=
@@ -444,7 +444,7 @@ run_reserve_closures (void *cls)
     }
     else
     {
-      task = GNUNET_SCHEDULER_add_delayed (aggregator_idle_sleep_interval,
+      task = GNUNET_SCHEDULER_add_delayed (closer_idle_sleep_interval,
                                            &run_reserve_closures,
                                            NULL);
     }
@@ -480,7 +480,7 @@ run (void *cls,
   (void) cfgfile;
 
   cfg = c;
-  if (GNUNET_OK != parse_wirewatch_config ())
+  if (GNUNET_OK != parse_closer_config ())
   {
     cfg = NULL;
     global_ret = EXIT_NOTCONFIGURED;
diff --git a/src/exchange/taler-exchange-transfer.c 
b/src/exchange/taler-exchange-transfer.c
index d6d44eb0..b93d1460 100644
--- a/src/exchange/taler-exchange-transfer.c
+++ b/src/exchange/taler-exchange-transfer.c
@@ -1,6 +1,6 @@
 /*
   This file is part of TALER
-  Copyright (C) 2016-2020 Taler Systems SA
+  Copyright (C) 2016-2021 Taler Systems SA
 
   TALER is free software; you can redistribute it and/or modify it under the
   terms of the GNU Affero General Public License as published by the Free 
Software
@@ -27,6 +27,46 @@
 #include "taler_json_lib.h"
 #include "taler_bank_service.h"
 
+/**
+ * What is the maximum batch size we use for credit history
+ * requests with the bank.  See `batch_size` below.
+ */
+#define MAXIMUM_BATCH_SIZE 1024
+
+
+/**
+ * Information about our work shard.
+ */
+struct Shard
+{
+
+  /**
+   * Time when we started to work on this shard.
+   */
+  struct GNUNET_TIME_Absolute shard_start_time;
+
+  /**
+   * Offset the shard begins at.
+   */
+  uint64_t shard_start;
+
+  /**
+   * Exclusive offset where the shard ends.
+   */
+  uint64_t shard_end;
+
+  /**
+   * Offset where our current batch begins.
+   */
+  uint64_t batch_start;
+
+  /**
+   * Highest row processed in the current batch.
+   */
+  uint64_t batch_end;
+
+};
+
 
 /**
  * Data we keep to #run_transfers().  There is at most
@@ -37,6 +77,18 @@
 struct WirePrepareData
 {
 
+  /**
+   * All transfers done in the same transaction
+   * are kept in a DLL.
+   */
+  struct WirePrepareData *next;
+
+  /**
+   * All transfers done in the same transaction
+   * are kept in a DLL.
+   */
+  struct WirePrepareData *prev;
+
   /**
    * Wire execution handle.
    */
@@ -71,10 +123,21 @@ static struct TALER_EXCHANGEDB_Plugin *db_plugin;
 static struct GNUNET_SCHEDULER_Task *task;
 
 /**
- * If we are currently executing a transfer, information about
- * the active transfer is here. Otherwise, this variable is NULL.
+ * If we are currently executing transfers, information about
+ * the active transfers is here. Otherwise, this variable is NULL.
+ */
+static struct WirePrepareData *wpd_head;
+
+/**
+ * If we are currently executing transfers, information about
+ * the active transfers is here. Otherwise, this variable is NULL.
+ */
+static struct WirePrepareData *wpd_tail;
+
+/**
+ * Information about our work shard.
  */
-static struct WirePrepareData *wpd;
+static struct Shard *shard;
 
 /**
  * Handle to the context for interacting with the bank / wire gateway.
@@ -86,11 +149,6 @@ static struct GNUNET_CURL_Context *ctx;
  */
 static struct GNUNET_CURL_RescheduleContext *rc;
 
-/**
- * How long should we sleep when idle before trying to find more work?
- */
-static struct GNUNET_TIME_Relative aggregator_idle_sleep_interval;
-
 /**
  * Value to return from main(). 0 on success, non-zero on errors.
  */
@@ -101,6 +159,54 @@ static int global_ret;
  */
 static int test_mode;
 
+/**
+ * How long should we sleep when idle before trying to find more work?
+ * Also used for how long we wait to grab a shard before trying it again.
+ * The value should be set to a bit above the average time it takes to
+ * process a shard.
+ */
+static struct GNUNET_TIME_Relative transfer_idle_sleep_interval;
+
+/**
+ * How long did we take to finish the last shard?
+ */
+static struct GNUNET_TIME_Relative shard_delay;
+
+/**
+ * Modulus to apply to group shards.  The shard size must ultimately be a
+ * multiple of the batch size. Thus, if this is not a multiple of the
+ * #MAXIMUM_BATCH_SIZE, the batch size will be set to the #shard_size.
+ */
+static unsigned int shard_size = MAXIMUM_BATCH_SIZE;
+
+/**
+ * How many workers should we plan our scheduling with?
+ */
+static unsigned int max_workers = 16;
+
+
+/**
+ * Clean up all active bank interactions.
+ */
+static void
+cleanup_wpd (void)
+{
+  struct WirePrepareData *wpd;
+
+  while (NULL != (wpd = wpd_head))
+  {
+    GNUNET_CONTAINER_DLL_remove (wpd_head,
+                                 wpd_tail,
+                                 wpd);
+    if (NULL != wpd->eh)
+    {
+      TALER_BANK_transfer_cancel (wpd->eh);
+      wpd->eh = NULL;
+    }
+    GNUNET_free (wpd);
+  }
+}
+
 
 /**
  * We're being aborted with CTRL-C (or SIGTERM). Shut down.
@@ -128,17 +234,9 @@ shutdown_task (void *cls)
     GNUNET_SCHEDULER_cancel (task);
     task = NULL;
   }
-  if (NULL != wpd)
-  {
-    if (NULL != wpd->eh)
-    {
-      TALER_BANK_transfer_cancel (wpd->eh);
-      wpd->eh = NULL;
-    }
-    db_plugin->rollback (db_plugin->cls);
-    GNUNET_free (wpd);
-    wpd = NULL;
-  }
+  cleanup_wpd ();
+  GNUNET_free (shard);
+  db_plugin->rollback (db_plugin->cls); /* just in case */
   TALER_EXCHANGEDB_plugin_unload (db_plugin);
   db_plugin = NULL;
   TALER_EXCHANGEDB_unload_accounts ();
@@ -151,18 +249,18 @@ shutdown_task (void *cls)
  *
  * @return #GNUNET_OK on success
  */
-static int
-parse_wirewatch_config (void)
+static enum GNUNET_GenericReturnValue
+parse_transfer_config (void)
 {
   if (GNUNET_OK !=
       GNUNET_CONFIGURATION_get_value_time (cfg,
                                            "exchange",
-                                           "AGGREGATOR_IDLE_SLEEP_INTERVAL",
-                                           &aggregator_idle_sleep_interval))
+                                           "TRANSFER_IDLE_SLEEP_INTERVAL",
+                                           &transfer_idle_sleep_interval))
   {
     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
                                "exchange",
-                               "AGGREGATOR_IDLE_SLEEP_INTERVAL");
+                               "TRANSFER_IDLE_SLEEP_INTERVAL");
     return GNUNET_SYSERR;
   }
   if (NULL ==
@@ -218,13 +316,22 @@ static void
 run_transfers (void *cls);
 
 
+/**
+ * Select shard to process.
+ *
+ * @param cls NULL
+ */
+static void
+select_shard (void *cls);
+
+
 /**
  * Function called with the result from the execute step.
  * On success, we mark the respective wire transfer as finished,
  * and in general we afterwards continue to #run_transfers(),
  * except for irrecoverable errors.
  *
- * @param cls NULL
+ * @param cls `struct WirePrepareData` we are working on
  * @param http_status_code #MHD_HTTP_OK on success
  * @param ec taler error code
  * @param row_id unique ID of the wire transfer in the bank's records
@@ -237,15 +344,18 @@ wire_confirm_cb (void *cls,
                  uint64_t row_id,
                  struct GNUNET_TIME_Absolute wire_timestamp)
 {
+  struct WirePrepareData *wpd = cls;
   enum GNUNET_DB_QueryStatus qs;
 
-  (void) cls;
   (void) row_id;
   (void) wire_timestamp;
   wpd->eh = NULL;
   switch (http_status_code)
   {
   case MHD_HTTP_OK:
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                "Wire transfer %llu completed successfully\n",
+                (unsigned long long) wpd->row_id);
     qs = db_plugin->wire_prepare_data_mark_finished (db_plugin->cls,
                                                      wpd->row_id);
     /* continued below */
@@ -262,38 +372,43 @@ wire_confirm_cb (void *cls,
     break;
   default:
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                "Wire transaction failed: %u/%d\n",
+                "Wire transfer %llu failed: %u/%d\n",
+                (unsigned long long) wpd->row_id,
                 http_status_code,
                 ec);
     db_plugin->rollback (db_plugin->cls);
     global_ret = EXIT_FAILURE;
     GNUNET_SCHEDULER_shutdown ();
-    GNUNET_free (wpd);
-    wpd = NULL;
     return;
   }
-  if (0 >= qs)
+  shard->batch_end = GNUNET_MAX (wpd->row_id,
+                                 shard->batch_end);
+  switch (qs)
   {
-    GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
+  case GNUNET_DB_STATUS_SOFT_ERROR:
     db_plugin->rollback (db_plugin->cls);
-    if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
-    {
-      /* try again */
-      GNUNET_assert (NULL == task);
-      task = GNUNET_SCHEDULER_add_now (&run_transfers,
-                                       NULL);
-    }
-    else
-    {
-      global_ret = EXIT_FAILURE;
-      GNUNET_SCHEDULER_shutdown ();
-    }
-    GNUNET_free (wpd);
-    wpd = NULL;
+    cleanup_wpd ();
+    GNUNET_assert (NULL == task);
+    task = GNUNET_SCHEDULER_add_now (&run_transfers,
+                                     NULL);
     return;
+  case GNUNET_DB_STATUS_HARD_ERROR:
+    db_plugin->rollback (db_plugin->cls);
+    cleanup_wpd ();
+    global_ret = EXIT_FAILURE;
+    GNUNET_SCHEDULER_shutdown ();
+    return;
+  case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+  case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+    GNUNET_CONTAINER_DLL_remove (wpd_head,
+                                 wpd_tail,
+                                 wpd);
+    GNUNET_free (wpd);
+    break;
   }
-  GNUNET_free (wpd);
-  wpd = NULL;
+  if (NULL != wpd_head)
+    return; /* wait for other queries to complete */
+  /* batch done */
   switch (commit_or_warn ())
   {
   case GNUNET_DB_STATUS_SOFT_ERROR:
@@ -308,8 +423,9 @@ wire_confirm_cb (void *cls,
     GNUNET_SCHEDULER_shutdown ();
     return;
   case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+    shard->batch_start = shard->batch_end + 1;
     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-                "Wire transfer complete\n");
+                "Batch complete\n");
     /* continue with #run_transfers(), just to guard
        against the unlikely case that there are more. */
     GNUNET_assert (NULL == task);
@@ -343,6 +459,7 @@ wire_prepare_cb (void *cls,
                  size_t buf_size)
 {
   const struct TALER_EXCHANGEDB_AccountInfo *wa;
+  struct WirePrepareData *wpd;
 
   (void) cls;
   if ( (NULL == wire_method) ||
@@ -351,9 +468,14 @@ wire_prepare_cb (void *cls,
     GNUNET_break (0);
     db_plugin->rollback (db_plugin->cls);
     global_ret = EXIT_FAILURE;
-    goto cleanup;
+    GNUNET_SCHEDULER_shutdown ();
+    return;
   }
+  wpd = GNUNET_new (struct WirePrepareData);
   wpd->row_id = rowid;
+  GNUNET_CONTAINER_DLL_insert (wpd_head,
+                               wpd_tail,
+                               wpd);
   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
               "Starting wire transfer %llu\n",
               (unsigned long long) rowid);
@@ -365,7 +487,8 @@ wire_prepare_cb (void *cls,
     GNUNET_break (0);
     db_plugin->rollback (db_plugin->cls);
     global_ret = EXIT_NOTCONFIGURED;
-    goto cleanup;
+    GNUNET_SCHEDULER_shutdown ();
+    return;
   }
   wa = wpd->wa;
   wpd->eh = TALER_BANK_transfer (ctx,
@@ -373,19 +496,15 @@ wire_prepare_cb (void *cls,
                                  buf,
                                  buf_size,
                                  &wire_confirm_cb,
-                                 NULL);
+                                 wpd);
   if (NULL == wpd->eh)
   {
     GNUNET_break (0); /* Irrecoverable */
     db_plugin->rollback (db_plugin->cls);
     global_ret = EXIT_FAILURE;
-    goto cleanup;
+    GNUNET_SCHEDULER_shutdown ();
+    return;
   }
-  return;
-cleanup:
-  GNUNET_SCHEDULER_shutdown ();
-  GNUNET_free (wpd);
-  wpd = NULL;
 }
 
 
@@ -399,23 +518,55 @@ static void
 run_transfers (void *cls)
 {
   enum GNUNET_DB_QueryStatus qs;
+  int64_t limit;
 
   (void) cls;
   task = NULL;
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-              "Checking for pending wire transfers\n");
-  if (GNUNET_SYSERR ==
-      db_plugin->preflight (db_plugin->cls))
+  limit = shard->shard_end - shard->batch_start;
+  if (0 >= limit)
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                "Failed to obtain database connection!\n");
-    global_ret = EXIT_FAILURE;
-    GNUNET_SCHEDULER_shutdown ();
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                "Shard [%llu,%llu) completed\n",
+                (unsigned long long) shard->shard_start,
+                (unsigned long long) shard->batch_end);
+    qs = db_plugin->complete_shard (db_plugin->cls,
+                                    "transfer",
+                                    shard->shard_start,
+                                    shard->batch_end + 1);
+    switch (qs)
+    {
+    case GNUNET_DB_STATUS_HARD_ERROR:
+      GNUNET_break (0);
+      GNUNET_free (shard);
+      GNUNET_SCHEDULER_shutdown ();
+      return;
+    case GNUNET_DB_STATUS_SOFT_ERROR:
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                  "Got DB soft error for complete_shard. Rolling back.\n");
+      GNUNET_free (shard);
+      task = GNUNET_SCHEDULER_add_now (&select_shard,
+                                       NULL);
+      return;
+    case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+      /* already existed, ok, let's just continue */
+      break;
+    case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+      /* normal case */
+      break;
+    }
+    shard_delay = GNUNET_TIME_absolute_get_duration (shard->shard_start_time);
+    GNUNET_free (shard);
+    task = GNUNET_SCHEDULER_add_now (&select_shard,
+                                     NULL);
     return;
   }
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+              "Checking for %lld pending wire transfers [%llu-...)\n",
+              (long long) limit,
+              (unsigned long long) shard->batch_start);
   if (GNUNET_OK !=
-      db_plugin->start (db_plugin->cls,
-                        "aggregator run transfer"))
+      db_plugin->start_read_committed (db_plugin->cls,
+                                       "aggregator run transfer"))
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                 "Failed to start database transaction!\n");
@@ -423,30 +574,29 @@ run_transfers (void *cls)
     GNUNET_SCHEDULER_shutdown ();
     return;
   }
-  wpd = GNUNET_new (struct WirePrepareData);
   qs = db_plugin->wire_prepare_data_get (db_plugin->cls,
+                                         shard->batch_start,
+                                         limit,
                                          &wire_prepare_cb,
                                          NULL);
-  if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs)
-    return;  /* continued via continuation set in #wire_prepare_cb() */
-  db_plugin->rollback (db_plugin->cls);
-  GNUNET_free (wpd);
-  wpd = NULL;
   switch (qs)
   {
   case GNUNET_DB_STATUS_HARD_ERROR:
+    db_plugin->rollback (db_plugin->cls);
     GNUNET_break (0);
     global_ret = EXIT_FAILURE;
     GNUNET_SCHEDULER_shutdown ();
     return;
   case GNUNET_DB_STATUS_SOFT_ERROR:
     /* try again */
+    db_plugin->rollback (db_plugin->cls);
     GNUNET_assert (NULL == task);
     task = GNUNET_SCHEDULER_add_now (&run_transfers,
                                      NULL);
     return;
   case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     /* no more prepared wire transfers, go sleep a bit! */
+    db_plugin->rollback (db_plugin->cls);
     GNUNET_assert (NULL == task);
     if (GNUNET_YES == test_mode)
     {
@@ -458,15 +608,92 @@ run_transfers (void *cls)
     {
       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                   "No more pending wire transfers, going idle\n");
-      task = GNUNET_SCHEDULER_add_delayed (aggregator_idle_sleep_interval,
+      task = GNUNET_SCHEDULER_add_delayed (transfer_idle_sleep_interval,
                                            &run_transfers,
                                            NULL);
     }
     return;
+  default:
+    /* continued in wire_prepare_cb() */
+    return;
+  }
+}
+
+
+/**
+ * Select shard to process.
+ *
+ * @param cls NULL
+ */
+static void
+select_shard (void *cls)
+{
+  enum GNUNET_DB_QueryStatus qs;
+  struct GNUNET_TIME_Relative delay;
+  uint64_t start;
+  uint64_t end;
+
+  (void) cls;
+  task = NULL;
+  if (GNUNET_SYSERR ==
+      db_plugin->preflight (db_plugin->cls))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "Failed to obtain database connection!\n");
+    global_ret = EXIT_FAILURE;
+    GNUNET_SCHEDULER_shutdown ();
+    return;
+  }
+  if (0 == max_workers)
+    delay = GNUNET_TIME_UNIT_ZERO;
+  else
+    delay.rel_value_us = GNUNET_CRYPTO_random_u64 (
+      GNUNET_CRYPTO_QUALITY_WEAK,
+      4 * GNUNET_TIME_relative_max (
+        transfer_idle_sleep_interval,
+        GNUNET_TIME_relative_multiply (shard_delay,
+                                       max_workers)).rel_value_us);
+  qs = db_plugin->begin_shard (db_plugin->cls,
+                               "transfer",
+                               delay,
+                               shard_size,
+                               &start,
+                               &end);
+  switch (qs)
+  {
+  case GNUNET_DB_STATUS_HARD_ERROR:
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "Failed to obtain starting point for montoring from 
database!\n");
+    global_ret = EXIT_FAILURE;
+    GNUNET_SCHEDULER_shutdown ();
+    return;
+  case GNUNET_DB_STATUS_SOFT_ERROR:
+    /* try again */
+    task = GNUNET_SCHEDULER_add_delayed (transfer_idle_sleep_interval,
+                                         &select_shard,
+                                         NULL);
+    return;
+  case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+    GNUNET_break (0);
+    task = GNUNET_SCHEDULER_add_delayed (transfer_idle_sleep_interval,
+                                         &select_shard,
+                                         NULL);
+    return;
   case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
-    /* should be impossible */
-    GNUNET_assert (0);
+    /* continued below */
+    break;
   }
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+              "Starting with shard [%llu,%llu)\n",
+              (unsigned long long) start,
+              (unsigned long long) end);
+  shard = GNUNET_new (struct Shard);
+  shard->shard_start_time = GNUNET_TIME_absolute_get ();
+  shard->shard_start = start;
+  shard->shard_end = end;
+  shard->batch_start = start;
+  task = GNUNET_SCHEDULER_add_now (&run_transfers,
+                                   NULL);
 }
 
 
@@ -489,7 +716,7 @@ run (void *cls,
   (void) cfgfile;
 
   cfg = c;
-  if (GNUNET_OK != parse_wirewatch_config ())
+  if (GNUNET_OK != parse_transfer_config ())
   {
     cfg = NULL;
     global_ret = EXIT_NOTCONFIGURED;
@@ -503,9 +730,17 @@ run (void *cls,
     GNUNET_break (0);
     return;
   }
-
+  if (GNUNET_SYSERR ==
+      db_plugin->preflight (db_plugin->cls))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "Failed to obtain database connection!\n");
+    global_ret = EXIT_FAILURE;
+    GNUNET_SCHEDULER_shutdown ();
+    return;
+  }
   GNUNET_assert (NULL == task);
-  task = GNUNET_SCHEDULER_add_now (&run_transfers,
+  task = GNUNET_SCHEDULER_add_now (&select_shard,
                                    NULL);
   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
                                  cls);
@@ -524,12 +759,22 @@ main (int argc,
       char *const *argv)
 {
   struct GNUNET_GETOPT_CommandLineOption options[] = {
+    GNUNET_GETOPT_option_uint ('S',
+                               "size",
+                               "SIZE",
+                               "Size to process per shard (default: 1024)",
+                               &shard_size),
     GNUNET_GETOPT_option_timetravel ('T',
                                      "timetravel"),
     GNUNET_GETOPT_option_flag ('t',
                                "test",
                                "run in test mode and exit when idle",
                                &test_mode),
+    GNUNET_GETOPT_option_uint ('w',
+                               "workers",
+                               "COUNT",
+                               "Plan work load with up to COUNT worker 
processes (default: 16)",
+                               &max_workers),
     GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION),
     GNUNET_GETOPT_OPTION_END
   };
diff --git a/src/exchange/taler-exchange-wirewatch.c 
b/src/exchange/taler-exchange-wirewatch.c
index fb1fde31..6e2cd1ee 100644
--- a/src/exchange/taler-exchange-wirewatch.c
+++ b/src/exchange/taler-exchange-wirewatch.c
@@ -752,6 +752,7 @@ main (int argc,
                                "COUNT",
                                "Plan work load with up to COUNT worker 
processes (default: 16)",
                                &max_workers),
+    GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION),
     GNUNET_GETOPT_OPTION_END
   };
   enum GNUNET_GenericReturnValue ret;
diff --git a/src/exchangedb/plugin_exchangedb_postgres.c 
b/src/exchangedb/plugin_exchangedb_postgres.c
index d66370a2..817c1a18 100644
--- a/src/exchangedb/plugin_exchangedb_postgres.c
+++ b/src/exchangedb/plugin_exchangedb_postgres.c
@@ -1188,11 +1188,12 @@ prepare_statements (struct PostgresClosure *pg)
                             ",type"
                             ",buf"
                             " FROM prewire"
-                            " WHERE finished=FALSE"
+                            " WHERE prewire_uuid >= $1"
+                            "   AND finished=FALSE"
                             "   AND failed=FALSE"
                             " ORDER BY prewire_uuid ASC"
-                            " LIMIT 1;",
-                            0),
+                            " LIMIT $2;",
+                            2),
     /* Used in #postgres_select_deposits_missing_wire */
     GNUNET_PQ_make_prepare ("deposits_get_overdue",
                             "SELECT"
@@ -6984,52 +6985,116 @@ postgres_wire_prepare_data_mark_failed (
 }
 
 
+/**
+ * Closure for #prewire_cb().
+ */
+struct PrewireContext
+{
+  /**
+   * Function to call on each result.
+   */
+  TALER_EXCHANGEDB_WirePreparationIterator cb;
+
+  /**
+   * Closure for @a cb.
+   */
+  void *cb_cls;
+
+  /**
+   * #GNUNET_OK if everything went fine.
+   */
+  enum GNUNET_GenericReturnValue status;
+};
+
+
+/**
+ * Invoke the callback for each result.
+ *
+ * @param cls a `struct MissingWireContext *`
+ * @param result SQL result
+ * @param num_results number of rows in @a result
+ */
+static void
+prewire_cb (void *cls,
+            PGresult *result,
+            unsigned int num_results)
+{
+  struct PrewireContext *pc = cls;
+
+  for (unsigned int i = 0; i < num_results; i++)
+  {
+    uint64_t prewire_uuid;
+    char *type;
+    void *buf = NULL;
+    size_t buf_size;
+    struct GNUNET_PQ_ResultSpec rs[] = {
+      GNUNET_PQ_result_spec_uint64 ("prewire_uuid",
+                                    &prewire_uuid),
+      GNUNET_PQ_result_spec_string ("type",
+                                    &type),
+      GNUNET_PQ_result_spec_variable_size ("buf",
+                                           &buf,
+                                           &buf_size),
+      GNUNET_PQ_result_spec_end
+    };
+
+    if (GNUNET_OK !=
+        GNUNET_PQ_extract_result (result,
+                                  rs,
+                                  i))
+    {
+      GNUNET_break (0);
+      pc->status = GNUNET_SYSERR;
+      return;
+    }
+    pc->cb (pc->cb_cls,
+            prewire_uuid,
+            type,
+            buf,
+            buf_size);
+    GNUNET_PQ_cleanup_result (rs);
+  }
+}
+
+
 /**
  * Function called to get an unfinished wire transfer
  * preparation data. Fetches at most one item.
  *
  * @param cls closure
+ * @param start_row offset to query table at
+ * @param limit maximum number of results to return
  * @param cb function to call for ONE unfinished item
  * @param cb_cls closure for @a cb
  * @return transaction status code
  */
 static enum GNUNET_DB_QueryStatus
 postgres_wire_prepare_data_get (void *cls,
+                                uint64_t start_row,
+                                uint64_t limit,
                                 TALER_EXCHANGEDB_WirePreparationIterator cb,
                                 void *cb_cls)
 {
   struct PostgresClosure *pg = cls;
-  enum GNUNET_DB_QueryStatus qs;
   struct GNUNET_PQ_QueryParam params[] = {
+    GNUNET_PQ_query_param_uint64 (&start_row),
+    GNUNET_PQ_query_param_uint64 (&limit),
     GNUNET_PQ_query_param_end
   };
-  uint64_t prewire_uuid;
-  char *type;
-  void *buf = NULL;
-  size_t buf_size;
-  struct GNUNET_PQ_ResultSpec rs[] = {
-    GNUNET_PQ_result_spec_uint64 ("prewire_uuid",
-                                  &prewire_uuid),
-    GNUNET_PQ_result_spec_string ("type",
-                                  &type),
-    GNUNET_PQ_result_spec_variable_size ("buf",
-                                         &buf,
-                                         &buf_size),
-    GNUNET_PQ_result_spec_end
+  struct PrewireContext pc = {
+    .cb = cb,
+    .cb_cls = cb_cls,
+    .status = GNUNET_OK
   };
+  enum GNUNET_DB_QueryStatus qs;
 
-  qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
-                                                 "wire_prepare_data_get",
-                                                 params,
-                                                 rs);
-  if (0 >= qs)
-    return qs;
-  cb (cb_cls,
-      prewire_uuid,
-      type,
-      buf,
-      buf_size);
-  GNUNET_PQ_cleanup_result (rs);
+  qs = GNUNET_PQ_eval_prepared_multi_select (pg->conn,
+                                             "wire_prepare_data_get",
+                                             params,
+                                             &prewire_cb,
+                                             &pc);
+  if (GNUNET_OK != pc.status)
+    return GNUNET_DB_STATUS_HARD_ERROR;
   return qs;
 }
 
diff --git a/src/exchangedb/test_exchangedb.c b/src/exchangedb/test_exchangedb.c
index 8478fac0..b332cd6d 100644
--- a/src/exchangedb/test_exchangedb.c
+++ b/src/exchangedb/test_exchangedb.c
@@ -117,6 +117,8 @@ test_wire_prepare (void)
 {
   FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
           plugin->wire_prepare_data_get (plugin->cls,
+                                         0,
+                                         1,
                                          &dead_prepare_cb,
                                          NULL));
   FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
@@ -126,10 +128,14 @@ test_wire_prepare (void)
                                             11));
   FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
           plugin->wire_prepare_data_get (plugin->cls,
+                                         0,
+                                         1,
                                          &mark_prepare_cb,
                                          NULL));
   FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
           plugin->wire_prepare_data_get (plugin->cls,
+                                         0,
+                                         1,
                                          &dead_prepare_cb,
                                          NULL));
   return GNUNET_OK;
diff --git a/src/include/taler_exchangedb_plugin.h 
b/src/include/taler_exchangedb_plugin.h
index 163b886c..4037ebac 100644
--- a/src/include/taler_exchangedb_plugin.h
+++ b/src/include/taler_exchangedb_plugin.h
@@ -2964,15 +2964,19 @@ struct TALER_EXCHANGEDB_Plugin
 
   /**
    * Function called to get an unfinished wire transfer
-   * preparation data. Fetches at most one item.
+   * preparation data.
    *
    * @param cls closure
-   * @param cb function to call for ONE unfinished item
+   * @param start_row offset to query table at
+   * @param limit maximum number of results to return
+   * @param cb function to call for unfinished work
    * @param cb_cls closure for @a cb
    * @return transaction status code
    */
   enum GNUNET_DB_QueryStatus
   (*wire_prepare_data_get)(void *cls,
+                           uint64_t start_row,
+                           uint64_t limit,
                            TALER_EXCHANGEDB_WirePreparationIterator cb,
                            void *cb_cls);
 
diff --git a/src/testing/testing_api_cmd_exec_transfer.c 
b/src/testing/testing_api_cmd_exec_transfer.c
index 2db445be..796f32d0 100644
--- a/src/testing/testing_api_cmd_exec_transfer.c
+++ b/src/testing/testing_api_cmd_exec_transfer.c
@@ -66,6 +66,9 @@ transfer_run (void *cls,
                                "taler-exchange-transfer",
                                "taler-exchange-transfer",
                                "-c", as->config_filename,
+                               "-L", "INFO",
+                               "-S", "1",
+                               "-w", "0",
                                "-t", /* exit when done */
                                NULL);
   if (NULL == as->transfer_proc)

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]