gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[taler-exchange] branch master updated: rework deposits sharding, towards making aggregator faster (not necessarily done)


From: gnunet
Subject: [taler-exchange] branch master updated: rework deposits sharding, towards making aggregator faster (not necessarily done)
Date: Thu, 24 Mar 2022 17:33:39 +0100

This is an automated email from the git hooks/post-receive script.

grothoff pushed a commit to branch master
in repository exchange.

The following commit(s) were added to refs/heads/master by this push:
     new b856d56d rework deposits sharding, towards making aggregator faster (not necessarily done)
b856d56d is described below

commit b856d56d95f92eb9dedb0af49493350ea8ea2268
Author: Christian Grothoff <grothoff@gnunet.org>
AuthorDate: Thu Mar 24 17:33:29 2022 +0100

    rework deposits sharding, towards making aggregator faster (not necessarily done)
---
 src/exchange/taler-exchange-aggregator.c    |  83 +++++++--
 src/exchangedb/drop0001.sql                 |   8 +-
 src/exchangedb/exchange-0001.sql            | 266 +++++++++++++++++++---------
 src/exchangedb/plugin_exchangedb_postgres.c | 131 +++++++-------
 src/exchangedb/test_exchangedb.c            |   4 +-
 src/include/taler_exchangedb_plugin.h       |   8 +-
 6 files changed, 324 insertions(+), 176 deletions(-)

diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c
index abab347f..c34d47f9 100644
--- a/src/exchange/taler-exchange-aggregator.c
+++ b/src/exchange/taler-exchange-aggregator.c
@@ -28,6 +28,18 @@
 #include "taler_json_lib.h"
 #include "taler_bank_service.h"
 
+struct AdditionalDeposit
+{
+  /**
+   * Public key of the coin.
+   */
+  struct TALER_CoinSpendPublicKeyP coin_pub;
+
+  /**
+   * Row of the deposit.
+   */
+  uint64_t row;
+};
 
 /**
  * Information about one aggregation process to be executed.  There is
@@ -42,6 +54,11 @@ struct AggregationUnit
    */
   struct TALER_MerchantPublicKeyP merchant_pub;
 
+  /**
+   * Public key of the coin.
+   */
+  struct TALER_CoinSpendPublicKeyP coin_pub;
+
   /**
    * Total amount to be transferred, before subtraction of @e fees.wire and 
rounding down.
    */
@@ -97,7 +114,8 @@ struct AggregationUnit
   /**
    * Array of row_ids from the aggregation.
    */
-  uint64_t additional_rows[TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT];
+  struct AdditionalDeposit
+    additional_rows[TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT];
 
   /**
    * Offset specifying how many @e additional_rows are in use.
@@ -383,7 +401,8 @@ deposit_cb (void *cls,
   enum GNUNET_DB_QueryStatus qs;
 
   au->merchant_pub = *merchant_pub;
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+  au->coin_pub = *coin_pub;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Aggregator processing payment %s with amount %s\n",
               TALER_B2S (coin_pub),
               TALER_amount2s (amount_with_fee));
@@ -405,7 +424,7 @@ deposit_cb (void *cls,
   {
     struct TALER_Amount ntotal;
 
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "Non-refunded transaction, subtracting deposit fee %s\n",
                 TALER_amount2s (deposit_fee));
     if (0 >
@@ -428,6 +447,9 @@ deposit_cb (void *cls,
       au->total_amount = ntotal;
     }
   }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Amount after fee is %s\n",
+              TALER_amount2s (&au->total_amount));
 
   GNUNET_assert (NULL == au->payto_uri);
   au->payto_uri = GNUNET_strdup (payto_uri);
@@ -437,7 +459,7 @@ deposit_cb (void *cls,
   GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
                               &au->wtid,
                               sizeof (au->wtid));
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Starting aggregation under H(WTID)=%s, starting amount %s at 
%llu\n",
               TALER_B2S (&au->wtid),
               TALER_amount2s (amount_with_fee),
@@ -493,7 +515,7 @@ deposit_cb (void *cls,
               "Aggregator marks deposit %llu as done\n",
               (unsigned long long) row_id);
   qs = db_plugin->mark_deposit_done (db_plugin->cls,
-                                     merchant_pub,
+                                     coin_pub,
                                      row_id);
   if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
   {
@@ -528,6 +550,8 @@ aggregate_cb (void *cls,
   struct TALER_Amount old;
   enum GNUNET_DB_QueryStatus qs;
 
+  if (row_id == au->row_id)
+    return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
   if (au->rows_offset >= TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT)
   {
     /* Bug: we asked for at most #TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT 
results! */
@@ -605,18 +629,29 @@ aggregate_cb (void *cls,
   }
 
   /* "append" to our list of rows */
-  au->additional_rows[au->rows_offset++] = row_id;
+  au->additional_rows[au->rows_offset].coin_pub = *coin_pub;
+  au->additional_rows[au->rows_offset].row = row_id;
+  au->rows_offset++;
   /* insert into aggregation tracking table */
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Adding %llu to aggregate %s\n",
+              (unsigned long long) row_id,
+              TALER_B2S (&au->wtid));
   qs = db_plugin->insert_aggregation_tracking (db_plugin->cls,
                                                &au->wtid,
                                                row_id);
   if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
   {
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                "Failed to add %llu to aggregate %s: %d\n",
+                (unsigned long long) row_id,
+                TALER_B2S (&au->wtid),
+                qs);
     GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
     return qs;
   }
   qs = db_plugin->mark_deposit_done (db_plugin->cls,
-                                     &au->merchant_pub,
+                                     coin_pub,
                                      row_id);
   if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
   {
@@ -775,7 +810,7 @@ run_aggregation (void *cls)
   }
 
   /* Now try to find other deposits to aggregate */
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Found ready deposit for %s, aggregating by target %llu\n",
               TALER_B2S (&au_active.merchant_pub),
               (unsigned long long) au_active.wire_target);
@@ -808,13 +843,17 @@ run_aggregation (void *cls)
                                      s);
     return;
   }
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-              "Found %d other deposits to combine into wire transfer.\n",
-              qs);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Found %d other deposits to combine into wire transfer with fee 
%s.\n",
+              qs,
+              TALER_amount2s (&au_active.fees.wire));
 
   /* Subtract wire transfer fee and round to the unit supported by the
      wire transfer method; Check if after rounding down, we still have
      an amount to transfer, and if not mark as 'tiny'. */
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Rounding aggregate of %s\n",
+              TALER_amount2s (&au_active.total_amount));
   if ( (0 >=
         TALER_amount_subtract (&au_active.final_amount,
                                &au_active.total_amount,
@@ -822,8 +861,7 @@ run_aggregation (void *cls)
        (GNUNET_SYSERR ==
         TALER_amount_round_down (&au_active.final_amount,
                                  &currency_round_unit)) ||
-       ( (0 == au_active.final_amount.value) &&
-         (0 == au_active.final_amount.fraction) ) )
+       (TALER_amount_is_zero (&au_active.final_amount)) )
   {
     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                 "Aggregate value too low for transfer (%d/%s)\n",
@@ -848,23 +886,29 @@ run_aggregation (void *cls)
       return;
     }
     /* Mark transactions by row_id as minor */
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Marking %s (%llu) as tiny\n",
+                TALER_B2S (&au_active.coin_pub),
+                (unsigned long long) au_active.row_id);
     qs = db_plugin->mark_deposit_tiny (db_plugin->cls,
-                                       &au_active.merchant_pub,
+                                       &au_active.coin_pub,
                                        au_active.row_id);
-    if (0 <= qs)
+    if (0 < qs)
     {
       for (unsigned int i = 0; i<au_active.rows_offset; i++)
       {
         qs = db_plugin->mark_deposit_tiny (db_plugin->cls,
-                                           &au_active.merchant_pub,
-                                           au_active.additional_rows[i]);
-        if (0 > qs)
+                                           &au_active.additional_rows[i].
+                                           coin_pub,
+                                           au_active.additional_rows[i].row);
+        if (0 >= qs)
           break;
       }
     }
+    GNUNET_break (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != qs);
     if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
     {
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                   "Serialization issue, trying again later!\n");
       db_plugin->rollback (db_plugin->cls);
       cleanup_au (&au_active);
@@ -876,6 +920,7 @@ run_aggregation (void *cls)
     }
     if (GNUNET_DB_STATUS_HARD_ERROR == qs)
     {
+      GNUNET_break (0);
       db_plugin->rollback (db_plugin->cls);
       cleanup_au (&au_active);
       global_ret = EXIT_FAILURE;
diff --git a/src/exchangedb/drop0001.sql b/src/exchangedb/drop0001.sql
index 60acc98a..225c817a 100644
--- a/src/exchangedb/drop0001.sql
+++ b/src/exchangedb/drop0001.sql
@@ -55,6 +55,8 @@ DROP TABLE IF EXISTS wire_targets CASCADE;
 DROP FUNCTION IF EXISTS add_constraints_to_wire_targets_partition;
 DROP TABLE IF EXISTS wire_fee CASCADE;
 DROP TABLE IF EXISTS deposits CASCADE;
+DROP TABLE IF EXISTS deposits_by_ready CASCADE;
+DROP TABLE IF EXISTS deposits_for_matching CASCADE;
 DROP FUNCTION IF EXISTS add_constraints_to_deposits_partition;
 DROP TABLE IF EXISTS extension_details CASCADE;
 DROP TABLE IF EXISTS refunds CASCADE;
@@ -88,6 +90,7 @@ DROP TABLE IF EXISTS recoup_by_reserve CASCADE;
 DROP TABLE IF EXISTS partners CASCADE;
 DROP TABLE IF EXISTS account_merges CASCADE;
 DROP TABLE IF EXISTS purse_merges CASCADE;
+DROP TABLE IF EXISTS purse_deposits CASCADE;
 DROP TABLE IF EXISTS contracts CASCADE;
 DROP TABLE IF EXISTS history_requests CASCADE;
 DROP TABLE IF EXISTS close_requests CASCADE;
@@ -103,8 +106,9 @@ DROP FUNCTION IF EXISTS exchange_do_withdraw;
 DROP FUNCTION IF EXISTS exchange_do_withdraw_limit_check;
 DROP FUNCTION IF EXISTS recoup_insert_trigger;
 DROP FUNCTION IF EXISTS recoup_delete_trigger;
-DROP FUNCTION IF EXISTS deposits_by_coin_insert_trigger;
-DROP FUNCTION IF EXISTS deposits_by_coin_delete_trigger;
+DROP FUNCTION IF EXISTS deposits_insert_trigger;
+DROP FUNCTION IF EXISTS deposits_update_trigger;
+DROP FUNCTION IF EXISTS deposits_delete_trigger;
 DROP FUNCTION IF EXISTS reserves_out_by_reserve_insert_trigger;
 DROP FUNCTION IF EXISTS reserves_out_by_reserve_delete_trigger;
 DROP FUNCTION IF EXISTS exchange_do_deposit;
diff --git a/src/exchangedb/exchange-0001.sql b/src/exchangedb/exchange-0001.sql
index e723a367..568779f9 100644
--- a/src/exchangedb/exchange-0001.sql
+++ b/src/exchangedb/exchange-0001.sql
@@ -610,7 +610,7 @@ CREATE TABLE IF NOT EXISTS deposits
   (deposit_serial_id BIGINT GENERATED BY DEFAULT AS IDENTITY -- PRIMARY KEY
   ,shard INT8 NOT NULL
   ,coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32) -- REFERENCES 
known_coins (coin_pub) ON DELETE CASCADE
-  ,known_coin_id BIGINT NOT NULL -- REFERENCES known_coins (known_coin_id) ON 
DELETE CASCADE
+  ,known_coin_id BIGINT NOT NULL -- REFERENCES known_coins (known_coin_id) ON 
DELETE CASCADE --- FIXME: column needed???
   ,amount_with_fee_val INT8 NOT NULL
   ,amount_with_fee_frac INT4 NOT NULL
   ,wallet_timestamp INT8 NOT NULL
@@ -626,22 +626,11 @@ CREATE TABLE IF NOT EXISTS deposits
   ,done BOOLEAN NOT NULL DEFAULT FALSE
   ,extension_blocked BOOLEAN NOT NULL DEFAULT FALSE
   ,extension_details_serial_id INT8 REFERENCES extension_details 
(extension_details_serial_id) ON DELETE CASCADE
-  ,UNIQUE (shard, coin_pub, merchant_pub, h_contract_terms)
+  ,UNIQUE (coin_pub, merchant_pub, h_contract_terms)
   )
-  PARTITION BY HASH (shard); -- FIXME: why not BY RANGE? RANGE would seem 
better for 'deposits_get_ready'!
+  PARTITION BY HASH (coin_pub);
 -- FIXME:
--- new idea: partition deposits by coin_pub (remove deposits_by_coin)
--- define 'ready' == ! (tiny || done || blocked)
--- add new deposits_by_ready (on shard + wire_deadline), select by shard, then 
ready + deadline
---         -- use triggers to ONLY include 'ready' deposits (delete on update)!
---         -- use multi-level partitions: Hash(shard) + 
Range(wire_deadline/sec)
--- add new deposits_by_match (on shard + refund_deadline)
---         -- use triggers to ONLY include 'ready' deposits (delete on update)!
---         -- use multi-level partitions: Hash(shard) + 
Range(refund_deadline/sec)
--- => first we select per-merchant shard, basically stay on the same system as 
other ops for the same merchant
--- => second we select by deadline, use enough values so that _usually_ the 
aggregator
---    and the 'insert' process _can_ work on different shards!
--- => the latter could be achieved by dynamically (!) creating/deleting 
partitions:
+-- TODO: dynamically (!) creating/deleting partitions:
 --    create new partitions 'as needed', drop old ones once the aggregator has 
made
 --    them empty; as 'new' deposits will always have deadlines in the future, 
this
 --    would basically guarantee no conflict between aggregator and exchange 
service!
@@ -683,31 +672,15 @@ COMMENT ON COLUMN deposits.extension_details_serial_id
 COMMENT ON COLUMN deposits.tiny
   IS 'Set to TRUE if we decided that the amount is too small to ever trigger a 
wire transfer by itself (requires real aggregation)';
 
+-- FIXME: we sometimes go ONLY by 'deposit_serial_id',
+--        check if queries could be improved by adding shard or adding another 
index without shard here, or inverting the order of the index here!
 CREATE INDEX IF NOT EXISTS deposits_deposit_by_serial_id_index
   ON deposits
   (shard,deposit_serial_id);
-CREATE INDEX IF NOT EXISTS deposits_for_get_ready_index
-  ON deposits
-  (shard ASC
-  ,done
-  ,extension_blocked
-  ,tiny
-  ,wire_deadline ASC
-  );
-COMMENT ON INDEX deposits_for_get_ready_index
-  IS 'for deposits_get_ready';
 
-CREATE INDEX IF NOT EXISTS deposits_for_iterate_matching_index
+CREATE INDEX IF NOT EXISTS deposits_by_coin_pub_index
   ON deposits
-  (shard
-  ,merchant_pub
-  ,wire_target_h_payto
-  ,done
-  ,extension_blocked
-  ,refund_deadline ASC
-  );
-COMMENT ON INDEX deposits_for_iterate_matching_index
-  IS 'for deposits_iterate_matching';
+  (coin_pub);
 
 
 CREATE TABLE IF NOT EXISTS deposits_default
@@ -732,66 +705,198 @@ $$;
 SELECT add_constraints_to_deposits_partition('default');
 
 
+CREATE TABLE IF NOT EXISTS deposits_by_ready
+  (wire_deadline INT8 NOT NULL
+  ,shard INT8 NOT NULL
+  ,coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32)
+  ,deposit_serial_id INT8
+  )
+  PARTITION BY RANGE (wire_deadline);
+COMMENT ON TABLE deposits_by_ready
+  IS 'Enables fast lookups for deposits_get_ready, auto-populated via TRIGGER 
below';
+
+CREATE INDEX IF NOT EXISTS deposits_by_ready_main_index
+  ON deposits_by_ready
+  (wire_deadline ASC, shard ASC, coin_pub);
+
+CREATE TABLE IF NOT EXISTS deposits_by_ready_default
+  PARTITION OF deposits_by_ready
+  DEFAULT;
 
-CREATE TABLE IF NOT EXISTS deposits_by_coin
-  (deposit_serial_id BIGINT
+
+CREATE TABLE IF NOT EXISTS deposits_for_matching
+  (refund_deadline INT8 NOT NULL
   ,shard INT8 NOT NULL
   ,coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32)
+  ,deposit_serial_id INT8
   )
-  PARTITION BY HASH (coin_pub);
-COMMENT ON TABLE deposits_by_coin
-  IS 'Enables fast lookups of deposit by coin_pub, auto-populated via TRIGGER 
below';
+  PARTITION BY RANGE (refund_deadline);
+COMMENT ON TABLE deposits_for_matching
+  IS 'Enables fast lookups for deposits_iterate_matching, auto-populated via 
TRIGGER below';
 
-CREATE INDEX IF NOT EXISTS deposits_by_coin_main_index
-  ON deposits_by_coin
-  (coin_pub);
+CREATE INDEX IF NOT EXISTS deposits_for_matching_main_index
+  ON deposits_for_matching
+  (refund_deadline ASC, shard, coin_pub);
 
-CREATE TABLE IF NOT EXISTS deposits_by_coin_default
-  PARTITION OF deposits_by_coin
-  FOR VALUES WITH (MODULUS 1, REMAINDER 0);
+CREATE TABLE IF NOT EXISTS deposits_for_matching_default
+  PARTITION OF deposits_for_matching
+  DEFAULT;
+  
 
-CREATE OR REPLACE FUNCTION deposits_by_coin_insert_trigger()
+CREATE OR REPLACE FUNCTION deposits_insert_trigger()
   RETURNS trigger
   LANGUAGE plpgsql
   AS $$
+DECLARE
+  is_ready BOOLEAN;
+DECLARE
+  is_tready BOOLEAN; -- is ready, but may be tiny
 BEGIN
-  INSERT INTO deposits_by_coin
-    (deposit_serial_id
-    ,shard
-    ,coin_pub)
-  VALUES
-    (NEW.deposit_serial_id
-    ,NEW.shard
-    ,NEW.coin_pub);
+  is_ready  = NOT (NEW.done OR NEW.tiny OR NEW.extension_blocked);
+  is_tready = NOT (NEW.done OR NEW.extension_blocked);
+
+  IF (is_ready)
+  THEN
+    INSERT INTO deposits_by_ready
+      (wire_deadline
+      ,shard
+      ,coin_pub
+      ,deposit_serial_id)
+    VALUES
+      (NEW.wire_deadline
+      ,NEW.shard
+      ,NEW.coin_pub
+      ,NEW.deposit_serial_id);
+  END IF;
+  IF (is_tready)
+  THEN
+    INSERT INTO deposits_for_matching
+      (refund_deadline
+      ,shard
+      ,coin_pub
+      ,deposit_serial_id)
+    VALUES
+      (NEW.refund_deadline
+      ,NEW.shard
+      ,NEW.coin_pub
+      ,NEW.deposit_serial_id);
+  END IF;
   RETURN NEW;
 END $$;  
-COMMENT ON FUNCTION deposits_by_coin_insert_trigger()
-  IS 'Replicate deposit inserts into deposits_by_coin table.';
+COMMENT ON FUNCTION deposits_insert_trigger()
+  IS 'Replicate deposit inserts into materialized indices.';
 
 CREATE TRIGGER deposits_on_insert
   AFTER INSERT
    ON deposits
-   FOR EACH ROW EXECUTE FUNCTION deposits_by_coin_insert_trigger();
+   FOR EACH ROW EXECUTE FUNCTION deposits_insert_trigger();
 
-CREATE OR REPLACE FUNCTION deposits_by_coin_delete_trigger()
+CREATE OR REPLACE FUNCTION deposits_update_trigger()
   RETURNS trigger
   LANGUAGE plpgsql
   AS $$
+DECLARE
+  was_ready BOOLEAN;
+DECLARE
+  is_ready BOOLEAN;
+DECLARE
+  was_tready BOOLEAN; -- was ready, but may be tiny
+DECLARE
+  is_tready BOOLEAN; -- is ready, but may be tiny
 BEGIN
-  DELETE FROM deposits_by_coin
-   WHERE coin_pub = OLD.coin_pub
-     AND shard = OLD.shard
-     AND deposit_serial_id = OLD.deposit_serial_id;
-  RETURN OLD;
+  was_ready = NOT (OLD.done OR OLD.tiny OR OLD.extension_blocked);
+  is_ready  = NOT (NEW.done OR NEW.tiny OR NEW.extension_blocked);
+  was_tready = NOT (OLD.done OR OLD.extension_blocked);
+  is_tready  = NOT (NEW.done OR NEW.extension_blocked);
+  IF (was_ready AND NOT is_ready)
+  THEN
+    DELETE FROM deposits_by_ready
+     WHERE wire_deadline = OLD.wire_deadline
+       AND shard = OLD.shard
+       AND coin_pub = OLD.coin_pub
+       AND deposit_serial_id = OLD.deposit_serial_id;
+  END IF;
+  IF (was_tready AND NOT is_tready)
+  THEN
+    DELETE FROM deposits_for_matching
+     WHERE refund_deadline = OLD.refund_deadline
+       AND shard = OLD.shard
+       AND coin_pub = OLD.coin_pub
+       AND deposit_serial_id = OLD.deposit_serial_id;
+  END IF;
+  IF (is_ready AND NOT was_ready)
+  THEN
+    INSERT INTO deposits_by_ready
+      (wire_deadline
+      ,shard
+      ,coin_pub
+      ,deposit_serial_id)
+    VALUES
+      (NEW.wire_deadline
+      ,NEW.shard
+      ,NEW.coin_pub
+      ,NEW.deposit_serial_id);
+  END IF;
+  IF (is_tready AND NOT was_tready)
+  THEN
+    INSERT INTO deposits_for_matching
+      (refund_deadline
+      ,shard
+      ,coin_pub
+      ,deposit_serial_id)
+    VALUES
+      (NEW.refund_deadline
+      ,NEW.shard
+      ,NEW.coin_pub
+      ,NEW.deposit_serial_id);
+  END IF;
+  RETURN NEW;
+END $$;
+COMMENT ON FUNCTION deposits_update_trigger()
+  IS 'Replicate deposits changes into materialized indices.';
+
+CREATE TRIGGER deposits_on_update
+  AFTER UPDATE
+    ON deposits
+   FOR EACH ROW EXECUTE FUNCTION deposits_update_trigger();
+
+CREATE OR REPLACE FUNCTION deposits_delete_trigger()
+  RETURNS trigger
+  LANGUAGE plpgsql
+  AS $$
+DECLARE
+  was_ready BOOLEAN;
+DECLARE
+  was_tready BOOLEAN; -- was ready, but may be tiny
+BEGIN
+  was_ready  = NOT (OLD.done OR OLD.tiny OR OLD.extension_blocked);
+  was_tready = NOT (OLD.done OR OLD.extension_blocked);
+
+  IF (was_ready)
+  THEN
+    DELETE FROM deposits_by_ready
+     WHERE wire_deadline = OLD.wire_deadline
+       AND shard = OLD.shard
+       AND coin_pub = OLD.coin_pub
+       AND deposit_serial_id = OLD.deposit_serial_id;
+  END IF;
+  IF (was_tready)
+  THEN
+    DELETE FROM deposits_for_matching
+     WHERE refund_deadline = OLD.refund_deadline
+       AND shard = OLD.shard
+       AND coin_pub = OLD.coin_pub
+       AND deposit_serial_id = OLD.deposit_serial_id;
+  END IF;
+  RETURN NEW;
 END $$;  
-COMMENT ON FUNCTION deposits_by_coin_delete_trigger()
-  IS 'Replicate deposits deletions into deposits_by_coin table.';
+COMMENT ON FUNCTION deposits_delete_trigger()
+  IS 'Replicate deposit deletions into materialized indices.';
 
 CREATE TRIGGER deposits_on_delete
   AFTER DELETE
-    ON deposits
-   FOR EACH ROW EXECUTE FUNCTION deposits_by_coin_delete_trigger();
-
+   ON deposits
+   FOR EACH ROW EXECUTE FUNCTION deposits_delete_trigger();
 
 
 CREATE TABLE IF NOT EXISTS refunds
@@ -2011,7 +2116,7 @@ DECLARE
 BEGIN
 -- Shards: INSERT extension_details (by extension_details_serial_id)
 --         INSERT wire_targets (by h_payto), on CONFLICT DO NOTHING;
---         INSERT deposits (by shard + merchant_pub + h_payto), ON CONFLICT DO 
NOTHING;
+--         INSERT deposits (by coin_pub, shard), ON CONFLICT DO NOTHING;
 --         UPDATE known_coins (by coin_pub)
 
 IF NOT NULL in_extension_details
@@ -2356,27 +2461,26 @@ DECLARE
 DECLARE
   deposit_frac INT8; -- amount that was originally deposited
 BEGIN
--- Shards: SELECT deposits (by shard, coin_pub, h_contract_terms, merchant_pub)
+-- Shards: SELECT deposits (coin_pub, shard, h_contract_terms, merchant_pub)
 --         INSERT refunds (by deposit_serial_id, rtransaction_id) ON CONFLICT 
DO NOTHING
 --         SELECT refunds (by deposit_serial_id)
 --         UPDATE known_coins (by coin_pub)
 
 SELECT
-   dep.deposit_serial_id
-  ,dep.amount_with_fee_val
-  ,dep.amount_with_fee_frac
-  ,dep.done
+   deposit_serial_id
+  ,amount_with_fee_val
+  ,amount_with_fee_frac
+  ,done
 INTO
    dsi
   ,deposit_val
   ,deposit_frac
   ,out_gone
-FROM deposits_by_coin dbc
-  JOIN deposits dep USING (shard,deposit_serial_id)
- WHERE dbc.coin_pub=in_coin_pub
-  AND dep.shard=in_deposit_shard
-  AND dep.merchant_pub=in_merchant_pub
-  AND dep.h_contract_terms=in_h_contract_terms;
+FROM deposits
+ WHERE coin_pub=in_coin_pub
+  AND shard=in_deposit_shard
+  AND merchant_pub=in_merchant_pub
+  AND h_contract_terms=in_h_contract_terms;
 
 IF NOT FOUND
 THEN
diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c
index 120f475d..3cde9773 100644
--- a/src/exchangedb/plugin_exchangedb_postgres.c
+++ b/src/exchangedb/plugin_exchangedb_postgres.c
@@ -990,12 +990,11 @@ prepare_statements (struct PostgresClosure *pg)
       ",rtransaction_id "
       ",amount_with_fee_val "
       ",amount_with_fee_frac "
-      ") SELECT dbc.deposit_serial_id, $3, $5, $6, $7"
-      "    FROM deposits_by_coin dbc"
-      "    JOIN deposits dep USING (shard,deposit_serial_id)"
-      "   WHERE dbc.coin_pub=$1"
-      "     AND dep.h_contract_terms=$4"
-      "     AND dep.merchant_pub=$2",
+      ") SELECT deposit_serial_id, $3, $5, $6, $7"
+      "    FROM deposits" /* FIXME: check if adding additional AND on the 
'shard' would help (possibly after reviewing indices on deposits!) */
+      "   WHERE coin_pub=$1"
+      "     AND h_contract_terms=$4"
+      "     AND merchant_pub=$2",
       7),
     /* Query the 'refunds' by coin public key */
     GNUNET_PQ_make_prepare (
@@ -1010,12 +1009,11 @@ prepare_statements (struct PostgresClosure *pg)
       ",denom.fee_refund_val "
       ",denom.fee_refund_frac "
       ",ref.refund_serial_id"
-      " FROM deposits_by_coin dbc"
+      " FROM deposits dep"
       " JOIN refunds ref USING (deposit_serial_id)"
-      " JOIN deposits dep ON (dbc.shard = dep.shard AND dbc.deposit_serial_id 
= dep.deposit_serial_id)"
-      " JOIN known_coins kc ON (dbc.coin_pub = kc.coin_pub)"
+      " JOIN known_coins kc ON (dep.coin_pub = kc.coin_pub)"
       " JOIN denominations denom USING (denominations_serial)"
-      " WHERE dbc.coin_pub=$1;",
+      " WHERE dep.coin_pub=$1;",
       1),
     /* Query the 'refunds' by coin public key, merchant_pub and contract hash 
*/
     GNUNET_PQ_make_prepare (
@@ -1023,10 +1021,9 @@ prepare_statements (struct PostgresClosure *pg)
       "SELECT"
       " ref.amount_with_fee_val"
       ",ref.amount_with_fee_frac"
-      " FROM deposits_by_coin dbc"
+      " FROM deposits dep"
       " JOIN refunds ref USING (shard,deposit_serial_id)"
-      " JOIN deposits dep ON (dbc.shard = dep.shard AND dbc.deposit_serial_id 
= dep.deposit_serial_id)"
-      " WHERE dbc.coin_pub=$1"
+      " WHERE dep.coin_pub=$1"
       "   AND dep.merchant_pub=$2"
       "   AND dep.h_contract_terms=$3;",
       3),
@@ -1053,6 +1050,7 @@ prepare_statements (struct PostgresClosure *pg)
     /* Lock deposit table; NOTE: we may want to eventually shard the
        deposit table to avoid this lock being the main point of
        contention limiting transaction performance. */
+    // FIXME: check if this query is even still used!
     GNUNET_PQ_make_prepare (
       "lock_deposit",
       "LOCK TABLE deposits;",
@@ -1098,12 +1096,11 @@ prepare_statements (struct PostgresClosure *pg)
       ",dep.h_contract_terms"
       ",dep.wire_salt"
       ",wt.payto_uri AS receiver_wire_account"
-      " FROM deposits_by_coin dbc"
-      " JOIN deposits dep USING (shard,deposit_serial_id)"
-      " JOIN known_coins kc ON (kc.coin_pub = dbc.coin_pub)"
+      " FROM deposits dep"
+      " JOIN known_coins kc ON (kc.coin_pub = dep.coin_pub)"
       " JOIN denominations USING (denominations_serial)"
       " JOIN wire_targets wt USING (wire_target_h_payto)"
-      " WHERE dbc.coin_pub=$1"
+      " WHERE dep.coin_pub=$1"
       "   AND dep.merchant_pub=$3"
       "   AND dep.h_contract_terms=$2;",
       3),
@@ -1150,12 +1147,11 @@ prepare_statements (struct PostgresClosure *pg)
       ",denom.fee_deposit_val"
       ",denom.fee_deposit_frac"
       ",dep.wire_deadline"
-      " FROM deposits_by_coin dbc"
-      "    JOIN deposits dep USING (shard,deposit_serial_id)"
+      " FROM deposits dep"
       "    JOIN wire_targets wt USING (wire_target_h_payto)"
-      "    JOIN known_coins kc ON (kc.coin_pub = dbc.coin_pub)"
+      "    JOIN known_coins kc ON (kc.coin_pub = dep.coin_pub)"
       "    JOIN denominations denom USING (denominations_serial)"
-      " WHERE dbc.coin_pub=$1"
+      " WHERE dep.coin_pub=$1"
       "   AND dep.merchant_pub=$3"
       "   AND dep.h_contract_terms=$2;",
       3),
@@ -1163,7 +1159,7 @@ prepare_statements (struct PostgresClosure *pg)
     GNUNET_PQ_make_prepare (
       "deposits_get_ready",
       "SELECT"
-      " deposit_serial_id"
+      " dep.deposit_serial_id"
       ",amount_with_fee_val"
       ",amount_with_fee_frac"
       ",denom.fee_deposit_val"
@@ -1173,47 +1169,46 @@ prepare_statements (struct PostgresClosure *pg)
       ",wire_target_serial_id"
       ",merchant_pub"
       ",kc.coin_pub"
-      " FROM deposits"
+      " FROM deposits_by_ready dbr"
+      "  JOIN deposits dep"
+      "    ON (dbr.coin_pub = dep.coin_pub AND dbr.deposit_serial_id = 
dep.deposit_serial_id)"
       "  JOIN wire_targets "
       "    USING (wire_target_h_payto)"
       "  JOIN known_coins kc"
-      "    USING (coin_pub)"
+      "    ON (kc.coin_pub = dep.coin_pub)"
       "  JOIN denominations denom"
       "    USING (denominations_serial)"
-      " WHERE "
-      "       shard >= $2"
-      "   AND shard <= $3"
-      "   AND done=FALSE"
-      "   AND extension_blocked=FALSE"
-      "   AND tiny=FALSE"
-      "   AND wire_deadline<=$1"
+      " WHERE dbr.wire_deadline<=$1"
+      "   AND dbr.shard >= $2"
+      "   AND dbr.shard <= $3"
       "   AND (kyc_ok OR $4)"
       " ORDER BY "
-      "   shard ASC"
-      "  ,wire_deadline ASC"
+      "   dbr.wire_deadline ASC"
+      "  ,dbr.shard ASC"
       " LIMIT 1;",
       4),
     /* Used in #postgres_iterate_matching_deposits() */
     GNUNET_PQ_make_prepare (
       "deposits_iterate_matching",
       "SELECT"
-      " deposit_serial_id"
-      ",amount_with_fee_val"
-      ",amount_with_fee_frac"
+      " dep.deposit_serial_id"
+      ",dep.amount_with_fee_val"
+      ",dep.amount_with_fee_frac"
       ",denom.fee_deposit_val"
       ",denom.fee_deposit_frac"
-      ",h_contract_terms"
-      ",kc.coin_pub"
-      " FROM deposits"
-      "    JOIN known_coins kc USING (coin_pub)"
-      "    JOIN denominations denom USING (denominations_serial)"
-      " WHERE shard=$4"
-      "  AND merchant_pub=$1"
-      "  AND wire_target_h_payto=$2"
-      "  AND done=FALSE"
-      "  AND extension_blocked=FALSE"
-      "  AND refund_deadline<$3"
-      " ORDER BY refund_deadline ASC"
+      ",dep.h_contract_terms"
+      ",dfm.coin_pub"
+      " FROM deposits_for_matching dfm"
+      "    JOIN deposits dep "
+      "      ON (dep.coin_pub = dfm.coin_pub and dep.deposit_serial_id = 
dfm.deposit_serial_id)"
+      "    JOIN known_coins kc"
+      "      ON (dep.coin_pub = kc.coin_pub)"
+      "    JOIN denominations denom"
+      "      USING (denominations_serial)"
+      " WHERE dfm.refund_deadline<$3"
+      "  AND dfm.shard=$4"
+      "  AND dep.merchant_pub=$1"
+      "  AND dep.wire_target_h_payto=$2"
       " LIMIT "
       TALER_QUOTE (
         TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT) ";",
@@ -1223,16 +1218,16 @@ prepare_statements (struct PostgresClosure *pg)
       "mark_deposit_tiny",
       "UPDATE deposits"
       " SET tiny=TRUE"
-      " WHERE shard=$2"
-      "   AND deposit_serial_id=$1",
+      " WHERE coin_pub=$1"
+      "   AND deposit_serial_id=$2",
       2),
     /* Used in #postgres_mark_deposit_done() */
     GNUNET_PQ_make_prepare (
       "mark_deposit_done",
       "UPDATE deposits"
       " SET done=TRUE"
-      " WHERE shard=$2"
-      "   AND deposit_serial_id=$1;",
+      " WHERE coin_pub=$1"
+      "   AND deposit_serial_id=$2;",
       2),
     /* Used in #postgres_get_coin_transactions() to obtain information
        about how a coin has been spend with /deposit requests. */
@@ -1255,16 +1250,14 @@ prepare_statements (struct PostgresClosure *pg)
       ",dep.coin_sig"
       ",dep.deposit_serial_id"
       ",dep.done"
-      " FROM deposits_by_coin dbc"
-      "    JOIN deposits dep"
-      "      USING (shard,deposit_serial_id)"
+      " FROM deposits dep"
       "    JOIN wire_targets wt"
       "      USING (wire_target_h_payto)"
       "    JOIN known_coins kc"
-      "      ON (kc.coin_pub = dbc.coin_pub)"
+      "      ON (kc.coin_pub = dep.coin_pub)"
       "    JOIN denominations denoms"
       "      USING (denominations_serial)"
-      " WHERE dbc.coin_pub=$1;",
+      " WHERE dep.coin_pub=$1;",
       1),
 
     /* Used in #postgres_get_link_data(). */
@@ -1329,20 +1322,18 @@ prepare_statements (struct PostgresClosure *pg)
       ",wt.payto_uri"
       ",denom.fee_deposit_val"
       ",denom.fee_deposit_frac"
-      " FROM deposits_by_coin dbc"
-      "    JOIN deposits dep"
-      "      USING (shard,deposit_serial_id)"
+      " FROM deposits dep"
       "    JOIN wire_targets wt"
       "      USING (wire_target_h_payto)"
       "    JOIN aggregation_tracking"
       "      USING (deposit_serial_id)"
       "    JOIN known_coins kc"
-      "      ON (kc.coin_pub = dbc.coin_pub)"
+      "      ON (kc.coin_pub = dep.coin_pub)"
       "    JOIN denominations denom"
       "      USING (denominations_serial)"
       "    JOIN wire_out"
       "      USING (wtid_raw)"
-      " WHERE dbc.coin_pub=$1"
+      " WHERE dep.coin_pub=$1"
       "   AND dep.merchant_pub=$3"
       "   AND dep.h_contract_terms=$2",
       3),
@@ -5898,14 +5889,13 @@ postgres_have_deposit2 (
  */
 static enum GNUNET_DB_QueryStatus
 postgres_mark_deposit_tiny (void *cls,
-                            const struct TALER_MerchantPublicKeyP *merchant_pub,
+                            const struct TALER_CoinSpendPublicKeyP *coin_pub,
                             uint64_t rowid)
 {
   struct PostgresClosure *pg = cls;
-  uint64_t deposit_shard = compute_shard (merchant_pub);
   struct GNUNET_PQ_QueryParam params[] = {
+    GNUNET_PQ_query_param_auto_from_type (coin_pub),
     GNUNET_PQ_query_param_uint64 (&rowid),
-    GNUNET_PQ_query_param_uint64 (&deposit_shard),
     GNUNET_PQ_query_param_end
   };
 
@@ -5927,14 +5917,13 @@ postgres_mark_deposit_tiny (void *cls,
  */
 static enum GNUNET_DB_QueryStatus
 postgres_mark_deposit_done (void *cls,
-                            const struct TALER_MerchantPublicKeyP *merchant_pub,
+                            const struct TALER_CoinSpendPublicKeyP *coin_pub,
                             uint64_t rowid)
 {
   struct PostgresClosure *pg = cls;
-  uint64_t deposit_shard = compute_shard (merchant_pub);
   struct GNUNET_PQ_QueryParam params[] = {
+    GNUNET_PQ_query_param_auto_from_type (coin_pub),
     GNUNET_PQ_query_param_uint64 (&rowid),
-    GNUNET_PQ_query_param_uint64 (&deposit_shard),
     GNUNET_PQ_query_param_end
   };
 
@@ -6431,6 +6420,12 @@ postgres_insert_deposit (void *cls,
     GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
     return qs;
   }
+  if (GNUNET_TIME_timestamp_cmp (deposit->wire_deadline,
+                                 <,
+                                 deposit->refund_deadline))
+  {
+    GNUNET_break (0);
+  }
   {
     uint64_t shard = compute_shard (&deposit->merchant_pub);
     struct GNUNET_PQ_QueryParam params[] = {
diff --git a/src/exchangedb/test_exchangedb.c b/src/exchangedb/test_exchangedb.c
index 012cac64..79b09e0e 100644
--- a/src/exchangedb/test_exchangedb.c
+++ b/src/exchangedb/test_exchangedb.c
@@ -2284,7 +2284,7 @@ run (void *cls)
                          "test-2"));
   FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
           plugin->mark_deposit_tiny (plugin->cls,
-                                     &deposit.merchant_pub,
+                                     &deposit.coin.coin_pub,
                                      deposit_rowid));
   FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
           plugin->get_ready_deposit (plugin->cls,
@@ -2306,7 +2306,7 @@ run (void *cls)
                          "test-3"));
   FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
           plugin->mark_deposit_done (plugin->cls,
-                                     &deposit.merchant_pub,
+                                     &deposit.coin.coin_pub,
                                      deposit_rowid));
   FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
           plugin->commit (plugin->cls));
diff --git a/src/include/taler_exchangedb_plugin.h b/src/include/taler_exchangedb_plugin.h
index cee50954..2a462aba 100644
--- a/src/include/taler_exchangedb_plugin.h
+++ b/src/include/taler_exchangedb_plugin.h
@@ -3060,13 +3060,13 @@ struct TALER_EXCHANGEDB_Plugin
    * returned by @e iterate_ready_deposits()
    *
    * @param cls the @e cls of this struct with the plugin-specific state
-   * @param merchant_pub identifies the beneficiary of the deposit
+   * @param coin_pub identifies the coin of the deposit
    * @param deposit_rowid identifies the deposit row to modify
    * @return query result status
    */
   enum GNUNET_DB_QueryStatus
   (*mark_deposit_tiny)(void *cls,
-                       const struct TALER_MerchantPublicKeyP *merchant_pub,
+                       const struct TALER_CoinSpendPublicKeyP *coin_pub,
                        uint64_t rowid);
 
 
@@ -3076,13 +3076,13 @@ struct TALER_EXCHANGEDB_Plugin
    * @e iterate_ready_deposits() or @e iterate_matching_deposits().
    *
    * @param cls the @e cls of this struct with the plugin-specific state
-   * @param merchant_pub identifies the beneficiary of the deposit
+   * @param coin_pub identifies the coin of the deposit
    * @param deposit_rowid identifies the deposit row to modify
    * @return query result status
    */
   enum GNUNET_DB_QueryStatus
   (*mark_deposit_done)(void *cls,
-                       const struct TALER_MerchantPublicKeyP *merchant_pub,
+                       const struct TALER_CoinSpendPublicKeyP *coin_pub,
                        uint64_t rowid);
 
 

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]