gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[taler-exchange] branch master updated: clean up wirewatch logic


From: gnunet
Subject: [taler-exchange] branch master updated: clean up wirewatch logic
Date: Sun, 15 Mar 2020 21:20:59 +0100

This is an automated email from the git hooks/post-receive script.

grothoff pushed a commit to branch master
in repository exchange.

The following commit(s) were added to refs/heads/master by this push:
     new d3f7cc11 clean up wirewatch logic
d3f7cc11 is described below

commit d3f7cc11842a3e2a574431919179d037e56715ea
Author: Christian Grothoff <address@hidden>
AuthorDate: Sun Mar 15 21:20:56 2020 +0100

    clean up wirewatch logic
---
 src/exchange/exchange.conf              |   8 +-
 src/exchange/taler-exchange-wirewatch.c | 249 ++++++++++++++++++--------------
 2 files changed, 151 insertions(+), 106 deletions(-)

diff --git a/src/exchange/exchange.conf b/src/exchange/exchange.conf
index 8144bddc..9de19894 100644
--- a/src/exchange/exchange.conf
+++ b/src/exchange/exchange.conf
@@ -54,9 +54,15 @@ PORT = 8081
 BASE_URL = http://localhost:8081/
 
 
-# How long should the aggregator sleep if it has nothing to do?
+# How long should the aggregator (and closer, and transfer)
+# sleep if it has nothing to do?
 AGGREGATOR_IDLE_SLEEP_INTERVAL = 60 s
 
+# How long should wirewatch sleep if it has nothing to do?
+# (Set very aggressively here for the demonstrators to be
+# super fast.)
+WIREWATCH_IDLE_SLEEP_INTERVAL = 1 s
+
 # how long is one signkey valid?
 SIGNKEY_DURATION = 4 weeks
 
diff --git a/src/exchange/taler-exchange-wirewatch.c 
b/src/exchange/taler-exchange-wirewatch.c
index 3731f663..04bf2169 100644
--- a/src/exchange/taler-exchange-wirewatch.c
+++ b/src/exchange/taler-exchange-wirewatch.c
@@ -1,6 +1,6 @@
 /*
   This file is part of TALER
-  Copyright (C) 2016, 2017, 2018 Taler Systems SA
+  Copyright (C) 2016--2020 Taler Systems SA
 
   TALER is free software; you can redistribute it and/or modify it under the
   terms of the GNU Affero General Public License as published by the Free 
Software
@@ -29,12 +29,13 @@
 #include "taler_json_lib.h"
 #include "taler_bank_service.h"
 
+#define DEBUG_LOGGING 0
+
 /**
- * How long do we sleep before trying again if there
- * are no transactions returned by the wire plugin?
+ * What is the initial batch size we use for credit history
+ * requests with the bank.  See `batch_size` below.
  */
-#define DELAY GNUNET_TIME_UNIT_SECONDS
-
+#define INITIAL_BATCH_SIZE 1024
 
 /**
  * Information we keep for each supported account.
@@ -56,11 +57,48 @@ struct WireAccount
    */
   char *section_name;
 
+  /**
+   * Database session we are using for the current transaction.
+   */
+  struct TALER_EXCHANGEDB_Session *session;
+
+  /**
+   * Active request for history.
+   */
+  struct TALER_BANK_CreditHistoryHandle *hh;
+
   /**
    * Authentication data.
    */
   struct TALER_BANK_AuthenticationData auth;
 
+  /**
+   * Until when is processing this wire plugin delayed?
+   */
+  struct GNUNET_TIME_Absolute delayed_until;
+
+  /**
+   * Encoded offset in the wire transfer list from where
+   * to start the next query with the bank.
+   */
+  uint64_t last_row_off;
+
+  /**
+   * Latest row offset seen in this transaction, becomes
+   * the new #last_row_off upon commit.
+   */
+  uint64_t latest_row_off;
+
+  /**
+   * How many transactions do we retrieve per batch?
+   */
+  unsigned int batch_size;
+
+  /**
+   * How many transactions did we see in the current batch?
+   */
+  unsigned int current_batch_size;
+
   /**
    * Are we running from scratch and should re-process all transactions
    * for this account?
@@ -68,9 +106,10 @@ struct WireAccount
   int reset_mode;
 
   /**
-   * Until when is processing this wire plugin delayed?
+   * Should we delay the next request to the wire plugin a bit?  Set to
+   * #GNUNET_NO if we actually did some work.
    */
-  struct GNUNET_TIME_Absolute delayed_until;
+  int delay;
 
 };
 
@@ -86,7 +125,8 @@ static struct WireAccount *wa_head;
 static struct WireAccount *wa_tail;
 
 /**
- * Wire plugin we are currently using.
+ * Wire account we are currently processing.  This would go away
+ * if we ever start processing all accounts in parallel.
  */
 static struct WireAccount *wa_pos;
 
@@ -111,27 +151,25 @@ static const struct GNUNET_CONFIGURATION_Handle *cfg;
 static struct TALER_EXCHANGEDB_Plugin *db_plugin;
 
 /**
- * Value to return from main(). #GNUNET_OK on success, #GNUNET_SYSERR
- * on serious errors.
- */
-static int global_ret;
-
-/**
- * Encoded offset in the wire transfer list from where
- * to start the next query with the bank.
+ * How long should we sleep when idle before trying to find more work?
  */
-static uint64_t last_row_off;
+static struct GNUNET_TIME_Relative wirewatch_idle_sleep_interval;
 
 /**
- * Latest row offset seen in this transaction, becomes
- * the new #last_row_off upon commit.
- */
-static uint64_t latest_row_off;
-
-/**
- * Should we delay the next request to the wire plugin a bit?
+ * Value to return from main(). 0 on success, non-zero on
+ * on serious errors.
  */
-static int delay;
+static enum
+{
+  GR_SUCCESS = 0,
+  GR_DATABASE_SESSION_FAIL = 1,
+  GR_DATABASE_TRANSACTION_BEGIN_FAIL = 2,
+  GR_DATABASE_SELECT_LATEST_HARD_FAIL = 3,
+  GR_BANK_REQUEST_HISTORY_FAIL = 4,
+  GR_CONFIGURATION_INVALID = 5,
+  GR_CMD_LINE_UTF8_ERROR = 6,
+  GR_CMD_LINE_OPTIONS_WRONG = 7,
+} global_ret;
 
 /**
  * Are we run in testing mode and should only do one pass?
@@ -144,25 +182,10 @@ static int test_mode;
 static int reset_mode;
 
 /**
- * How many transactions do we retrieve per batch?
- */
-static unsigned int batch_size = 1024;
-
-/**
- * How many transactions did we see in the current batch?
- */
-static unsigned int current_batch_size;
-
-/**
- * Next task to run, if any.
+ * Current task waiting for execution, if any.
  */
 static struct GNUNET_SCHEDULER_Task *task;
 
-/**
- * Active request for history.
- */
-static struct TALER_BANK_CreditHistoryHandle *hh;
-
 
 /**
  * We're being aborted with CTRL-C (or SIGTERM). Shut down.
@@ -173,11 +196,26 @@ static void
 shutdown_task (void *cls)
 {
   (void) cls;
-  if (NULL != hh)
   {
-    TALER_BANK_credit_history_cancel (hh);
-    hh = NULL;
+    struct WireAccount *wa;
+
+    while (NULL != (wa = wa_head))
+    {
+      if (NULL != wa->hh)
+      {
+        TALER_BANK_credit_history_cancel (wa->hh);
+        wa->hh = NULL;
+      }
+      GNUNET_CONTAINER_DLL_remove (wa_head,
+                                   wa_tail,
+                                   wa);
+      TALER_BANK_auth_free (&wa->auth);
+      GNUNET_free (wa->section_name);
+      GNUNET_free (wa);
+    }
   }
+  wa_pos = NULL;
+
   if (NULL != ctx)
   {
     GNUNET_CURL_fini (ctx);
@@ -195,21 +233,6 @@ shutdown_task (void *cls)
   }
   TALER_EXCHANGEDB_plugin_unload (db_plugin);
   db_plugin = NULL;
-  {
-    struct WireAccount *wa;
-
-    while (NULL != (wa = wa_head))
-    {
-      GNUNET_CONTAINER_DLL_remove (wa_head,
-                                   wa_tail,
-                                   wa);
-      TALER_BANK_auth_free (&wa->auth);
-      GNUNET_free (wa->section_name);
-      GNUNET_free (wa);
-    }
-  }
-  wa_pos = NULL;
-  last_row_off = 0;
 }
 
 
@@ -243,6 +266,7 @@ add_account_cb (void *cls,
     return;
   }
   wa->section_name = GNUNET_strdup (ai->section_name);
+  wa->batch_size = INITIAL_BATCH_SIZE;
   GNUNET_CONTAINER_DLL_insert (wa_head,
                                wa_tail,
                                wa);
@@ -258,6 +282,17 @@ add_account_cb (void *cls,
 static int
 exchange_serve_process_config (void)
 {
+  if (GNUNET_OK !=
+      GNUNET_CONFIGURATION_get_value_time (cfg,
+                                           "exchange",
+                                           "WIREWATCH_IDLE_SLEEP_INTERVAL",
+                                           &wirewatch_idle_sleep_interval))
+  {
+    GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
+                               "exchange",
+                               "WIREWATCH_IDLE_SLEEP_INTERVAL");
+    return GNUNET_SYSERR;
+  }
   if (NULL ==
       (db_plugin = TALER_EXCHANGEDB_plugin_load (cfg)))
   {
@@ -292,7 +327,7 @@ find_transfers (void *cls);
  * Callbacks of this type are used to serve the result of asking
  * the bank for the transaction history.
  *
- * @param cls closure with the `struct TALER_EXCHANGEDB_Session *`
+ * @param cls closure with the `struct WioreAccount *` we are processing
  * @param http_status HTTP status code from the server
  * @param ec taler error code
  * @param serial_id identification of the position at which we are querying
@@ -308,13 +343,14 @@ history_cb (void *cls,
             const struct TALER_BANK_CreditDetails *details,
             const json_t *json)
 {
-  struct TALER_EXCHANGEDB_Session *session = cls;
+  struct WireAccount *wa = cls;
+  struct TALER_EXCHANGEDB_Session *session = wa->session;
   enum GNUNET_DB_QueryStatus qs;
 
   (void) json;
   if (NULL == details)
   {
-    hh = NULL;
+    wa->hh = NULL;
     if (TALER_EC_NONE != ec)
     {
       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
@@ -331,8 +367,8 @@ history_cb (void *cls,
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                   "Got DB soft error for commit\n");
       /* reduce transaction size to reduce rollback probability */
-      if (2 > current_batch_size)
-        current_batch_size /= 2;
+      if (2 > wa->current_batch_size)
+        wa->current_batch_size /= 2;
       /* try again */
       GNUNET_assert (NULL == task);
       task = GNUNET_SCHEDULER_add_now (&find_transfers,
@@ -342,27 +378,28 @@ history_cb (void *cls,
     if (0 < qs)
     {
       /* transaction success, update #last_row_off */
-      last_row_off = latest_row_off;
-      latest_row_off = 0;
-
+      wa->last_row_off = wa->latest_row_off;
+      wa->latest_row_off = 0; /* should not be needed */
+      wa->session = NULL; /* should not be needed */
       /* if successful at limit, try increasing transaction batch size (AIMD) 
*/
-      if (current_batch_size == batch_size)
-        batch_size++;
+      if ( (wa->current_batch_size == wa->batch_size) &&
+           (UINT_MAX > wa->batch_size) )
+        wa->batch_size++;
     }
     GNUNET_break (0 <= qs);
-    if ( (GNUNET_YES == delay) &&
+    if ( (GNUNET_YES == wa->delay) &&
          (test_mode) &&
-         (NULL == wa_pos->next) )
+         (NULL == wa->next) )
     {
       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                   "Shutdown due to test mode!\n");
       GNUNET_SCHEDULER_shutdown ();
       return GNUNET_OK;
     }
-    if (GNUNET_YES == delay)
+    if (GNUNET_YES == wa->delay)
     {
-      wa_pos->delayed_until
-        = GNUNET_TIME_relative_to_absolute (DELAY);
+      wa->delayed_until
+        = GNUNET_TIME_relative_to_absolute (wirewatch_idle_sleep_interval);
       wa_pos = wa_pos->next;
       if (NULL == wa_pos)
         wa_pos = wa_head;
@@ -381,29 +418,32 @@ history_cb (void *cls,
   /**
    * Debug block.
    */
+#if DEBUG_LOGGING
   {
-/* Should be 53, give 80 just to be redundant.  */
+    /** Should be 53, give 80 just to be extra conservative (and aligned).  */
 #define PUBSIZE 80
     char wtid_s[PUBSIZE];
 
-    GNUNET_break
-      (NULL != GNUNET_STRINGS_data_to_string (&details->reserve_pub,
-                                              sizeof (details->reserve_pub),
-                                              &wtid_s[0],
-                                              PUBSIZE));
+    GNUNET_break (NULL !=
+                  GNUNET_STRINGS_data_to_string (&details->reserve_pub,
+                                                 sizeof (details->reserve_pub),
+                                                 &wtid_s[0],
+                                                 PUBSIZE));
     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                 "Plain text subject (= reserve_pub): %s\n",
                 wtid_s);
   }
+#endif
 
-  current_batch_size++;
+  if (wa->current_batch_size < UINT_MAX)
+    wa->current_batch_size++;
   qs = db_plugin->reserves_in_insert (db_plugin->cls,
                                       session,
                                       &details->reserve_pub,
                                       &details->amount,
                                       details->execution_date,
                                       details->debit_account_url,
-                                      wa_pos->section_name,
+                                      wa->section_name,
                                       serial_id);
   if (GNUNET_DB_STATUS_HARD_ERROR == qs)
   {
@@ -425,8 +465,8 @@ history_cb (void *cls,
                                      NULL);
     return GNUNET_SYSERR;
   }
-
-  latest_row_off = serial_id;
+  wa->delay = GNUNET_NO;
+  wa->latest_row_off = serial_id;
   return GNUNET_OK;
 }
 
@@ -446,12 +486,11 @@ find_transfers (void *cls)
   task = NULL;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Checking for incoming wire transfers\n");
-
   if (NULL == (session = db_plugin->get_session (db_plugin->cls)))
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                 "Failed to obtain database session!\n");
-    global_ret = GNUNET_SYSERR;
+    global_ret = GR_DATABASE_SESSION_FAIL;
     GNUNET_SCHEDULER_shutdown ();
     return;
   }
@@ -464,7 +503,7 @@ find_transfers (void *cls)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                 "Failed to start database transaction!\n");
-    global_ret = GNUNET_SYSERR;
+    global_ret = GR_DATABASE_TRANSACTION_BEGIN_FAIL;
     GNUNET_SCHEDULER_shutdown ();
     return;
   }
@@ -473,14 +512,14 @@ find_transfers (void *cls)
     qs = db_plugin->get_latest_reserve_in_reference (db_plugin->cls,
                                                      session,
                                                      wa_pos->section_name,
-                                                     &last_row_off);
+                                                     &wa_pos->last_row_off);
     if (GNUNET_DB_STATUS_HARD_ERROR == qs)
     {
       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                   "Failed to obtain starting point for montoring from 
database!\n");
       db_plugin->rollback (db_plugin->cls,
                            session);
-      global_ret = GNUNET_SYSERR;
+      global_ret = GR_DATABASE_SELECT_LATEST_HARD_FAIL;
       GNUNET_SCHEDULER_shutdown ();
       return;
     }
@@ -493,28 +532,28 @@ find_transfers (void *cls)
                                        NULL);
       return;
     }
+    wa_pos->reset_mode = GNUNET_NO;
   }
-  wa_pos->reset_mode = GNUNET_NO;
-  delay = GNUNET_YES;
-  current_batch_size = 0;
+  wa_pos->delay = GNUNET_YES;
+  wa_pos->current_batch_size = 0; /* reset counter */
 
   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
               "wirewatch: requesting incoming history from %s\n",
               wa_pos->auth.wire_gateway_url);
-
-  hh = TALER_BANK_credit_history (ctx,
-                                  &wa_pos->auth,
-                                  last_row_off,
-                                  batch_size,
-                                  &history_cb,
-                                  session);
-  if (NULL == hh)
+  wa_pos->session = session;
+  wa_pos->hh = TALER_BANK_credit_history (ctx,
+                                          &wa_pos->auth,
+                                          wa_pos->last_row_off,
+                                          wa_pos->batch_size,
+                                          &history_cb,
+                                          wa_pos);
+  if (NULL == wa_pos->hh)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                 "Failed to start request for account history!\n");
     db_plugin->rollback (db_plugin->cls,
                          session);
-    global_ret = GNUNET_SYSERR;
+    global_ret = GR_BANK_REQUEST_HISTORY_FAIL;
     GNUNET_SCHEDULER_shutdown ();
     return;
   }
@@ -542,7 +581,7 @@ run (void *cls,
   if (GNUNET_OK !=
       exchange_serve_process_config ())
   {
-    global_ret = 1;
+    global_ret = GR_CONFIGURATION_INVALID;
     return;
   }
   wa_pos = wa_head;
@@ -567,7 +606,7 @@ run (void *cls,
  *
  * @param argc number of arguments from the command line
  * @param argv command line arguments
- * @return 0 ok, 1 on error
+ * @return 0 ok, non-zero on error
  */
 int
 main (int argc,
@@ -590,7 +629,7 @@ main (int argc,
   if (GNUNET_OK !=
       GNUNET_STRINGS_get_utf8_args (argc, argv,
                                     &argc, &argv))
-    return 2;
+    return GR_CMD_LINE_UTF8_ERROR;
   if (GNUNET_OK !=
       GNUNET_PROGRAM_run (argc, argv,
                           "taler-exchange-wirewatch",
@@ -600,7 +639,7 @@ main (int argc,
                           &run, NULL))
   {
     GNUNET_free ((void *) argv);
-    return 1;
+    return GR_CMD_LINE_OPTIONS_WRONG;
   }
   GNUNET_free ((void *) argv);
   return global_ret;

-- 
To stop receiving notification emails like this one, please contact
address@hidden.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]