gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[taler-exchange] branch master updated: towards removing tiny bit


From: gnunet
Subject: [taler-exchange] branch master updated: towards removing tiny bit
Date: Sun, 27 Mar 2022 10:32:36 +0200

This is an automated email from the git hooks/post-receive script.

grothoff pushed a commit to branch master
in repository exchange.

The following commit(s) were added to refs/heads/master by this push:
     new d0a69da8 towards removing tiny bit
d0a69da8 is described below

commit d0a69da8954fd72f361795c2e007bad3fe5accd1
Author: Christian Grothoff <grothoff@gnunet.org>
AuthorDate: Sun Mar 27 10:32:28 2022 +0200

    towards removing tiny bit
---
 src/exchangedb/drop0001.sql                 |   2 +-
 src/exchangedb/exchange-0001.sql            |  69 +++++-
 src/exchangedb/plugin_exchangedb_postgres.c | 360 +++++++++++++++++++++++++++-
 src/exchangedb/test_exchangedb.c            | 166 +++++++------
 src/include/taler_exchangedb_plugin.h       |  92 +++++++
 5 files changed, 582 insertions(+), 107 deletions(-)

diff --git a/src/exchangedb/drop0001.sql b/src/exchangedb/drop0001.sql
index 225c817a..3f43a569 100644
--- a/src/exchangedb/drop0001.sql
+++ b/src/exchangedb/drop0001.sql
@@ -82,9 +82,9 @@ DROP TABLE IF EXISTS denominations CASCADE;
 DROP TABLE IF EXISTS cs_nonce_locks CASCADE;
 DROP FUNCTION IF EXISTS add_constraints_to_cs_nonce_locks_partition;
 
-DROP TABLE IF EXISTS deposits_by_coin CASCADE;
 DROP TABLE IF EXISTS global_fee CASCADE;
 DROP TABLE IF EXISTS recoup_by_reserve CASCADE;
+DROP TABLE IF EXISTS aggregation_transient CASCADE;
 
 
 DROP TABLE IF EXISTS partners CASCADE;
diff --git a/src/exchangedb/exchange-0001.sql b/src/exchangedb/exchange-0001.sql
index b2fb52ac..e6902ed1 100644
--- a/src/exchangedb/exchange-0001.sql
+++ b/src/exchangedb/exchange-0001.sql
@@ -772,7 +772,7 @@ CREATE TABLE IF NOT EXISTS deposits_by_ready_default
 
 CREATE TABLE IF NOT EXISTS deposits_for_matching
   (refund_deadline INT8 NOT NULL
-  ,shard INT8 NOT NULL
+  ,merchant_pub BYTEA NOT NULL CHECK (LENGTH(merchant_pub)=32)
   ,coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32) REFERENCES known_coins 
(coin_pub) ON DELETE CASCADE
   ,deposit_serial_id INT8
   )
@@ -782,7 +782,7 @@ COMMENT ON TABLE deposits_for_matching
 
 CREATE INDEX IF NOT EXISTS deposits_for_matching_main_index
   ON deposits_for_matching
-  (refund_deadline ASC, shard, coin_pub);
+  (refund_deadline ASC, merchant_pub, coin_pub);
 
 CREATE TABLE IF NOT EXISTS deposits_for_matching_default
   PARTITION OF deposits_for_matching
@@ -818,12 +818,12 @@ BEGIN
   THEN
     INSERT INTO deposits_for_matching
       (refund_deadline
-      ,shard
+      ,merchant_pub
       ,coin_pub
       ,deposit_serial_id)
     VALUES
       (NEW.refund_deadline
-      ,NEW.shard
+      ,NEW.merchant_pub
       ,NEW.coin_pub
       ,NEW.deposit_serial_id);
   END IF;
@@ -866,7 +866,7 @@ BEGIN
   THEN
     DELETE FROM deposits_for_matching
      WHERE refund_deadline = OLD.refund_deadline
-       AND shard = OLD.shard
+       AND merchant_pub = OLD.merchant_pub
        AND coin_pub = OLD.coin_pub
        AND deposit_serial_id = OLD.deposit_serial_id;
   END IF;
@@ -887,12 +887,12 @@ BEGIN
   THEN
     INSERT INTO deposits_for_matching
       (refund_deadline
-      ,shard
+      ,merchant_pub
       ,coin_pub
       ,deposit_serial_id)
     VALUES
       (NEW.refund_deadline
-      ,NEW.shard
+      ,NEW.merchant_pub
       ,NEW.coin_pub
       ,NEW.deposit_serial_id);
   END IF;
@@ -930,7 +930,7 @@ BEGIN
   THEN
     DELETE FROM deposits_for_matching
      WHERE refund_deadline = OLD.refund_deadline
-       AND shard = OLD.shard
+       AND merchant_pub = OLD.merchant_pub
        AND coin_pub = OLD.coin_pub
        AND deposit_serial_id = OLD.deposit_serial_id;
   END IF;
@@ -1040,21 +1040,64 @@ $$;
 
 SELECT add_constraints_to_wire_out_partition('default');
 
+CREATE OR REPLACE FUNCTION wire_out_delete_trigger()
+  RETURNS trigger
+  LANGUAGE plpgsql
+  AS $$
+BEGIN
+  DELETE FROM aggregation_tracking
+   WHERE wtid_raw = OLD.wtid_raw;
+  RETURN OLD;
+END $$;
+COMMENT ON FUNCTION wire_out_delete_trigger()
+  IS 'Replicate reserve_out deletions into aggregation_tracking. This replaces 
an earlier use of an ON DELETE CASCADE that required a DEFERRABLE constraint 
and conflicted with nice partitioning.';
+
+CREATE TRIGGER wire_out_on_delete
+  AFTER DELETE
+    ON wire_out
+   FOR EACH ROW EXECUTE FUNCTION wire_out_delete_trigger();
+
+
+
+-- ------------------------------ aggregation_transient 
----------------------------------------
+
+-- Note: this table is not yet used; it is designed
+-- to allow us to get rid of the 'tiny BOOL' and
+-- the associated need to look at tiny
+-- deposits repeatedly.
+CREATE TABLE IF NOT EXISTS aggregation_transient
+  (amount_val INT8 NOT NULL
+  ,amount_frac INT4 NOT NULL
+  ,wire_target_h_payto BYTEA CHECK (LENGTH(wire_target_h_payto)=32)
+  ,exchange_account_section TEXT NOT NULL
+  ,wtid_raw BYTEA NOT NULL CHECK (LENGTH(wtid_raw)=32)
+  )
+  PARTITION BY HASH (wire_target_h_payto);
+COMMENT ON TABLE aggregation_transient
+  IS 'aggregations currently happening (lacking wire_out, usually because the 
amount is too low); this table is not replicated';
+COMMENT ON COLUMN aggregation_transient.amount_val
+  IS 'Sum of all of the aggregated deposits (without deposit fees)';
+COMMENT ON COLUMN aggregation_transient.wtid_raw
+  IS 'identifier of the wire transfer';
+
+CREATE TABLE IF NOT EXISTS aggregation_transient_default
+  PARTITION OF aggregation_transient
+  FOR VALUES WITH (MODULUS 1, REMAINDER 0);
+
+
 
 -- ------------------------------ aggregation_tracking 
----------------------------------------
 
--- FIXME-URGENT: add colum coin_pub to select by coin_pub + deposit_serial_id 
for more efficient deposit lookup!?
--- Or which direction(s) is this table used? Is the partitioning sane??
 CREATE TABLE IF NOT EXISTS aggregation_tracking
   (aggregation_serial_id BIGINT GENERATED BY DEFAULT AS IDENTITY -- UNIQUE
   ,deposit_serial_id INT8 PRIMARY KEY -- REFERENCES deposits 
(deposit_serial_id) ON DELETE CASCADE
-  ,wtid_raw BYTEA NOT NULL CONSTRAINT wire_out_ref REFERENCES 
wire_out(wtid_raw) ON DELETE CASCADE DEFERRABLE
+  ,wtid_raw BYTEA NOT NULL CHECK (LENGTH(wtid_raw)=32)
   )
   PARTITION BY HASH (deposit_serial_id);
 COMMENT ON TABLE aggregation_tracking
   IS 'mapping from wire transfer identifiers (WTID) to deposits (and back)';
 COMMENT ON COLUMN aggregation_tracking.wtid_raw
-  IS 'We first create entries in the aggregation_tracking table and then 
finally the wire_out entry once we know the total amount. Hence the constraint 
must be deferrable and we cannot use a wireout_uuid here, because we do not 
have it when these rows are created. Changing the logic to first INSERT a dummy 
row into wire_out and then UPDATEing that row in the same transaction would 
theoretically reduce per-deposit storage costs by 5 percent (24/~460 bytes).';
+  IS 'identifier of the wire transfer';
 
 CREATE TABLE IF NOT EXISTS aggregation_tracking_default
   PARTITION OF aggregation_tracking
@@ -1070,7 +1113,7 @@ BEGIN
   EXECUTE FORMAT (
     'ALTER TABLE aggregation_tracking_' || partition_suffix || ' '
       'ADD CONSTRAINT aggregation_tracking_' || partition_suffix || 
'_aggregation_serial_id_key '
-        'UNIQUE (aggregation_serial_id) '
+        'UNIQUE (aggregation_serial_id);'
   );
 END
 $$;
diff --git a/src/exchangedb/plugin_exchangedb_postgres.c 
b/src/exchangedb/plugin_exchangedb_postgres.c
index c7bdae39..1709f17e 100644
--- a/src/exchangedb/plugin_exchangedb_postgres.c
+++ b/src/exchangedb/plugin_exchangedb_postgres.c
@@ -1188,7 +1188,7 @@ prepare_statements (struct PostgresClosure *pg)
       "  ,dbr.shard ASC"
       " LIMIT 1;",
       4),
-    /* Used in #postgres_iterate_matching_deposits() */
+    /* FIXME: deprecated; Used in #postgres_iterate_matching_deposits() */
     GNUNET_PQ_make_prepare (
       "deposits_iterate_matching",
       "SELECT"
@@ -1207,14 +1207,115 @@ prepare_statements (struct PostgresClosure *pg)
       "    JOIN denominations denom"
       "      USING (denominations_serial)"
       " WHERE dfm.refund_deadline<$3"
-      "  AND dfm.shard=$4"
+      "  AND dfm.merchant_pub=$1"
       "  AND dep.merchant_pub=$1"
       "  AND dep.wire_target_h_payto=$2"
       " LIMIT "
       TALER_QUOTE (
         TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT) ";",
+      3),
+
+    /* Used in #postgres_aggregate() */
+    GNUNET_PQ_make_prepare (
+      "aggregate",
+      "WITH rdy AS (" /* find deposits ready */
+      "  SELECT"
+      "    coin_pub"
+      "    FROM deposits_for_matching"
+      "    WHERE refund_deadline<$1"
+      "      AND merchant_pub=$2"
+      "    ORDER BY refund_deadline ASC" /* ordering is not critical */
+      "    LIMIT "
+      TALER_QUOTE (TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT)
+      " )"
+      " ,dep AS (" /* restrict to our merchant and account */
+      "  UPDATE deposits"
+      "     SET done=TRUE"
+      "   WHERE coin_pub IN (SELECT coin_pub FROM rdy)"
+      "     AND merchant_pub=$2"
+      "     AND wire_target_h_payto=$3"
+      "   RETURNING"
+      "     deposit_serial_id"
+      "    ,coin_pub"
+      "    ,amount_with_fee_val AS amount_val"
+      "    ,amount_with_fee_frac AS amount_frac)"
+      " ,ref AS (" /* find applicable refunds */
+      "  SELECT"
+      "    amount_with_fee_val AS refund_val"
+      "   ,amount_with_fee_frac AS refund_frac"
+      "   ,coin_pub"
+      "    FROM refunds"
+      "   WHERE coin_pub IN (SELECT coin_pub FROM dep)"
+      "     AND deposit_serial_id IN (SELECT deposit_serial_id FROM dep))"
+      " ,fees AS (" /* find deposit fees for non-refunded deposits */
+      "  SELECT"
+      "    denom.fee_deposit_val AS fee_val"
+      "   ,denom.fee_deposit_frac AS fee_frac"
+      "    FROM known_coins kc"
+      "    JOIN denominations denom"
+      "      USING (denominations_serial)"
+      "    WHERE coin_pub IN (SELECT coin_pub FROM dep)"
+      "      AND coin_pub NOT IN (SELECT coin_pub FROM ref))"
+      " ,dummy AS (" /* add deposits to aggregation_tracking */
+      "    INSERT INTO aggregation_tracking"
+      "    (deposit_serial_id"
+      "    ,wtid_raw)"
+      "    SELECT deposit_serial_id,$4"
+      "      FROM dep)"
+      "SELECT" /* calculate totals (deposits, refunds and fees) */
+      "  CAST(COALESCE(SUM(dep.amount_val),0) AS INT8) AS sum_deposit_value"
+      " ,COALESCE(SUM(dep.amount_frac),0) AS sum_deposit_fraction"
+      " ,CAST(COALESCE(SUM(ref.refund_val),0) AS INT8) AS sum_refund_value"
+      " ,COALESCE(SUM(ref.refund_frac),0) AS sum_refund_fraction"
+      " ,CAST(COALESCE(SUM(fees.fee_val),0) AS INT8) AS sum_fee_value"
+      " ,COALESCE(SUM(fees.fee_frac),0) AS sum_fee_fraction"
+      " FROM dep "
+      "   FULL OUTER JOIN ref ON (FALSE)"
+      "   FULL OUTER JOIN fees ON (FALSE);",
+      4),
+
+
+    /* Used in #postgres_create_aggregation_transient() */
+    GNUNET_PQ_make_prepare (
+      "create_aggregation_transient",
+      "INSERT INTO aggregation_transient"
+      " (amount_val"
+      " ,amount_frac"
+      " ,wire_target_h_payto"
+      " ,exchange_account_section"
+      " ,wtid_raw)"
+      " VALUES ($1, $2, $3, $4, $5);",
+      5),
+    /* Used in #postgres_select_aggregation_transient() */
+    GNUNET_PQ_make_prepare (
+      "select_aggregation_transient",
+      "SELECT"
+      "  amount_val"
+      " ,amount_frac"
+      " ,wtid_raw"
+      " FROM aggregation_transient"
+      " WHERE wire_target_h_payto=$1"
+      "   AND exchange_account_section=$2;",
+      2),
+    /* Used in #postgres_update_aggregation_transient() */
+    GNUNET_PQ_make_prepare (
+      "update_aggregation_transient",
+      "UPDATE aggregation_transient"
+      " SET amount_val=$1"
+      "    ,amount_frac=$2"
+      " WHERE wire_target_h_payto=$3"
+      "   AND wtid_raw=$4",
       4),
-    /* Used in #postgres_mark_deposit_tiny() */
+    /* Used in #postgres_delete_aggregation_transient() */
+    GNUNET_PQ_make_prepare (
+      "delete_aggregation_transient",
+      "DELETE FROM aggregation_transient"
+      " WHERE wire_target_h_payto=$1"
+      "   AND wtid_raw=$2",
+      2),
+
+
+    /* FIXME-deprecated: Used in #postgres_mark_deposit_tiny() */
     GNUNET_PQ_make_prepare (
       "mark_deposit_tiny",
       "UPDATE deposits"
@@ -1222,7 +1323,7 @@ prepare_statements (struct PostgresClosure *pg)
       " WHERE coin_pub=$1"
       "   AND deposit_serial_id=$2",
       2),
-    /* Used in #postgres_mark_deposit_done() */
+    /* FIXME-deprecated: Used in #postgres_mark_deposit_done() */
     GNUNET_PQ_make_prepare (
       "mark_deposit_done",
       "UPDATE deposits"
@@ -1230,6 +1331,7 @@ prepare_statements (struct PostgresClosure *pg)
       " WHERE coin_pub=$1"
       "   AND deposit_serial_id=$2;",
       2),
+
     /* Used in #postgres_get_coin_transactions() to obtain information
        about how a coin has been spend with /deposit requests. */
     GNUNET_PQ_make_prepare (
@@ -2835,8 +2937,8 @@ prepare_statements (struct PostgresClosure *pg)
     GNUNET_PQ_make_prepare (
       "insert_into_table_refunds",
       "INSERT INTO refunds"
-      "(coin_pub"
-      ",refund_serial_id"
+      "(refund_serial_id"
+      ",coin_pub"
       ",merchant_sig"
       ",rtransaction_id"
       ",amount_with_fee_val"
@@ -5868,6 +5970,240 @@ postgres_have_deposit2 (
 }
 
 
+/**
+ * Aggregate all matching deposits for @a h_payto and
+ * @a merchant_pub, returning the total amounts.
+ *
+ * @param cls the @e cls of this struct with the plugin-specific state
+ * @param h_payto destination of the wire transfer
+ * @param merchant_pub public key of the merchant
+ * @param wtid wire transfer ID to set for the aggregate
+ * @param[out] total set to the sum of the total deposits minus applicable 
deposit fees and refunds
+ * @return transaction status
+ */
+static enum GNUNET_DB_QueryStatus
+postgres_aggregate (
+  void *cls,
+  const struct TALER_PaytoHashP *h_payto,
+  const struct TALER_MerchantPublicKeyP *merchant_pub,
+  const struct TALER_WireTransferIdentifierRawP *wtid,
+  struct TALER_Amount *total)
+{
+  struct PostgresClosure *pg = cls;
+  struct GNUNET_TIME_Absolute now = {0};
+  struct GNUNET_PQ_QueryParam params[] = {
+    GNUNET_PQ_query_param_absolute_time (&now),
+    GNUNET_PQ_query_param_auto_from_type (merchant_pub),
+    GNUNET_PQ_query_param_auto_from_type (h_payto),
+    GNUNET_PQ_query_param_auto_from_type (wtid),
+    GNUNET_PQ_query_param_end
+  };
+  uint64_t sum_deposit_value;
+  uint64_t sum_deposit_frac;
+  uint64_t sum_refund_value;
+  uint64_t sum_refund_frac;
+  uint64_t sum_fee_value;
+  uint64_t sum_fee_frac;
+  struct GNUNET_PQ_ResultSpec rs[] = {
+    GNUNET_PQ_result_spec_uint64 ("sum_deposit_value",
+                                  &sum_deposit_value),
+    GNUNET_PQ_result_spec_uint64 ("sum_deposit_fraction",
+                                  &sum_deposit_frac),
+    GNUNET_PQ_result_spec_uint64 ("sum_refund_value",
+                                  &sum_refund_value),
+    GNUNET_PQ_result_spec_uint64 ("sum_refund_fraction",
+                                  &sum_refund_frac),
+    GNUNET_PQ_result_spec_uint64 ("sum_fee_value",
+                                  &sum_fee_value),
+    GNUNET_PQ_result_spec_uint64 ("sum_fee_fraction",
+                                  &sum_fee_frac),
+    GNUNET_PQ_result_spec_end
+  };
+  enum GNUNET_DB_QueryStatus qs;
+  struct TALER_Amount sum_deposit;
+  struct TALER_Amount sum_refund;
+  struct TALER_Amount sum_fee;
+  struct TALER_Amount delta;
+
+  now = GNUNET_TIME_absolute_round_down (GNUNET_TIME_absolute_get (),
+                                         pg->aggregator_shift);
+  qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
+                                                 "aggregate",
+                                                 params,
+                                                 rs);
+  if (qs < 0)
+  {
+    GNUNET_assert (GNUNET_DB_STATUS_SOFT_ERROR == qs);
+    return qs;
+  }
+  if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
+  {
+    GNUNET_assert (GNUNET_OK ==
+                   TALER_amount_set_zero (pg->currency,
+                                          total));
+    return qs;
+  }
+  GNUNET_assert (GNUNET_OK ==
+                 TALER_amount_set_zero (pg->currency,
+                                        &sum_deposit));
+  GNUNET_assert (GNUNET_OK ==
+                 TALER_amount_set_zero (pg->currency,
+                                        &sum_refund));
+  GNUNET_assert (GNUNET_OK ==
+                 TALER_amount_set_zero (pg->currency,
+                                        &sum_fee));
+  sum_deposit.value    = sum_deposit_frac / TALER_AMOUNT_FRAC_BASE
+                         + sum_deposit_value;
+  sum_deposit.fraction = sum_deposit_frac % TALER_AMOUNT_FRAC_BASE;
+  sum_refund.value     = sum_refund_frac  / TALER_AMOUNT_FRAC_BASE
+                         + sum_refund_value;
+  sum_refund.fraction  = sum_refund_frac  % TALER_AMOUNT_FRAC_BASE;
+  sum_fee.value        = sum_fee_frac     / TALER_AMOUNT_FRAC_BASE
+                         + sum_fee_value;
+  sum_fee.fraction     = sum_fee_frac     % TALER_AMOUNT_FRAC_BASE; \
+  GNUNET_assert (0 <=
+                 TALER_amount_subtract (&delta,
+                                        &sum_deposit,
+                                        &sum_refund));
+  GNUNET_assert (0 <=
+                 TALER_amount_subtract (total,
+                                        &delta,
+                                        &sum_fee));
+  return qs;
+}
+
+
+/**
+ * Create a new entry in the transient aggregation table.
+ *
+ * @param cls the @e cls of this struct with the plugin-specific state
+ * @param h_payto destination of the wire transfer
+ * @param exchange_account_section exchange account to use
+ * @param wtid the raw wire transfer identifier to be used
+ * @param total amount to be wired in the future
+ * @return transaction status
+ */
+static enum GNUNET_DB_QueryStatus
+postgres_create_aggregation_transient (
+  void *cls,
+  const struct TALER_PaytoHashP *h_payto,
+  const char *exchange_account_section,
+  const struct TALER_WireTransferIdentifierRawP *wtid,
+  const struct TALER_Amount *total)
+{
+  struct PostgresClosure *pg = cls;
+  struct GNUNET_PQ_QueryParam params[] = {
+    TALER_PQ_query_param_amount (total),
+    GNUNET_PQ_query_param_auto_from_type (h_payto),
+    GNUNET_PQ_query_param_string (exchange_account_section),
+    GNUNET_PQ_query_param_auto_from_type (wtid),
+    GNUNET_PQ_query_param_end
+  };
+
+  return GNUNET_PQ_eval_prepared_non_select (pg->conn,
+                                             "create_aggregation_transient",
+                                             params);
+}
+
+
+/**
+ * Find existing entry in the transient aggregation table.
+ *
+ * @param cls the @e cls of this struct with the plugin-specific state
+ * @param h_payto destination of the wire transfer
+ * @param exchange_account_section exchange account to use
+ * @param[out] wtid set to the raw wire transfer identifier to be used
+ * @param[out] total existing amount to be wired in the future
+ * @return transaction status
+ */
+static enum GNUNET_DB_QueryStatus
+postgres_select_aggregation_transient (
+  void *cls,
+  const struct TALER_PaytoHashP *h_payto,
+  const char *exchange_account_section,
+  struct TALER_WireTransferIdentifierRawP *wtid,
+  struct TALER_Amount *total)
+{
+  struct PostgresClosure *pg = cls;
+  struct GNUNET_PQ_QueryParam params[] = {
+    GNUNET_PQ_query_param_auto_from_type (h_payto),
+    GNUNET_PQ_query_param_string (exchange_account_section),
+    GNUNET_PQ_query_param_end
+  };
+  struct GNUNET_PQ_ResultSpec rs[] = {
+    TALER_PQ_RESULT_SPEC_AMOUNT ("amount",
+                                 total),
+    GNUNET_PQ_result_spec_auto_from_type ("wtid_raw",
+                                          wtid),
+    GNUNET_PQ_result_spec_end
+  };
+
+  return GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
+                                                   
"select_aggregation_transient",
+                                                   params,
+                                                   rs);
+}
+
+
+/**
+ * Update existing entry in the transient aggregation table.
+ * @a h_payto is only needed for query performance.
+ *
+ * @param cls the @e cls of this struct with the plugin-specific state
+ * @param h_payto destination of the wire transfer
+ * @param wtid the raw wire transfer identifier to update
+ * @param total new total amount to be wired in the future
+ * @return transaction status
+ */
+static enum GNUNET_DB_QueryStatus
+postgres_update_aggregation_transient (
+  void *cls,
+  const struct TALER_PaytoHashP *h_payto,
+  const struct TALER_WireTransferIdentifierRawP *wtid,
+  const struct TALER_Amount *total)
+{
+  struct PostgresClosure *pg = cls;
+  struct GNUNET_PQ_QueryParam params[] = {
+    TALER_PQ_query_param_amount (total),
+    GNUNET_PQ_query_param_auto_from_type (h_payto),
+    GNUNET_PQ_query_param_auto_from_type (wtid),
+    GNUNET_PQ_query_param_end
+  };
+
+  return GNUNET_PQ_eval_prepared_non_select (pg->conn,
+                                             "update_aggregation_transient",
+                                             params);
+}
+
+
+/**
+ * Delete existing entry in the transient aggregation table.
+ * @a h_payto is only needed for query performance.
+ *
+ * @param cls the @e cls of this struct with the plugin-specific state
+ * @param h_payto destination of the wire transfer
+ * @param wtid the raw wire transfer identifier to update
+ * @return transaction status
+ */
+static enum GNUNET_DB_QueryStatus
+postgres_delete_aggregation_transient (
+  void *cls,
+  const struct TALER_PaytoHashP *h_payto,
+  const struct TALER_WireTransferIdentifierRawP *wtid)
+{
+  struct PostgresClosure *pg = cls;
+  struct GNUNET_PQ_QueryParam params[] = {
+    GNUNET_PQ_query_param_auto_from_type (h_payto),
+    GNUNET_PQ_query_param_auto_from_type (wtid),
+    GNUNET_PQ_query_param_end
+  };
+
+  return GNUNET_PQ_eval_prepared_non_select (pg->conn,
+                                             "delete_aggregation_transient",
+                                             params);
+}
+
+
 /**
  * Mark a deposit as tiny, thereby declaring that it cannot be
  * executed by itself and should no longer be returned by
@@ -6147,12 +6483,10 @@ postgres_iterate_matching_deposits (
 {
   struct PostgresClosure *pg = cls;
   struct GNUNET_TIME_Absolute now = {0};
-  uint64_t shard = compute_shard (merchant_pub);
   struct GNUNET_PQ_QueryParam params[] = {
     GNUNET_PQ_query_param_auto_from_type (merchant_pub),
     GNUNET_PQ_query_param_auto_from_type (h_payto),
     GNUNET_PQ_query_param_absolute_time (&now),
-    GNUNET_PQ_query_param_uint64 (&shard),
     GNUNET_PQ_query_param_end
   };
   struct MatchingDepositContext mdc = {
@@ -9299,7 +9633,6 @@ refunds_serial_helper_cb (void *cls,
   struct RefundsSerialContext *rsc = cls;
   struct PostgresClosure *pg = rsc->pg;
 
-  fprintf (stderr, "Got %u results\n", num_results);
   for (unsigned int i = 0; i<num_results; i++)
   {
     struct TALER_EXCHANGEDB_Refund refund;
@@ -13081,6 +13414,15 @@ libtaler_plugin_exchangedb_postgres_init (void *cls)
   plugin->get_known_coin = &postgres_get_known_coin;
   plugin->get_coin_denomination = &postgres_get_coin_denomination;
   plugin->have_deposit2 = &postgres_have_deposit2;
+  plugin->aggregate = &postgres_aggregate;
+  plugin->create_aggregation_transient
+    = &postgres_create_aggregation_transient;
+  plugin->select_aggregation_transient
+    = &postgres_select_aggregation_transient;
+  plugin->update_aggregation_transient
+    = &postgres_update_aggregation_transient;
+  plugin->delete_aggregation_transient
+    = &postgres_delete_aggregation_transient;
   plugin->mark_deposit_tiny = &postgres_mark_deposit_tiny;
   plugin->mark_deposit_done = &postgres_mark_deposit_done;
   plugin->get_ready_deposit = &postgres_get_ready_deposit;
diff --git a/src/exchangedb/test_exchangedb.c b/src/exchangedb/test_exchangedb.c
index 79b09e0e..9e2e8a48 100644
--- a/src/exchangedb/test_exchangedb.c
+++ b/src/exchangedb/test_exchangedb.c
@@ -674,48 +674,6 @@ deposit_cb (void *cls,
 }
 
 
-/**
- * Function called with details about deposits that
- * have been made.  Called in the test on the
- * deposit given in @a cls.
- *
- * @param cls closure a `struct TALER_EXCHANGEDB_Deposit *`
- * @param rowid unique ID for the deposit in our DB, used for marking
- *              it as 'tiny' or 'done'
- * @param coin_pub public key of the coin
- * @param amount_with_fee amount that was deposited including fee
- * @param deposit_fee amount the exchange gets to keep as transaction fees
- * @param h_contract_terms hash of the proposal data known to merchant and 
customer
- * @return transaction status code, #GNUNET_DB_STATUS_SUCCESS_ONE_RESULT to 
continue to iterate
- */
-static enum GNUNET_DB_QueryStatus
-matching_deposit_cb (void *cls,
-                     uint64_t rowid,
-                     const struct TALER_CoinSpendPublicKeyP *coin_pub,
-                     const struct TALER_Amount *amount_with_fee,
-                     const struct TALER_Amount *deposit_fee,
-                     const struct TALER_PrivateContractHashP *h_contract_terms)
-{
-  struct TALER_EXCHANGEDB_Deposit *deposit = cls;
-
-  deposit_rowid = rowid;
-  if ( (0 != TALER_amount_cmp (amount_with_fee,
-                               &deposit->amount_with_fee)) ||
-       (0 != TALER_amount_cmp (deposit_fee,
-                               &deposit->deposit_fee)) ||
-       (0 != GNUNET_memcmp (h_contract_terms,
-                            &deposit->h_contract_terms)) ||
-       (0 != GNUNET_memcmp (coin_pub,
-                            &deposit->coin.coin_pub)) )
-  {
-    GNUNET_break (0);
-    return GNUNET_DB_STATUS_HARD_ERROR;
-  }
-
-  return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
-}
-
-
 /**
  * Callback for #select_deposits_above_serial_id ()
  *
@@ -1055,7 +1013,7 @@ test_wire_out (const struct TALER_EXCHANGEDB_Deposit 
*deposit)
                     &h_payto);
   auditor_row_cnt = 0;
   memset (&wire_out_wtid,
-          42,
+          41,
           sizeof (wire_out_wtid));
   wire_out_date = GNUNET_TIME_timestamp_get ();
   GNUNET_assert (GNUNET_OK ==
@@ -1109,14 +1067,6 @@ test_wire_out (const struct TALER_EXCHANGEDB_Deposit 
*deposit)
                                                 &coin_fee2,
                                                 &kyc));
   }
-  /* insert WT data */
-  FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
-          plugin->insert_aggregation_tracking (plugin->cls,
-                                               &wire_out_wtid,
-                                               deposit_rowid));
-
-  /* Now let's fix the transient constraint violation by
-     putting in the WTID into the wire_out table */
   {
     struct TALER_ReservePublicKeyP rpub;
     struct TALER_EXCHANGEDB_KycStatus kyc;
@@ -2270,44 +2220,92 @@ run (void *cls)
                                      &deposit_cb,
                                      &deposit));
   FAILIF (8 == result);
-  FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
-          plugin->iterate_matching_deposits (plugin->cls,
-                                             &wire_target_h_payto,
-                                             &deposit.merchant_pub,
-                                             &matching_deposit_cb,
-                                             &deposit,
-                                             2));
+  {
+    struct TALER_Amount total;
+    struct TALER_WireTransferIdentifierRawP wtid;
+
+    memset (&wtid,
+            41,
+            sizeof (wtid));
+    FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
+            plugin->aggregate (plugin->cls,
+                               &wire_target_h_payto,
+                               &deposit.merchant_pub,
+                               &wtid,
+                               &total));
+  }
   FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
           plugin->commit (plugin->cls));
-  FAILIF (GNUNET_OK !=
-          plugin->start (plugin->cls,
-                         "test-2"));
-  FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
-          plugin->mark_deposit_tiny (plugin->cls,
-                                     &deposit.coin.coin_pub,
-                                     deposit_rowid));
-  FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
-          plugin->get_ready_deposit (plugin->cls,
-                                     0,
-                                     INT32_MAX,
-                                     true,
-                                     &deposit_cb,
-                                     &deposit));
-  plugin->rollback (plugin->cls);
-  FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
-          plugin->get_ready_deposit (plugin->cls,
-                                     0,
-                                     INT32_MAX,
-                                     true,
-                                     &deposit_cb,
-                                     &deposit));
   FAILIF (GNUNET_OK !=
           plugin->start (plugin->cls,
                          "test-3"));
-  FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
-          plugin->mark_deposit_done (plugin->cls,
-                                     &deposit.coin.coin_pub,
-                                     deposit_rowid));
+  {
+    struct TALER_WireTransferIdentifierRawP wtid;
+    struct TALER_Amount total;
+    struct TALER_WireTransferIdentifierRawP wtid2;
+    struct TALER_Amount total2;
+
+    memset (&wtid,
+            42,
+            sizeof (wtid));
+    GNUNET_assert (GNUNET_OK ==
+                   TALER_string_to_amount (CURRENCY ":42",
+                                           &total));
+    FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
+            plugin->select_aggregation_transient (plugin->cls,
+                                                  &wire_target_h_payto,
+                                                  "x-bank",
+                                                  &wtid2,
+                                                  &total2));
+    FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
+            plugin->create_aggregation_transient (plugin->cls,
+                                                  &wire_target_h_payto,
+                                                  "x-bank",
+                                                  &wtid,
+                                                  &total));
+    FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
+            plugin->select_aggregation_transient (plugin->cls,
+                                                  &wire_target_h_payto,
+                                                  "x-bank",
+                                                  &wtid2,
+                                                  &total2));
+    FAILIF (0 !=
+            GNUNET_memcmp (&wtid2,
+                           &wtid));
+    FAILIF (0 !=
+            TALER_amount_cmp (&total2,
+                              &total));
+    GNUNET_assert (GNUNET_OK ==
+                   TALER_string_to_amount (CURRENCY ":43",
+                                           &total));
+    FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
+            plugin->update_aggregation_transient (plugin->cls,
+                                                  &wire_target_h_payto,
+                                                  &wtid,
+                                                  &total));
+    FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
+            plugin->select_aggregation_transient (plugin->cls,
+                                                  &wire_target_h_payto,
+                                                  "x-bank",
+                                                  &wtid2,
+                                                  &total2));
+    FAILIF (0 !=
+            GNUNET_memcmp (&wtid2,
+                           &wtid));
+    FAILIF (0 !=
+            TALER_amount_cmp (&total2,
+                              &total));
+    FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
+            plugin->delete_aggregation_transient (plugin->cls,
+                                                  &wire_target_h_payto,
+                                                  &wtid));
+    FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
+            plugin->select_aggregation_transient (plugin->cls,
+                                                  &wire_target_h_payto,
+                                                  "x-bank",
+                                                  &wtid2,
+                                                  &total2));
+  }
   FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
           plugin->commit (plugin->cls));
 
diff --git a/src/include/taler_exchangedb_plugin.h 
b/src/include/taler_exchangedb_plugin.h
index b2ea240e..4ca6905e 100644
--- a/src/include/taler_exchangedb_plugin.h
+++ b/src/include/taler_exchangedb_plugin.h
@@ -3145,6 +3145,98 @@ struct TALER_EXCHANGEDB_Plugin
     uint32_t limit);
 
 
+  /**
+   * Aggregate all matching deposits for @a h_payto and
+   * @a merchant_pub, returning the total amounts.
+   *
+   * @param cls the @e cls of this struct with the plugin-specific state
+   * @param h_payto destination of the wire transfer
+   * @param merchant_pub public key of the merchant
+   * @param wtid wire transfer ID to set for the aggregate
+   * @param[out] total set to the sum of the total deposits minus applicable 
deposit fees and refunds
+   * @return transaction status
+   */
+  enum GNUNET_DB_QueryStatus
+  (*aggregate)(
+    void *cls,
+    const struct TALER_PaytoHashP *h_payto,
+    const struct TALER_MerchantPublicKeyP *merchant_pub,
+    const struct TALER_WireTransferIdentifierRawP *wtid,
+    struct TALER_Amount *total);
+
+
+  /**
+   * Create a new entry in the transient aggregation table.
+   *
+   * @param cls the @e cls of this struct with the plugin-specific state
+   * @param h_payto destination of the wire transfer
+   * @param exchange_account_section exchange account to use
+   * @param wtid the raw wire transfer identifier to be used
+   * @param total amount to be wired in the future
+   * @return transaction status
+   */
+  enum GNUNET_DB_QueryStatus
+  (*create_aggregation_transient)(
+    void *cls,
+    const struct TALER_PaytoHashP *h_payto,
+    const char *exchange_account_section,
+    const struct TALER_WireTransferIdentifierRawP *wtid,
+    const struct TALER_Amount *total);
+
+
+  /**
+   * Find existing entry in the transient aggregation table.
+   *
+   * @param cls the @e cls of this struct with the plugin-specific state
+   * @param h_payto destination of the wire transfer
+   * @param exchange_account_section exchange account to use
+   * @param[out] wtid set to the raw wire transfer identifier to be used
+   * @param[out] total existing amount to be wired in the future
+   * @return transaction status
+   */
+  enum GNUNET_DB_QueryStatus
+  (*select_aggregation_transient)(
+    void *cls,
+    const struct TALER_PaytoHashP *h_payto,
+    const char *exchange_account_section,
+    struct TALER_WireTransferIdentifierRawP *wtid,
+    struct TALER_Amount *total);
+
+
+  /**
+   * Update existing entry in the transient aggregation table.
+   * @a h_payto is only needed for query performance.
+   *
+   * @param cls the @e cls of this struct with the plugin-specific state
+   * @param h_payto destination of the wire transfer
+   * @param wtid the raw wire transfer identifier to update
+   * @param total new total amount to be wired in the future
+   * @return transaction status
+   */
+  enum GNUNET_DB_QueryStatus
+  (*update_aggregation_transient)(
+    void *cls,
+    const struct TALER_PaytoHashP *h_payto,
+    const struct TALER_WireTransferIdentifierRawP *wtid,
+    const struct TALER_Amount *total);
+
+
+  /**
+   * Delete existing entry in the transient aggregation table.
+   * @a h_payto is only needed for query performance.
+   *
+   * @param cls the @e cls of this struct with the plugin-specific state
+   * @param h_payto destination of the wire transfer
+   * @param wtid the raw wire transfer identifier to update
+   * @return transaction status
+   */
+  enum GNUNET_DB_QueryStatus
+  (*delete_aggregation_transient)(
+    void *cls,
+    const struct TALER_PaytoHashP *h_payto,
+    const struct TALER_WireTransferIdentifierRawP *wtid);
+
+
   /**
    * Lookup melt commitment data under the given @a rc.
    *

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]