[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[taler-exchange] branch master updated: -add skeleton logic for purse expiration
From: |
gnunet |
Subject: |
[taler-exchange] branch master updated: -add skeleton logic for purse expiration |
Date: |
Mon, 16 May 2022 15:43:45 +0200 |
This is an automated email from the git hooks/post-receive script.
grothoff pushed a commit to branch master
in repository exchange.
The following commit(s) were added to refs/heads/master by this push:
new 02716c40 -add skeleton logic for purse expiration
02716c40 is described below
commit 02716c4084c76630f35a5fcc4d2ef4e17d7e1b00
Author: Christian Grothoff <christian@grothoff.org>
AuthorDate: Mon May 16 15:43:40 2022 +0200
-add skeleton logic for purse expiration
---
src/exchange/.gitignore | 1 +
src/exchange/Makefile.am | 14 ++
src/exchange/exchange.conf | 7 +
...r-exchange-router.c => taler-exchange-expire.c} | 256 ++++++++++++---------
src/exchange/taler-exchange-router.c | 3 +-
src/exchangedb/plugin_exchangedb_postgres.c | 21 ++
src/include/taler_exchangedb_plugin.h | 15 ++
7 files changed, 201 insertions(+), 116 deletions(-)
diff --git a/src/exchange/.gitignore b/src/exchange/.gitignore
index c12ee011..bcfdb7e8 100644
--- a/src/exchange/.gitignore
+++ b/src/exchange/.gitignore
@@ -10,3 +10,4 @@ test_taler_exchange_httpd_home/.config/taler/account-1.json
taler-exchange-closer
taler-exchange-transfer
taler-exchange-router
+taler-exchange-expire
diff --git a/src/exchange/Makefile.am b/src/exchange/Makefile.am
index 21cc1228..24fb7e3d 100644
--- a/src/exchange/Makefile.am
+++ b/src/exchange/Makefile.am
@@ -19,6 +19,7 @@ pkgcfg_DATA = \
bin_PROGRAMS = \
taler-exchange-aggregator \
taler-exchange-closer \
+ taler-exchange-expire \
taler-exchange-httpd \
taler-exchange-router \
taler-exchange-transfer \
@@ -51,6 +52,19 @@ taler_exchange_closer_LDADD = \
-lgnunetutil \
$(XLIB)
+taler_exchange_expire_SOURCES = \
+ taler-exchange-expire.c
+taler_exchange_expire_LDADD = \
+ $(LIBGCRYPT_LIBS) \
+ $(top_builddir)/src/json/libtalerjson.la \
+ $(top_builddir)/src/util/libtalerutil.la \
+ $(top_builddir)/src/bank-lib/libtalerbank.la \
+ $(top_builddir)/src/exchangedb/libtalerexchangedb.la \
+ -ljansson \
+ -lgnunetcurl \
+ -lgnunetutil \
+ $(XLIB)
+
taler_exchange_router_SOURCES = \
taler-exchange-router.c
taler_exchange_router_LDADD = \
diff --git a/src/exchange/exchange.conf b/src/exchange/exchange.conf
index 9c68208a..df136d9e 100644
--- a/src/exchange/exchange.conf
+++ b/src/exchange/exchange.conf
@@ -47,8 +47,15 @@ BASE_URL = http://localhost:8081/
# How long should the aggregator sleep if it has nothing to do?
AGGREGATOR_IDLE_SLEEP_INTERVAL = 60 s
+# FIXME: document!
ROUTER_IDLE_SLEEP_INTERVAL = 60 s
+# How big is an individual shard to be processed
+# by taler-exchange-expire (in time). It may take
+# this much time for an expired purse to be really
+# cleaned up and the coins refunded.
+EXPIRE_SHARD_SIZE = 5 m
+
# How long should the transfer tool
# sleep if it has nothing to do?
TRANSFER_IDLE_SLEEP_INTERVAL = 60 s
diff --git a/src/exchange/taler-exchange-router.c b/src/exchange/taler-exchange-expire.c
similarity index 62%
copy from src/exchange/taler-exchange-router.c
copy to src/exchange/taler-exchange-expire.c
index ca4499e3..c7691930 100644
--- a/src/exchange/taler-exchange-router.c
+++ b/src/exchange/taler-exchange-expire.c
@@ -15,12 +15,8 @@
*/
/**
- * @file taler-exchange-router.c
- * @brief Process that routes P2P payments. Responsible for
- * (1) refunding coins in unmerged purses, (2) merging purses into local reserves;
- * (3) aggregating remote payments into the respective wad transfers.
- * Execution of actual wad transfers is still to be done by taler-exchange-transfer,
- * and watching for incoming wad transfers is done by taler-exchange-wirewatch.
+ * @file taler-exchange-expire.c
+ * @brief Process that cleans up expired purses
* @author Christian Grothoff
*/
#include "platform.h"
@@ -33,9 +29,6 @@
#include "taler_bank_service.h"
-// FIXME: revisit how (and if) we do sharding!
-// Maybe use different helpers for wads than
-// for local purses?!
/**
* Work shard we are processing.
*/
@@ -50,12 +43,12 @@ struct Shard
/**
* Starting row of the shard.
*/
- uint32_t shard_start;
+ struct GNUNET_TIME_Absolute shard_start;
/**
* Inclusive end row of the shard.
*/
- uint32_t shard_end;
+ struct GNUNET_TIME_Absolute shard_end;
/**
* Number of starting points found in the shard.
@@ -65,26 +58,6 @@ struct Shard
};
-/**
- * What is the smallest unit we support for wire transfers?
- * We will need to round down to a multiple of this amount.
- */
-static struct TALER_Amount currency_round_unit;
-
-/**
- * What is the base URL of this exchange? Used in the
- * wire transfer subjects so that merchants and governments
- * can ask for the list of aggregated deposits.
- */
-static char *exchange_base_url;
-
-/**
- * Set to #GNUNET_YES if this exchange does not support KYC checks
- * and thus P2P transfers are to be made regardless of the
- * KYC status of the target reserve.
- */
-static int kyc_off;
-
/**
* The exchange's configuration.
*/
@@ -103,14 +76,14 @@ static struct GNUNET_SCHEDULER_Task *task;
/**
* How long should we sleep when idle before trying to find more work?
*/
-static struct GNUNET_TIME_Relative router_idle_sleep_interval;
+static struct GNUNET_TIME_Relative expire_idle_sleep_interval;
/**
* How big are the shards we are processing? Is an inclusive offset, so every
* shard ranges from [X,X+shard_size) exclusive. So a shard covers
- * shard_size slots. The maximum value for shard_size is INT32_MAX+1.
+ * shard_size slots.
*/
-static uint32_t shard_size;
+static struct GNUNET_TIME_Relative shard_size;
/**
* Value to return from main(). 0 on success, non-zero on errors.
@@ -150,54 +123,29 @@ shutdown_task (void *cls)
}
TALER_EXCHANGEDB_plugin_unload (db_plugin);
db_plugin = NULL;
- TALER_EXCHANGEDB_unload_accounts ();
cfg = NULL;
}
/**
- * Parse the configuration for wirewatch.
+ * Parse the configuration for expire.
*
* @return #GNUNET_OK on success
*/
static enum GNUNET_GenericReturnValue
-parse_wirewatch_config (void)
+parse_expire_config (void)
{
- if (GNUNET_OK !=
- GNUNET_CONFIGURATION_get_value_string (cfg,
- "exchange",
- "BASE_URL",
- &exchange_base_url))
- {
- GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
- "exchange",
- "BASE_URL");
- return GNUNET_SYSERR;
- }
if (GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_time (cfg,
"exchange",
- "ROUTER_IDLE_SLEEP_INTERVAL",
- &router_idle_sleep_interval))
+ "EXPIRE_IDLE_SLEEP_INTERVAL",
+ &expire_idle_sleep_interval))
{
GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
"exchange",
- "ROUTER_IDLE_SLEEP_INTERVAL");
+ "EXPIRE_IDLE_SLEEP_INTERVAL");
return GNUNET_SYSERR;
}
- if ( (GNUNET_OK !=
- TALER_config_get_amount (cfg,
- "taler",
- "CURRENCY_ROUND_UNIT",
- &currency_round_unit)) ||
- ( (0 != currency_round_unit.fraction) &&
- (0 != currency_round_unit.value) ) )
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Need non-zero value in section `TALER' under `CURRENCY_ROUND_UNIT'\n");
- return GNUNET_SYSERR;
- }
-
if (NULL ==
(db_plugin = TALER_EXCHANGEDB_plugin_load (cfg)))
{
@@ -205,16 +153,6 @@ parse_wirewatch_config (void)
"Failed to initialize DB subsystem\n");
return GNUNET_SYSERR;
}
- if (GNUNET_OK !=
- TALER_EXCHANGEDB_load_accounts (cfg,
- TALER_EXCHANGEDB_ALO_DEBIT))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "No wire accounts configured for debit!\n");
- TALER_EXCHANGEDB_plugin_unload (db_plugin);
- db_plugin = NULL;
- return GNUNET_SYSERR;
- }
return GNUNET_OK;
}
@@ -251,11 +189,11 @@ release_shard (struct Shard *s)
{
enum GNUNET_DB_QueryStatus qs;
- qs = db_plugin->release_revolving_shard (
+ qs = db_plugin->complete_shard (
db_plugin->cls,
- "router",
- s->shard_start,
- s->shard_end);
+ "expire",
+ s->shard_start.abs_value_us,
+ s->shard_end.abs_value_us);
GNUNET_free (s);
switch (qs)
{
@@ -269,25 +207,112 @@ release_shard (struct Shard *s)
/* Strange, but let's just continue */
break;
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Purse expiration shard completed with %llu purses\n",
+ (unsigned long long) s->work_counter);
/* normal case */
break;
}
}
+/**
+ * Release lock on shard @a s in the database due to an abort of the
+ * operation. On error, terminates this process.
+ *
+ * @param[in] s shard to free (and memory to release)
+ */
static void
-run_routing (void *cls)
+abort_shard (struct Shard *s)
+{
+ enum GNUNET_DB_QueryStatus qs;
+
+ qs = db_plugin->abort_shard (db_plugin->cls,
+ "expire",
+ s->shard_start.abs_value_us,
+ s->shard_end.abs_value_us);
+ if (0 >= qs)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to abort shard (%d)!\n",
+ qs);
+ global_ret = EXIT_FAILURE;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+}
+
+
+/**
+ * Main function that processes the work in one shard.
+ *
+ * @param[in] cls a `struct Shard` to process
+ */
+static void
+run_expire (void *cls)
{
struct Shard *s = cls;
+ enum GNUNET_DB_QueryStatus qs;
task = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Checking for ready P2P transfers to route\n");
- // FIXME: do actual work here!
- commit_or_warn ();
- release_shard (s);
- task = GNUNET_SCHEDULER_add_now (&run_shard,
- NULL);
+ "Checking for expired purses\n");
+ if (GNUNET_SYSERR ==
+ db_plugin->preflight (db_plugin->cls))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to obtain database connection!\n");
+ global_ret = EXIT_FAILURE;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+ if (db_plugin->start (db_plugin->cls,
+ "expire-purse"))
+ {
+ global_ret = EXIT_FAILURE;
+ db_plugin->rollback (db_plugin->cls);
+ abort_shard (s);
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+ qs = db_plugin->expire_purse (db_plugin->cls,
+ s->shard_start,
+ s->shard_end);
+ switch (qs)
+ {
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_break (0);
+ global_ret = EXIT_FAILURE;
+ db_plugin->rollback (db_plugin->cls);
+ abort_shard (s);
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ db_plugin->rollback (db_plugin->cls);
+ abort_shard (s);
+ task = GNUNET_SCHEDULER_add_now (&run_shard,
+ NULL);
+ return;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ if (0 > commit_or_warn ())
+ {
+ db_plugin->rollback (db_plugin->cls);
+ abort_shard (s);
+ }
+ else
+ {
+ release_shard (s);
+ }
+ task = GNUNET_SCHEDULER_add_now (&run_shard,
+ NULL);
+ return;
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ /* commit, and go again immediately */
+ s->work_counter++;
+ (void) commit_or_warn ();
+ task = GNUNET_SCHEDULER_add_now (&run_expire,
+ s);
+ }
}
@@ -315,12 +340,12 @@ run_shard (void *cls)
}
s = GNUNET_new (struct Shard);
s->start_time = GNUNET_TIME_timestamp_get ();
- qs = db_plugin->begin_revolving_shard (db_plugin->cls,
- "router",
- shard_size,
- 1U + INT32_MAX,
- &s->shard_start,
- &s->shard_end);
+ qs = db_plugin->begin_shard (db_plugin->cls,
+ "expire",
+ shard_size,
+ shard_size.rel_value_us,
+ &s->shard_start.abs_value_us,
+ &s->shard_end.abs_value_us);
if (0 >= qs)
{
if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
@@ -343,11 +368,24 @@ run_shard (void *cls)
GNUNET_SCHEDULER_shutdown ();
return;
}
+ if (GNUNET_TIME_absolute_is_future (s->shard_end))
+ {
+ task = GNUNET_SCHEDULER_add_at (s->shard_end,
+ &run_shard,
+ NULL);
+ abort_shard (s);
+ return;
+ }
+ /* If this is a first-time run, we immediately
+ try to catch up with the present */
+ if (GNUNET_TIME_absolute_is_zero (s->shard_start))
+ s->shard_end = GNUNET_TIME_absolute_get ();
+
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Starting shard [%u:%u]!\n",
- (unsigned int) s->shard_start,
- (unsigned int) s->shard_end);
- task = GNUNET_SCHEDULER_add_now (&run_routing,
+ "Starting shard [%llu:%llu]!\n",
+ (unsigned long long) s->shard_start.abs_value_us,
+ (unsigned long long) s->shard_end.abs_value_us);
+ task = GNUNET_SCHEDULER_add_now (&run_expire,
s);
}
@@ -366,33 +404,27 @@ run (void *cls,
const char *cfgfile,
const struct GNUNET_CONFIGURATION_Handle *c)
{
- unsigned long long ass;
(void) cls;
(void) args;
(void) cfgfile;
cfg = c;
- if (GNUNET_OK != parse_wirewatch_config ())
+ if (GNUNET_OK != parse_expire_config ())
{
cfg = NULL;
global_ret = EXIT_NOTCONFIGURED;
return;
}
if (GNUNET_OK !=
- GNUNET_CONFIGURATION_get_value_number (cfg,
- "exchange",
- "ROUTER_SHARD_SIZE",
- &ass))
+ GNUNET_CONFIGURATION_get_value_time (cfg,
+ "exchange",
+ "EXPIRE_SHARD_SIZE",
+ &shard_size))
{
cfg = NULL;
global_ret = EXIT_NOTCONFIGURED;
return;
}
- if ( (0 == ass) ||
- (ass > INT32_MAX) )
- shard_size = 1U + INT32_MAX;
- else
- shard_size = (uint32_t) ass;
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&run_shard,
NULL);
@@ -402,7 +434,7 @@ run (void *cls,
/**
- * The main function of the taler-exchange-router.
+ * The main function of the taler-exchange-expire.
*
* @param argc number of arguments from the command line
* @param argv command line arguments
@@ -419,10 +451,6 @@ main (int argc,
"test",
"run in test mode and exit when idle",
&test_mode),
- GNUNET_GETOPT_option_flag ('y',
- "kyc-off",
- "perform wire transfers without KYC checks",
- &kyc_off),
GNUNET_GETOPT_OPTION_END
};
enum GNUNET_GenericReturnValue ret;
@@ -434,9 +462,9 @@ main (int argc,
TALER_OS_init ();
ret = GNUNET_PROGRAM_run (
argc, argv,
- "taler-exchange-router",
+ "taler-exchange-expire",
gettext_noop (
- "background process that routes P2P transfers"),
+ "background process that expires purses"),
options,
&run, NULL);
GNUNET_free_nz ((void *) argv);
@@ -448,4 +476,4 @@ main (int argc,
}
-/* end of taler-exchange-router.c */
+/* end of taler-exchange-expire.c */
diff --git a/src/exchange/taler-exchange-router.c b/src/exchange/taler-exchange-router.c
index ca4499e3..0816dfdb 100644
--- a/src/exchange/taler-exchange-router.c
+++ b/src/exchange/taler-exchange-router.c
@@ -17,8 +17,7 @@
/**
* @file taler-exchange-router.c
* @brief Process that routes P2P payments. Responsible for
- * (1) refunding coins in unmerged purses, (2) merging purses into local reserves;
- * (3) aggregating remote payments into the respective wad transfers.
+ * aggregating remote payments into the respective wad transfers.
* Execution of actual wad transfers is still to be done by taler-exchange-transfer,
* and watching for incoming wad transfers is done by taler-exchange-wirewatch.
* @author Christian Grothoff
diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c
index 4d5efb9c..ab282f4f 100644
--- a/src/exchangedb/plugin_exchangedb_postgres.c
+++ b/src/exchangedb/plugin_exchangedb_postgres.c
@@ -13554,6 +13554,25 @@ postgres_insert_purse_request (
}
+/**
+ * Function called to clean up one expired purse.
+ *
+ * @param cls the @e cls of this struct with the plugin-specific state
+ * @param start_time select purse expired after this time
+ * @param end_time select purse expired before this time
+ * @return transaction status code (#GNUNET_DB_STATUS_SUCCESS_NO_RESULTS if no purse expired in the given time interval).
+ */
+static enum GNUNET_DB_QueryStatus
+postgres_expire_purse (
+ void *cls,
+ struct GNUNET_TIME_Absolute start_time,
+ struct GNUNET_TIME_Absolute end_time)
+{
+ GNUNET_break (0);
+ return GNUNET_DB_STATUS_HARD_ERROR;
+}
+
+
/**
* Function called to obtain information about a purse.
*
@@ -14283,6 +14302,8 @@ libtaler_plugin_exchangedb_postgres_init (void *cls)
= &postgres_insert_purse_request;
plugin->select_purse_request
= &postgres_select_purse_request;
+ plugin->expire_purse
+ = &postgres_expire_purse;
plugin->select_purse
= &postgres_select_purse;
plugin->select_purse_by_merge_pub
diff --git a/src/include/taler_exchangedb_plugin.h b/src/include/taler_exchangedb_plugin.h
index 52e684f6..213fe114 100644
--- a/src/include/taler_exchangedb_plugin.h
+++ b/src/include/taler_exchangedb_plugin.h
@@ -4603,6 +4603,21 @@ struct TALER_EXCHANGEDB_Plugin
bool *in_conflict);
+ /**
+ * Function called to clean up one expired purse.
+ *
+ * @param cls the @e cls of this struct with the plugin-specific state
+ * @param start_time select purse expired after this time
+ * @param end_time select purse expired before this time
+ * @return transaction status code (#GNUNET_DB_STATUS_SUCCESS_NO_RESULTS if no purse expired in the given time interval).
+ */
+ enum GNUNET_DB_QueryStatus
+ (*expire_purse)(
+ void *cls,
+ struct GNUNET_TIME_Absolute start_time,
+ struct GNUNET_TIME_Absolute end_time);
+
+
/**
* Function called to obtain information about a purse.
*
--
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [taler-exchange] branch master updated: -add skeleton logic for purse expiration,
gnunet <=