gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[taler-exchange] branch master updated: -bugfix, preparations for shardi


From: gnunet
Subject: [taler-exchange] branch master updated: -bugfix, preparations for sharding
Date: Sat, 19 Jun 2021 18:20:21 +0200

This is an automated email from the git hooks/post-receive script.

grothoff pushed a commit to branch master
in repository exchange.

The following commit(s) were added to refs/heads/master by this push:
     new 0271e848 -bugfix, preparations for sharding
0271e848 is described below

commit 0271e848138a94e27f472196f5341879fd3ab8ba
Author: Christian Grothoff <christian@grothoff.org>
AuthorDate: Sat Jun 19 18:20:19 2021 +0200

    -bugfix, preparations for sharding
---
 src/bank-lib/bank_api_credit.c          | 25 ----------
 src/exchange/taler-exchange-wirewatch.c | 85 +++++++++++++++++++--------------
 2 files changed, 50 insertions(+), 60 deletions(-)

diff --git a/src/bank-lib/bank_api_credit.c b/src/bank-lib/bank_api_credit.c
index 578d86c9..290a6484 100644
--- a/src/bank-lib/bank_api_credit.c
+++ b/src/bank-lib/bank_api_credit.c
@@ -210,24 +210,6 @@ handle_credit_history_finished (void *cls,
 }
 
 
-/**
- * Request the credit history of the exchange's bank account.
- *
- * @param ctx curl context for the event loop
- * @param auth authentication data to use
- * @param start_row from which row on do we want to get results,
- *        use UINT64_MAX for the latest; exclusive
- * @param num_results how many results do we want;
- *        negative numbers to go into the past, positive numbers
- *        to go into the future starting at @a start_row;
- *        must not be zero.
- * @param hres_cb the callback to call with the transaction
- *        history
- * @param hres_cb_cls closure for the above callback
- * @return NULL if the inputs are invalid (i.e. zero value for
- *         @e num_results). In this case, the callback is not
- *         called.
- */
 struct TALER_BANK_CreditHistoryHandle *
 TALER_BANK_credit_history (struct GNUNET_CURL_Context *ctx,
                            const struct TALER_BANK_AuthenticationData *auth,
@@ -300,13 +282,6 @@ TALER_BANK_credit_history (struct GNUNET_CURL_Context *ctx,
 }
 
 
-/**
- * Cancel a history request.  This function cannot be
- * used on a request handle if a response is already
- * served for it.
- *
- * @param hh the history request handle
- */
 void
 TALER_BANK_credit_history_cancel (struct TALER_BANK_CreditHistoryHandle *hh)
 {
diff --git a/src/exchange/taler-exchange-wirewatch.c 
b/src/exchange/taler-exchange-wirewatch.c
index 40b962f8..760dbe10 100644
--- a/src/exchange/taler-exchange-wirewatch.c
+++ b/src/exchange/taler-exchange-wirewatch.c
@@ -1,6 +1,6 @@
 /*
   This file is part of TALER
-  Copyright (C) 2016--2020 Taler Systems SA
+  Copyright (C) 2016--2021 Taler Systems SA
 
   TALER is free software; you can redistribute it and/or modify it under the
   terms of the GNU Affero General Public License as published by the Free 
Software
@@ -89,6 +89,11 @@ struct WireAccount
    */
   uint64_t latest_row_off;
 
+  /**
+   * Offset where our current shard ends.
+   */
+  uint64_t shard_end;
+
   /**
    * How many transactions do we retrieve per batch?
    */
@@ -103,19 +108,14 @@ struct WireAccount
    * Are we running from scratch and should re-process all transactions
    * for this account?
    */
-  int reset_mode;
+  bool reset_mode;
 
   /**
    * Should we delay the next request to the wire plugin a bit?  Set to
-   * #GNUNET_NO if we actually did some work.
+   * false if we actually did some work.
    */
-  int delay;
+  bool delay;
 
-  /**
-   * Did we experience a soft failure during the current
-   * transaction?
-   */
-  bool soft_fail;
 };
 
 
@@ -160,6 +160,11 @@ static struct TALER_EXCHANGEDB_Plugin *db_plugin;
  */
 static struct GNUNET_TIME_Relative wirewatch_idle_sleep_interval;
 
+/**
+ * Modulus to apply to group shards.
+ */
+static unsigned int shard_size = 1024;
+
 /**
  * Value to return from main(). 0 on success, non-zero on
  * on serious errors.
@@ -363,20 +368,10 @@ history_cb (void *cls,
                   (unsigned int) ec,
                   http_status);
     }
-    if (wa->soft_fail)
-    {
-      /* no point to commit, transaction was already rolled
-         back after we encountered a soft failure */
-      wa->soft_fail = false;
-      qs = GNUNET_DB_STATUS_SOFT_ERROR;
-    }
-    else
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "End of list. Committing progress!\n");
-      qs = db_plugin->commit (db_plugin->cls,
-                              session);
-    }
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "End of list. Committing progress!\n");
+    qs = db_plugin->commit (db_plugin->cls,
+                            session);
     if (GNUNET_DB_STATUS_HARD_ERROR == qs)
     {
       GNUNET_SCHEDULER_shutdown ();
@@ -410,7 +405,7 @@ history_cb (void *cls,
                   "Increasing batch size to %llu\n",
                   (unsigned long long) wa->batch_size);
     }
-    if ( (GNUNET_YES == wa->delay) &&
+    if ( (wa->delay) &&
          (test_mode) &&
          (NULL == wa->next) )
     {
@@ -419,7 +414,7 @@ history_cb (void *cls,
       GNUNET_SCHEDULER_shutdown ();
       return GNUNET_OK;
     }
-    if (GNUNET_YES == wa->delay)
+    if (wa->delay)
     {
       wa->delayed_until
         = GNUNET_TIME_relative_to_absolute (wirewatch_idle_sleep_interval);
@@ -477,6 +472,7 @@ history_cb (void *cls,
     db_plugin->rollback (db_plugin->cls,
                          session);
     GNUNET_SCHEDULER_shutdown ();
+    wa->hh = NULL;
     return GNUNET_SYSERR;
   }
   if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
@@ -485,10 +481,13 @@ history_cb (void *cls,
                 "Got DB soft error for reserves_in_insert. Rolling back.\n");
     db_plugin->rollback (db_plugin->cls,
                          session);
-    wa->soft_fail = true;
+    wa->hh = NULL;
+    GNUNET_assert (NULL == task);
+    task = GNUNET_SCHEDULER_add_now (&find_transfers,
+                                     NULL);
     return GNUNET_SYSERR;
   }
-  wa->delay = GNUNET_NO;
+  wa->delay = false;
   wa->latest_row_off = serial_id;
   return GNUNET_OK;
 }
@@ -504,6 +503,7 @@ find_transfers (void *cls)
 {
   struct TALER_EXCHANGEDB_Session *session;
   enum GNUNET_DB_QueryStatus qs;
+  unsigned int limit;
 
   (void) cls;
   task = NULL;
@@ -555,13 +555,21 @@ find_transfers (void *cls)
     }
     wa_pos->reset_mode = GNUNET_NO;
   }
-  wa_pos->delay = GNUNET_YES;
+  wa_pos->delay = true;
   wa_pos->current_batch_size = 0; /* reset counter */
   wa_pos->session = session;
+  if (wa_pos->shard_end == wa_pos->last_row_off)
+  {
+    /* advance to next shard */
+    wa_pos->shard_end += shard_size;
+  }
+  limit = GNUNET_MIN (wa_pos->batch_size,
+                      wa_pos->shard_end - wa_pos->last_row_off);
+  GNUNET_assert (NULL == wa_pos->hh);
   wa_pos->hh = TALER_BANK_credit_history (ctx,
                                           &wa_pos->auth,
                                           wa_pos->last_row_off,
-                                          wa_pos->batch_size,
+                                          limit,
                                           &history_cb,
                                           wa_pos);
   if (NULL == wa_pos->hh)
@@ -594,6 +602,7 @@ run (void *cls,
   (void) cls;
   (void) args;
   (void) cfgfile;
+
   cfg = c;
   if (GNUNET_OK !=
       exchange_serve_process_config ())
@@ -603,8 +612,6 @@ run (void *cls,
   }
   wa_pos = wa_head;
   GNUNET_assert (NULL != wa_pos);
-  task = GNUNET_SCHEDULER_add_now (&find_transfers,
-                                   NULL);
   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
                                  cls);
   ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule,
@@ -615,6 +622,9 @@ run (void *cls,
     GNUNET_break (0);
     return;
   }
+
+  task = GNUNET_SCHEDULER_add_now (&find_transfers,
+                                   NULL);
 }
 
 
@@ -630,16 +640,21 @@ main (int argc,
       char *const *argv)
 {
   struct GNUNET_GETOPT_CommandLineOption options[] = {
+    GNUNET_GETOPT_option_flag ('r',
+                               "reset",
+                               "start fresh with all transactions in the 
history",
+                               &reset_mode),
+    GNUNET_GETOPT_option_uint ('S',
+                               "size",
+                               "SIZE",
+                               "Size to process per shard (default: 1024)",
+                               &shard_size),
     GNUNET_GETOPT_option_timetravel ('T',
                                      "timetravel"),
     GNUNET_GETOPT_option_flag ('t',
                                "test",
                                "run in test mode and exit when idle",
                                &test_mode),
-    GNUNET_GETOPT_option_flag ('r',
-                               "reset",
-                               "start fresh with all transactions in the 
history",
-                               &reset_mode),
     GNUNET_GETOPT_OPTION_END
   };
   enum GNUNET_GenericReturnValue ret;

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]