gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[taler-exchange] 05/08: -more work on wirewatch revision


From: gnunet
Subject: [taler-exchange] 05/08: -more work on wirewatch revision
Date: Sun, 20 Nov 2022 21:55:28 +0100

This is an automated email from the git hooks/post-receive script.

grothoff pushed a commit to branch master
in repository exchange.

commit 30997afc7fc81f1fa6af85c754390209d0200a67
Author: Christian Grothoff <christian@grothoff.org>
AuthorDate: Thu Nov 17 21:50:20 2022 +0100

    -more work on wirewatch revision
---
 src/exchange/taler-exchange-wirewatch.c | 814 ++++++++++++++------------------
 1 file changed, 358 insertions(+), 456 deletions(-)

diff --git a/src/exchange/taler-exchange-wirewatch.c 
b/src/exchange/taler-exchange-wirewatch.c
index d84344fc..0d902bf2 100644
--- a/src/exchange/taler-exchange-wirewatch.c
+++ b/src/exchange/taler-exchange-wirewatch.c
@@ -13,7 +13,6 @@
   You should have received a copy of the GNU Affero General Public License 
along with
   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
 */
-
 /**
  * @file taler-exchange-wirewatch.c
  * @brief Process that watches for wire transfers to the exchange's bank 
account
@@ -43,122 +42,88 @@
 #define MAXIMUM_BATCH_SIZE 1024
 
 /**
- * Information we keep for each supported account.
+ * Information about our account.
  */
-struct WireAccount
-{
-  /**
-   * Accounts are kept in a DLL.
-   */
-  struct WireAccount *next;
-
-  /**
-   * Plugins are kept in a DLL.
-   */
-  struct WireAccount *prev;
-
-  /**
-   * Information about this account.
-   */
-  const struct TALER_EXCHANGEDB_AccountInfo *ai;
-
-  /**
-   * Active request for history.
-   */
-  struct TALER_BANK_CreditHistoryHandle *hh;
-
-  /**
-   * Until when is processing this wire plugin delayed?
-   */
-  struct GNUNET_TIME_Absolute delayed_until;
-
-  /**
-   * Encoded offset in the wire transfer list from where
-   * to start the next query with the bank.
-   */
-  uint64_t batch_start;
-
-  /**
-   * Latest row offset seen in this transaction, becomes
-   * the new #batch_start upon commit.
-   */
-  uint64_t latest_row_off;
-
-  /**
-   * Maximum row offset this transaction may yield. If we got the
-   * maximum number of rows, we must not @e delay before running
-   * the next transaction.
-   */
-  uint64_t max_row_off;
-
-  /**
-   * Offset where our current shard begins (inclusive).
-   */
-  uint64_t shard_start;
-
-  /**
-   * Offset where our current shard ends (exclusive).
-   */
-  uint64_t shard_end;
-
-  /**
-   * When did we start with the shard?
-   */
-  struct GNUNET_TIME_Absolute shard_start_time;
-
-  /**
-   * For how long did we lock the shard?
-   */
-  struct GNUNET_TIME_Absolute shard_end_time;
-
-  /**
-   * How long did we take to finish the last shard
-   * for this account?
-   */
-  struct GNUNET_TIME_Relative shard_delay;
-
-  /**
-   * Name of our job in the shard table.
-   */
-  char *job_name;
-
-  /**
-   * How many transactions do we retrieve per batch?
-   */
-  unsigned int batch_size;
-
-  /**
-   * How much do we increment @e batch_size on success?
-   */
-  unsigned int batch_thresh;
-
-  /**
-   * Should we delay the next request to the wire plugin a bit?  Set to
-   * false if we actually did some work.
-   */
-  bool delay;
-
-  /**
-   * Did we start a transaction yet?
-   */
-  bool started_transaction;
-
-  /**
-   * Is this shard still open for processing.
-   */
-  bool shard_open;
-};
+static const struct TALER_EXCHANGEDB_AccountInfo *ai;
+
+/**
+ * Active request for history.
+ */
+static struct TALER_BANK_CreditHistoryHandle *hh;
+
+/**
+ * Until when is processing this wire plugin delayed?
+ */
+static struct GNUNET_TIME_Absolute delayed_until;
+
+/**
+ * Encoded offset in the wire transfer list from where
+ * to start the next query with the bank.
+ */
+static uint64_t batch_start;
+
+/**
+ * Latest row offset seen in this transaction, becomes
+ * the new #batch_start upon commit.
+ */
+static uint64_t latest_row_off;
+
+/**
+ * Offset where our current shard begins (inclusive).
+ */
+static uint64_t shard_start;
+
+/**
+ * Offset where our current shard ends (exclusive).
+ */
+static uint64_t shard_end;
+
+/**
+ * When did we start with the shard?
+ */
+static struct GNUNET_TIME_Absolute shard_start_time;
+
+/**
+ * For how long did we lock the shard?
+ */
+static struct GNUNET_TIME_Absolute shard_end_time;
+
+/**
+ * How long did we take to finish the last shard
+ * for this account?
+ */
+static struct GNUNET_TIME_Relative shard_delay;
+
+/**
+ * Name of our job in the shard table.
+ */
+static char *job_name;
+
+/**
+ * How many transactions do we retrieve per batch?
+ */
+static unsigned int batch_size;
 
+/**
+ * How much do we increment @e batch_size on success?
+ */
+static unsigned int batch_thresh;
 
 /**
- * Head of list of loaded wire plugins.
+ * Did work remain in the transaction queue? Set to true
+ * if we did some work and thus there might be more.
  */
-static struct WireAccount *wa_head;
+static bool progress;
 
 /**
- * Tail of list of loaded wire plugins.
+ * Did we start a transaction yet?
  */
-static struct WireAccount *wa_tail;
+static bool started_transaction;
+
+/**
+ * Is this shard still open for processing.
+ */
+static bool shard_open;
 
 /**
  * Handle to the context for interacting with the bank.
@@ -227,6 +192,10 @@ static int ignore_account_404;
  */
 static struct GNUNET_SCHEDULER_Task *task;
 
+/**
+ * Name of the configuration section with the account we should watch.
+ */
+static char *account_section;
 
 /**
  * We're being aborted with CTRL-C (or SIGTERM). Shut down.
@@ -236,38 +205,27 @@ static struct GNUNET_SCHEDULER_Task *task;
 static void
 shutdown_task (void *cls)
 {
+  enum GNUNET_DB_QueryStatus qs;
   (void) cls;
-  {
-    struct WireAccount *wa;
 
-    while (NULL != (wa = wa_head))
-    {
-      enum GNUNET_DB_QueryStatus qs;
-
-      if (NULL != wa->hh)
-      {
-        TALER_BANK_credit_history_cancel (wa->hh);
-        wa->hh = NULL;
-      }
-      GNUNET_CONTAINER_DLL_remove (wa_head,
-                                   wa_tail,
-                                   wa);
-      if (wa->started_transaction)
-      {
-        db_plugin->rollback (db_plugin->cls);
-        wa->started_transaction = false;
-      }
-      qs = db_plugin->abort_shard (db_plugin->cls,
-                                   wa->job_name,
-                                   wa->shard_start,
-                                   wa->shard_end);
-      if (qs <= 0)
-        GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                    "Failed to abort work shard on shutdown\n");
-      GNUNET_free (wa->job_name);
-      GNUNET_free (wa);
-    }
+  if (NULL != hh)
+  {
+    TALER_BANK_credit_history_cancel (hh);
+    hh = NULL;
+  }
+  if (started_transaction)
+  {
+    db_plugin->rollback (db_plugin->cls);
+    started_transaction = false;
   }
+  qs = db_plugin->abort_shard (db_plugin->cls,
+                               job_name,
+                               shard_start,
+                               shard_end);
+  if (qs <= 0)
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                "Failed to abort work shard on shutdown\n");
+  GNUNET_free (job_name);
   if (NULL != ctx)
   {
     GNUNET_CURL_fini (ctx);
@@ -295,28 +253,36 @@ shutdown_task (void *cls)
  * account to our list (if it is enabled and we can load the plugin).
  *
  * @param cls closure, NULL
- * @param ai account information
+ * @param in_ai account information
  */
 static void
 add_account_cb (void *cls,
-                const struct TALER_EXCHANGEDB_AccountInfo *ai)
+                const struct TALER_EXCHANGEDB_AccountInfo *in_ai)
 {
-  struct WireAccount *wa;
-
   (void) cls;
-  if (! ai->credit_enabled)
+  if (! in_ai->credit_enabled)
+    return; /* not enabled for us, skip */
+  if ( (NULL != account_section) &&
+       (0 != strcasecmp (ai->section_name,
+                         account_section)) )
     return; /* not enabled for us, skip */
-  wa = GNUNET_new (struct WireAccount);
-  wa->ai = ai;
-  GNUNET_asprintf (&wa->job_name,
+  if (NULL != ai)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "Multiple accounts enabled (%s and %s), use '-a' command-line 
option to select one!\n",
+                ai->section_name,
+                in_ai->section_name);
+    GNUNET_SCHEDULER_shutdown ();
+    global_ret = EXIT_INVALIDARGUMENT;
+    return;
+  }
+  ai = in_ai;
+  GNUNET_asprintf (&job_name,
                    "wirewatch-%s",
                    ai->section_name);
-  wa->batch_size = MAXIMUM_BATCH_SIZE;
-  if (0 != shard_size % wa->batch_size)
-    wa->batch_size = shard_size;
-  GNUNET_CONTAINER_DLL_insert (wa_head,
-                               wa_tail,
-                               wa);
+  batch_size = MAXIMUM_BATCH_SIZE;
+  if (0 != shard_size % batch_size)
+    batch_size = shard_size;
 }
 
 
@@ -360,7 +326,16 @@ exchange_serve_process_config (void)
   }
   TALER_EXCHANGEDB_find_accounts (&add_account_cb,
                                   NULL);
-  GNUNET_assert (NULL != wa_head);
+  if (NULL == ai)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "No accounts enabled for credit!\n");
+    GNUNET_SCHEDULER_shutdown ();
+    global_ret = EXIT_INVALIDARGUMENT;
+    TALER_EXCHANGEDB_plugin_unload (db_plugin);
+    db_plugin = NULL;
+    return GNUNET_SYSERR;
+  }
   return GNUNET_OK;
 }
 
@@ -368,240 +343,111 @@ exchange_serve_process_config (void)
 /**
  * Lock a shard and then begin to query for incoming wire transfers.
  *
- * @param cls a `struct WireAccount` to operate on
+ * @param cls NULL
  */
 static void
 lock_shard (void *cls);
 
 
 /**
- * Continue with the credit history of the shard
- * reserved as @a wa.
+ * Continue with the credit history of the shard.
  *
- * @param[in,out] cls `struct WireAccount *` account with shard to continue 
processing
+ * @param cls NULL
  */
 static void
 continue_with_shard (void *cls);
 
 
 /**
- * We encountered a serialization error.
- * Rollback the transaction and try again
- *
- * @param wa account we are transacting on
+ * We encountered a serialization error.  Rollback the transaction and try
+ * again.
  */
 static void
-handle_soft_error (struct WireAccount *wa)
+handle_soft_error (void)
 {
   db_plugin->rollback (db_plugin->cls);
-  wa->started_transaction = false;
-  if (1 < wa->batch_size)
+  started_transaction = false;
+  if (1 < batch_size)
   {
-    wa->batch_thresh = wa->batch_size;
-    wa->batch_size /= 2;
+    batch_thresh = batch_size;
+    batch_size /= 2;
     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                 "Reduced batch size to %llu due to serialization issue\n",
-                (unsigned long long) wa->batch_size);
+                (unsigned long long) batch_size);
   }
   /* Reset to beginning of transaction, and go again
      from there. */
-  wa->latest_row_off = wa->batch_start;
+  latest_row_off = batch_start;
   GNUNET_assert (NULL == task);
   task = GNUNET_SCHEDULER_add_now (&continue_with_shard,
-                                   wa);
+                                   NULL);
 }
 
 
 /**
- * Schedule the #lock_shard() operation for
- * @a wa. If @a wa is NULL, start with #wa_head.
- *
- * @param wa account to schedule #lock_shard() for,
- *        possibly NULL (!).
+ * Schedule the #lock_shard() operation.
  */
 static void
-schedule_transfers (struct WireAccount *wa)
+schedule_transfers (void)
 {
-  if (NULL == wa)
-  {
-    wa = wa_head;
-    GNUNET_assert (NULL != wa);
-  }
-  if (wa->shard_open)
+  if (shard_open)
     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                 "Will retry my shard (%llu,%llu] of %s in %s\n",
-                (unsigned long long) wa->shard_start,
-                (unsigned long long) wa->shard_end,
-                wa->job_name,
+                (unsigned long long) shard_start,
+                (unsigned long long) shard_end,
+                job_name,
                 GNUNET_STRINGS_relative_time_to_string (
-                  GNUNET_TIME_absolute_get_remaining (wa->delayed_until),
-                  GNUNET_YES));
+                  GNUNET_TIME_absolute_get_remaining (delayed_until),
+                  true));
   else
     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                 "Will try to lock next shard of %s in %s\n",
-                wa->job_name,
+                job_name,
                 GNUNET_STRINGS_relative_time_to_string (
-                  GNUNET_TIME_absolute_get_remaining (wa->delayed_until),
-                  GNUNET_YES));
+                  GNUNET_TIME_absolute_get_remaining (delayed_until),
+                  true));
   GNUNET_assert (NULL == task);
-  task = GNUNET_SCHEDULER_add_at (wa->delayed_until,
+  task = GNUNET_SCHEDULER_add_at (delayed_until,
                                   &lock_shard,
-                                  wa);
+                                  NULL);
 }
 
 
 /**
- * We are done with the work that is possible on @a wa right now (and the
- * transaction was committed, if there was one to commit). Move on to the next
- * account.
- *
- * @param wa wire account for which we completed a shard
+ * We are done with the work that is possible right now (and the transaction
+ * was committed, if there was one to commit). Move on to the next shard.
  */
 static void
-account_completed (struct WireAccount *wa)
+transaction_completed (void)
 {
-  GNUNET_assert (! wa->started_transaction);
-  if ( (wa->batch_start + wa->batch_size ==
-        wa->latest_row_off) &&
-       (wa->batch_size < MAXIMUM_BATCH_SIZE) )
+  GNUNET_assert (! started_transaction);
+  if ( (batch_start + batch_size ==
+        latest_row_off) &&
+       (batch_size < MAXIMUM_BATCH_SIZE) )
   {
     /* The current batch size worked without serialization
        issues, and we are allowed to grow. Do so slowly. */
     int delta;
 
-    delta = ((int) wa->batch_thresh - (int) wa->batch_size) / 4;
+    delta = ((int) batch_thresh - (int) batch_size) / 4;
     if (delta < 0)
       delta = -delta;
-    wa->batch_size = GNUNET_MIN (MAXIMUM_BATCH_SIZE,
-                                 wa->batch_size + delta + 1);
+    batch_size = GNUNET_MIN (MAXIMUM_BATCH_SIZE,
+                             batch_size + delta + 1);
     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                 "Increasing batch size to %llu\n",
-                (unsigned long long) wa->batch_size);
-  }
-
-  if (wa->delay)
-  {
-    /* This account was finished, block this one for the
-       #wirewatch_idle_sleep_interval and move on to the next one. */
-    wa->delayed_until
-      = GNUNET_TIME_relative_to_absolute (wirewatch_idle_sleep_interval);
-    wa = wa->next;
+                (unsigned long long) batch_size);
   }
-  GNUNET_assert (NULL == task);
-  schedule_transfers (wa);
-}
-
 
-/**
- * Check if we are finished with the current shard.  If so, update the
- * database, marking the shard as finished.
- *
- * @param wa wire account to commit for
- * @return true if we were indeed done with the shard
- */
-static bool
-check_shard_done (struct WireAccount *wa)
-{
-  enum GNUNET_DB_QueryStatus qs;
-
-  if (wa->shard_end > wa->latest_row_off)
+  if ( (! progress) && test_mode)
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-                "Shard %s (%llu,%llu] at %llu\n",
-                wa->job_name,
-                (unsigned long long) wa->shard_start,
-                (unsigned long long) wa->shard_end,
-                (unsigned long long) wa->latest_row_off);
-    return false; /* actually, not done! */
-  }
-  /* shard is complete, mark this as well */
-  qs = db_plugin->complete_shard (db_plugin->cls,
-                                  wa->job_name,
-                                  wa->shard_start,
-                                  wa->shard_end);
-  switch (qs)
-  {
-  case GNUNET_DB_STATUS_HARD_ERROR:
-    GNUNET_break (0);
-    db_plugin->rollback (db_plugin->cls);
-    GNUNET_SCHEDULER_shutdown ();
-    return false;
-  case GNUNET_DB_STATUS_SOFT_ERROR:
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-                "Got DB soft error for complete_shard. Rolling back.\n");
-    handle_soft_error (wa);
-    return false;
-  case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
-    GNUNET_break (0);
-    /* Not expected, but let's just continue */
-    break;
-  case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
-    /* normal case */
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-                "Completed shard %s (%llu,%llu] after %s\n",
-                wa->job_name,
-                (unsigned long long) wa->shard_start,
-                (unsigned long long) wa->shard_end,
-                GNUNET_STRINGS_relative_time_to_string (
-                  GNUNET_TIME_absolute_get_duration (wa->shard_start_time),
-                  GNUNET_YES));
-    break;
-  }
-  return true;
-}
-
-
-/**
- * We are finished with the current transaction, try
- * to commit and then schedule the next iteration.
- *
- * @param wa wire account to commit for
- */
-static void
-do_commit (struct WireAccount *wa)
-{
-  enum GNUNET_DB_QueryStatus qs;
-  bool shard_done;
-
-  GNUNET_assert (NULL == task);
-  shard_done = check_shard_done (wa);
-  wa->started_transaction = false;
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-              "Committing %s progress (%llu,%llu] at %llu\n (%s)",
-              wa->job_name,
-              (unsigned long long) wa->shard_start,
-              (unsigned long long) wa->shard_end,
-              (unsigned long long) wa->latest_row_off,
-              shard_done
-              ? "shard done"
-              : "shard incomplete");
-  qs = db_plugin->commit (db_plugin->cls);
-  switch (qs)
-  {
-  case GNUNET_DB_STATUS_HARD_ERROR:
-    GNUNET_break (0);
+    /* Transaction list was drained and we are in
+       test mode. So we are done. */
     GNUNET_SCHEDULER_shutdown ();
     return;
-  case GNUNET_DB_STATUS_SOFT_ERROR:
-    /* reduce transaction size to reduce rollback probability */
-    handle_soft_error (wa);
-    return;
-  case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
-  case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
-    /* normal case */
-    break;
-  }
-  if (shard_done)
-  {
-    wa->shard_delay = GNUNET_TIME_absolute_get_duration (wa->shard_start_time);
-    wa->shard_open = false;
-    account_completed (wa);
-  }
-  else
-  {
-    task = GNUNET_SCHEDULER_add_now (&continue_with_shard,
-                                     wa);
   }
+  GNUNET_assert (NULL == task);
+  schedule_transfers ();
 }
 
 
@@ -609,18 +455,24 @@ do_commit (struct WireAccount *wa)
  * We got incoming transaction details from the bank. Add them
  * to the database.
  *
- * @param wa wire account we are handling
  * @param details array of transaction details
  * @param details_length length of the @a details array
- * @return true on success
  */
-static bool
-process_reply (struct WireAccount *wa,
-               const struct TALER_BANK_CreditDetails *details,
+static void
+process_reply (const struct TALER_BANK_CreditDetails *details,
                unsigned int details_length)
 {
-  uint64_t lroff = wa->latest_row_off;
+  enum GNUNET_DB_QueryStatus qs;
+  bool shard_done;
+  uint64_t lroff = latest_row_off;
 
+  if (0 == details_length)
+  {
+    /* Server should have used 204, not 200! */
+    GNUNET_break_op (0);
+    transaction_completed ();
+    return;
+  }
   /* check serial IDs for range constraints */
   for (unsigned int i = 0; i<details_length; i++)
   {
@@ -634,16 +486,9 @@ process_reply (struct WireAccount *wa,
                   (unsigned long long) lroff);
       db_plugin->rollback (db_plugin->cls);
       GNUNET_SCHEDULER_shutdown ();
-      wa->hh = NULL;
-      return false;
-    }
-    if (cd->serial_id >= wa->max_row_off)
-    {
-      /* We got 'limit' transactions back from the bank, so we should not
-         introduce any delay before the next call. */
-      wa->delay = false;
+      return;
     }
-    if (cd->serial_id > wa->shard_end)
+    if (cd->serial_id > shard_end)
     {
       /* we are *past* the current shard (likely because the serial_id of the
          shard_end happens to not exist in the DB). So commit and stop this
@@ -651,19 +496,14 @@ process_reply (struct WireAccount *wa,
       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                   "Serial ID %llu past shard end at %llu, ending iteration 
early!\n",
                   (unsigned long long) cd->serial_id,
-                  (unsigned long long) wa->shard_end);
+                  (unsigned long long) shard_end);
       details_length = i;
-      wa->delay = false;
+      progress = true;
+      lroff = cd->serial_id - 1;
       break;
     }
     lroff = cd->serial_id;
   }
-  if (0 == details_length)
-  {
-    /* Server should have used 204, not 200! */
-    GNUNET_break_op (0);
-    return true;
-  }
   if (GNUNET_OK !=
       db_plugin->start_read_committed (db_plugin->cls,
                                        "wirewatch check for incoming wire 
transfers"))
@@ -672,15 +512,13 @@ process_reply (struct WireAccount *wa,
                 "Failed to start database transaction!\n");
     global_ret = EXIT_FAILURE;
     GNUNET_SCHEDULER_shutdown ();
-    wa->hh = NULL;
-    return false;
+    return;
   }
-  wa->started_transaction = true;
+  started_transaction = true;
 
   for (unsigned int i = 0; i<details_length; i++)
   {
     const struct TALER_BANK_CreditDetails *cd = &details[i];
-    enum GNUNET_DB_QueryStatus qs;
 
     /* FIXME #7276: Consider using Postgres multi-valued insert here,
    for up to 15x speed-up according to
@@ -692,23 +530,19 @@ process_reply (struct WireAccount *wa,
                                         &cd->amount,
                                         cd->execution_date,
                                         cd->debit_account_uri,
-                                        wa->ai->section_name,
+                                        ai->section_name,
                                         cd->serial_id);
     switch (qs)
     {
     case GNUNET_DB_STATUS_HARD_ERROR:
       GNUNET_break (0);
-      db_plugin->rollback (db_plugin->cls);
-      wa->started_transaction = false;
       GNUNET_SCHEDULER_shutdown ();
-      wa->hh = NULL;
-      return false;
+      return;
     case GNUNET_DB_STATUS_SOFT_ERROR:
       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                   "Got DB soft error for reserves_in_insert. Rolling back.\n");
-      handle_soft_error (wa);
-      wa->hh = NULL;
-      return true;
+      handle_soft_error ();
+      return;
     case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
       /* Either wirewatch was freshly started after the system was
          shutdown and we're going over an incomplete shard again
@@ -720,25 +554,92 @@ process_reply (struct WireAccount *wa,
                   "Attempted to import transaction %llu (%s) twice. "
                   "This should happen rarely (if not, ask for support).\n",
                   (unsigned long long) cd->serial_id,
-                  wa->job_name);
+                  job_name);
       db_plugin->rollback (db_plugin->cls);
-      wa->latest_row_off = cd->serial_id;
-      wa->started_transaction = false;
+      started_transaction = false;
       /* already existed, ok, let's just continue */
-      return true;
+      return;
     case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
-      wa->latest_row_off = cd->serial_id;
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                  "Imported transaction %llu.",
+                  (unsigned long long) cd->serial_id);
       /* normal case */
       break;
     }
   }
-  do_commit (wa);
-  if (check_shard_done (wa))
-    account_completed (wa);
-  else
-    task = GNUNET_SCHEDULER_add_now (&continue_with_shard,
-                                     wa);
-  return true;
+  latest_row_off = lroff;
+  shard_done = (shard_end <= latest_row_off);
+  if (shard_done)
+  {
+    /* shard is complete, mark this as well */
+    qs = db_plugin->complete_shard (db_plugin->cls,
+                                    job_name,
+                                    shard_start,
+                                    shard_end);
+    switch (qs)
+    {
+    case GNUNET_DB_STATUS_HARD_ERROR:
+      GNUNET_break (0);
+      GNUNET_SCHEDULER_shutdown ();
+      return;
+    case GNUNET_DB_STATUS_SOFT_ERROR:
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                  "Got DB soft error for complete_shard. Rolling back.\n");
+      handle_soft_error ();
+      return;
+    case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+      GNUNET_break (0);
+      /* Not expected, but let's just continue */
+      break;
+    case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+      /* normal case */
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                  "Completed shard %s (%llu,%llu] after %s\n",
+                  job_name,
+                  (unsigned long long) shard_start,
+                  (unsigned long long) shard_end,
+                  GNUNET_STRINGS_relative_time_to_string (
+                    GNUNET_TIME_absolute_get_duration (shard_start_time),
+                    true));
+      break;
+    }
+  }
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+              "Committing %s progress (%llu,%llu] at %llu\n (%s)",
+              job_name,
+              (unsigned long long) shard_start,
+              (unsigned long long) shard_end,
+              (unsigned long long) latest_row_off,
+              shard_done
+              ? "shard done"
+              : "shard incomplete");
+  qs = db_plugin->commit (db_plugin->cls);
+  switch (qs)
+  {
+  case GNUNET_DB_STATUS_HARD_ERROR:
+    GNUNET_break (0);
+    GNUNET_SCHEDULER_shutdown ();
+    return;
+  case GNUNET_DB_STATUS_SOFT_ERROR:
+    /* reduce transaction size to reduce rollback probability */
+    handle_soft_error ();
+    return;
+  case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+  case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+    started_transaction = false;
+    /* normal case */
+    break;
+  }
+  if (shard_done)
+  {
+    shard_delay = GNUNET_TIME_absolute_get_duration (shard_start_time);
+    shard_open = false;
+    transaction_completed ();
+    return;
+  }
+  GNUNET_assert (NULL == task);
+  task = GNUNET_SCHEDULER_add_now (&continue_with_shard,
+                                   NULL);
 }
 
 
@@ -746,76 +647,75 @@ process_reply (struct WireAccount *wa,
  * Callbacks of this type are used to serve the result of asking
  * the bank for the transaction history.
  *
- * @param cls closure with the `struct WireAccount *` we are processing
+ * @param cls NULL
  * @param reply response we got from the bank
  */
 static void
 history_cb (void *cls,
             const struct TALER_BANK_CreditHistoryResponse *reply)
 {
-  struct WireAccount *wa = cls;
-  bool ok;
-
+  (void) cls;
   GNUNET_assert (NULL == task);
-  wa->hh = NULL;
+  hh = NULL;
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+              "History request returned with HTTP status %u\n",
+              reply->http_status);
   switch (reply->http_status)
   {
-  case 0:
-    ok = false;
   case MHD_HTTP_OK:
-    ok = process_reply (wa,
-                        reply->details.success.details,
-                        reply->details.success.details_length);
-    break;
+    process_reply (reply->details.success.details,
+                   reply->details.success.details_length);
+    return;
   case MHD_HTTP_NO_CONTENT:
-    ok = true;
-    break;
+    transaction_completed ();
+    return;
   case MHD_HTTP_NOT_FOUND:
-    ok = ignore_account_404;
+    if (ignore_account_404)
+    {
+      transaction_completed ();
+      return;
+    }
     break;
   default:
-    ok = false;
     break;
   }
-
-  if (! ok)
+  GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+              "Error fetching history: %s (%u)\n",
+              TALER_ErrorCode_get_hint (reply->ec),
+              reply->http_status);
+  if (! exit_on_error)
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                "Error fetching history: %s (%u)\n",
-                TALER_ErrorCode_get_hint (reply->ec),
-                reply->http_status);
-    if (! (exit_on_error || test_mode) )
-    {
-      account_completed (wa);
-      return;
-    }
-    GNUNET_SCHEDULER_shutdown ();
+    transaction_completed ();
     return;
   }
+  GNUNET_SCHEDULER_shutdown ();
 }
 
 
 static void
 continue_with_shard (void *cls)
 {
-  struct WireAccount *wa = cls;
   unsigned int limit;
 
+  (void) cls;
   task = NULL;
-  limit = GNUNET_MIN (wa->batch_size,
-                      wa->shard_end - wa->latest_row_off);
-  wa->max_row_off = wa->latest_row_off + limit;
-  GNUNET_assert (NULL == wa->hh);
-  wa->hh = TALER_BANK_credit_history (ctx,
-                                      wa->ai->auth,
-                                      wa->latest_row_off,
-                                      limit,
-                                      test_mode
-                                      ? GNUNET_TIME_UNIT_ZERO
-                                      : LONGPOLL_TIMEOUT,
-                                      &history_cb,
-                                      wa);
-  if (NULL == wa->hh)
+  GNUNET_assert (shard_end > latest_row_off);
+  limit = GNUNET_MIN (batch_size,
+                      shard_end - latest_row_off);
+  GNUNET_assert (NULL == hh);
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+              "Requesting credit history staring from %llu\n",
+              (unsigned long long) latest_row_off);
+  hh = TALER_BANK_credit_history (ctx,
+                                  ai->auth,
+                                  latest_row_off,
+                                  limit,
+                                  test_mode
+                                  ? GNUNET_TIME_UNIT_ZERO
+                                  : LONGPOLL_TIMEOUT,
+                                  &history_cb,
+                                  NULL);
+  if (NULL == hh)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                 "Failed to start request for account history!\n");
@@ -829,12 +729,12 @@ continue_with_shard (void *cls)
 static void
 lock_shard (void *cls)
 {
-  struct WireAccount *wa = cls;
   enum GNUNET_DB_QueryStatus qs;
   struct GNUNET_TIME_Relative delay;
-  uint64_t last_shard_start = wa->shard_start;
-  uint64_t last_shard_end = wa->shard_end;
+  uint64_t last_shard_start = shard_start;
+  uint64_t last_shard_end = shard_end;
 
+  (void) cls;
   task = NULL;
   if (GNUNET_SYSERR ==
       db_plugin->preflight (db_plugin->cls))
@@ -845,17 +745,16 @@ lock_shard (void *cls)
     GNUNET_SCHEDULER_shutdown ();
     return;
   }
-  if ( (wa->shard_open) &&
-       (GNUNET_TIME_absolute_is_future (wa->shard_end_time)) )
+  if ( (shard_open) &&
+       (GNUNET_TIME_absolute_is_future (shard_end_time)) )
   {
-    wa->delay = true; /* default is to delay, unless
-                         we find out that we're really busy */
-    wa->batch_start = wa->latest_row_off;
+    progress = false;
+    batch_start = latest_row_off;
     task = GNUNET_SCHEDULER_add_now (&continue_with_shard,
-                                     wa);
+                                     NULL);
     return;
   }
-  if (wa->shard_open)
+  if (shard_open)
     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                 "Shard not completed in time, will try to re-acquire\n");
   /* How long we lock a shard depends on the number of
@@ -868,15 +767,15 @@ lock_shard (void *cls)
       GNUNET_CRYPTO_QUALITY_WEAK,
       4 * GNUNET_TIME_relative_max (
         wirewatch_idle_sleep_interval,
-        GNUNET_TIME_relative_multiply (wa->shard_delay,
+        GNUNET_TIME_relative_multiply (shard_delay,
                                        max_workers)).rel_value_us);
-  wa->shard_start_time = GNUNET_TIME_absolute_get ();
+  shard_start_time = GNUNET_TIME_absolute_get ();
   qs = db_plugin->begin_shard (db_plugin->cls,
-                               wa->job_name,
+                               job_name,
                                delay,
                                shard_size,
-                               &wa->shard_start,
-                               &wa->shard_end);
+                               &shard_start,
+                               &shard_end);
   switch (qs)
   {
   case GNUNET_DB_STATUS_HARD_ERROR:
@@ -893,52 +792,51 @@ lock_shard (void *cls)
       rdelay = GNUNET_TIME_randomize (wirewatch_idle_sleep_interval);
       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                   "Serialization error tying to obtain shard %s, will try 
again in %s!\n",
-                  wa->job_name,
+                  job_name,
                   GNUNET_STRINGS_relative_time_to_string (rdelay,
-                                                          GNUNET_YES));
-      wa->delayed_until = GNUNET_TIME_relative_to_absolute (rdelay);
+                                                          true));
+      delayed_until = GNUNET_TIME_relative_to_absolute (rdelay);
     }
     GNUNET_assert (NULL == task);
-    schedule_transfers (wa->next);
+    schedule_transfers ();
     return;
   case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     GNUNET_break (0);
     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                 "No shard available, will try again for %s in %s!\n",
-                wa->job_name,
+                job_name,
                 GNUNET_STRINGS_relative_time_to_string (
                   wirewatch_idle_sleep_interval,
                   GNUNET_YES));
-    wa->delayed_until = GNUNET_TIME_relative_to_absolute (
+    delayed_until = GNUNET_TIME_relative_to_absolute (
       wirewatch_idle_sleep_interval);
-    wa->shard_open = false;
+    shard_open = false;
     GNUNET_assert (NULL == task);
-    schedule_transfers (wa->next);
+    schedule_transfers ();
     return;
   case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
     /* continued below */
     break;
   }
-  wa->shard_end_time = GNUNET_TIME_relative_to_absolute (delay);
+  shard_end_time = GNUNET_TIME_relative_to_absolute (delay);
   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
               "Starting with shard %s at (%llu,%llu] locked for %s\n",
-              wa->job_name,
-              (unsigned long long) wa->shard_start,
-              (unsigned long long) wa->shard_end,
+              job_name,
+              (unsigned long long) shard_start,
+              (unsigned long long) shard_end,
               GNUNET_STRINGS_relative_time_to_string (delay,
-                                                      GNUNET_YES));
-  wa->delay = true; /* default is to delay, unless
-                       we find out that we're really busy */
-  wa->batch_start = wa->shard_start;
-  if ( (wa->shard_open) &&
-       (wa->shard_start == last_shard_start) &&
-       (wa->shard_end == last_shard_end) )
-    GNUNET_break (wa->latest_row_off >= wa->batch_start); /* resume where we 
left things */
+                                                      true));
+  progress = false;
+  batch_start = shard_start;
+  if ( (shard_open) &&
+       (shard_start == last_shard_start) &&
+       (shard_end == last_shard_end) )
+    GNUNET_break (latest_row_off >= batch_start); /* resume where we left 
things */
   else
-    wa->latest_row_off = wa->batch_start;
-  wa->shard_open = true;
+    latest_row_off = batch_start;
+  shard_open = true;
   task = GNUNET_SCHEDULER_add_now (&continue_with_shard,
-                                   wa);
+                                   NULL);
 }
 
 
@@ -961,14 +859,15 @@ run (void *cls,
   (void) cfgfile;
 
   cfg = c;
+  GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
+                                 cls);
   if (GNUNET_OK !=
       exchange_serve_process_config ())
   {
     global_ret = EXIT_NOTCONFIGURED;
+    GNUNET_SCHEDULER_shutdown ();
     return;
   }
-  GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
-                                 cls);
   ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule,
                           &rc);
   if (NULL == ctx)
@@ -978,9 +877,7 @@ run (void *cls,
     return;
   }
   rc = GNUNET_CURL_gnunet_rc_create (ctx);
-  GNUNET_assert (NULL == task);
-  task = GNUNET_SCHEDULER_add_now (&lock_shard,
-                                   wa_head);
+  schedule_transfers ();
 }
 
 
@@ -996,6 +893,11 @@ main (int argc,
       char *const *argv)
 {
   struct GNUNET_GETOPT_CommandLineOption options[] = {
+    GNUNET_GETOPT_option_string ('a',
+                                 "account",
+                                 "SECTION_NAME",
+                                 "name of the configuration section with the 
account we should watch (needed if more than one is enabled for crediting)",
+                                 &account_section),
     GNUNET_GETOPT_option_flag ('e',
                                "exit-on-error",
                                "terminate wirewatch if we failed to download 
information from the bank",

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]