gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[taler-exchange] branch master updated: new aggregator mega transaction logic


From: gnunet
Subject: [taler-exchange] branch master updated: new aggregator mega transaction logic
Date: Sun, 27 Mar 2022 13:53:09 +0200

This is an automated email from the git hooks/post-receive script.

grothoff pushed a commit to branch master
in repository exchange.

The following commit(s) were added to refs/heads/master by this push:
     new b9a9af3a new aggregator mega transaction logic
b9a9af3a is described below

commit b9a9af3a59f3abdb09afb9d0f9e4c0d83df789b7
Author: Christian Grothoff <grothoff@gnunet.org>
AuthorDate: Sun Mar 27 13:48:25 2022 +0200

    new aggregator mega transaction logic
---
 src/exchange/taler-exchange-aggregator.c    | 545 ++++++----------------------
 src/exchangedb/plugin_exchangedb_postgres.c | 111 ++----
 src/include/taler_exchangedb_plugin.h       |   8 +-
 3 files changed, 159 insertions(+), 505 deletions(-)

diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c
index c34d47f9..04cf426d 100644
--- a/src/exchange/taler-exchange-aggregator.c
+++ b/src/exchange/taler-exchange-aggregator.c
@@ -28,18 +28,6 @@
 #include "taler_json_lib.h"
 #include "taler_bank_service.h"
 
-struct AdditionalDeposit
-{
-  /**
-   * Public key of the coin.
-   */
-  struct TALER_CoinSpendPublicKeyP coin_pub;
-
-  /**
-   * Row of the deposit.
-   */
-  uint64_t row;
-};
 
 /**
  * Information about one aggregation process to be executed.  There is
@@ -54,11 +42,6 @@ struct AggregationUnit
    */
   struct TALER_MerchantPublicKeyP merchant_pub;
 
-  /**
-   * Public key of the coin.
-   */
-  struct TALER_CoinSpendPublicKeyP coin_pub;
-
   /**
    * Total amount to be transferred, before subtraction of @e fees.wire and rounding down.
    */
@@ -79,11 +62,6 @@ struct AggregationUnit
    */
   struct TALER_WireTransferIdentifierRawP wtid;
 
-  /**
-   * Row ID of the transaction that started it all.
-   */
-  uint64_t row_id;
-
   /**
    * The current time (which triggered the aggregation and
    * defines the wire fee).
@@ -100,33 +78,12 @@ struct AggregationUnit
    */
   struct TALER_PaytoHashP h_payto;
 
-  /**
-   * Serial number of the wire target.
-   */
-  uint64_t wire_target;
-
   /**
    * Exchange wire account to be used for the preparation and
    * eventual execution of the aggregate wire transfer.
    */
   const struct TALER_EXCHANGEDB_AccountInfo *wa;
 
-  /**
-   * Array of row_ids from the aggregation.
-   */
-  struct AdditionalDeposit
-    additional_rows[TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT];
-
-  /**
-   * Offset specifying how many @e additional_rows are in use.
-   */
-  unsigned int rows_offset;
-
-  /**
-   * Set to true if we encountered a refund during #refund_by_coin_cb.
-   * Used to wave the deposit fee.
-   */
-  bool have_refund;
 };
 
 
@@ -340,331 +297,6 @@ parse_wirewatch_config (void)
 }
 
 
-/**
- * Callback invoked with information about refunds applicable
- * to a particular coin.  Subtract refunded amount(s) from
- * the aggregation unit's total amount.
- *
- * @param cls closure with a `struct AggregationUnit *`
- * @param amount_with_fee what was the refunded amount with the fee
- * @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR to stop
- */
-static enum GNUNET_GenericReturnValue
-refund_by_coin_cb (void *cls,
-                   const struct TALER_Amount *amount_with_fee)
-{
-  struct AggregationUnit *aux = cls;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Aggregator subtracts applicable refund of amount %s\n",
-              TALER_amount2s (amount_with_fee));
-  aux->have_refund = true;
-  if (0 >
-      TALER_amount_subtract (&aux->total_amount,
-                             &aux->total_amount,
-                             amount_with_fee))
-  {
-    GNUNET_break (0);
-    return GNUNET_SYSERR;
-  }
-  return GNUNET_OK;
-}
-
-
-/**
- * Function called with details about deposits that have been made,
- * with the goal of executing the corresponding wire transaction.
- *
- * @param cls a `struct AggregationUnit`
- * @param row_id identifies database entry
- * @param merchant_pub public key of the merchant
- * @param coin_pub public key of the coin
- * @param amount_with_fee amount that was deposited including fee
- * @param deposit_fee amount the exchange gets to keep as transaction fees
- * @param h_contract_terms hash of the proposal data known to merchant and customer
- * @param wire_target target account for the wire transfer
- * @param payto_uri URI of the target account
- * @return transaction status code,  #GNUNET_DB_STATUS_SUCCESS_ONE_RESULT to continue to iterate
- */
-static enum GNUNET_DB_QueryStatus
-deposit_cb (void *cls,
-            uint64_t row_id,
-            const struct TALER_MerchantPublicKeyP *merchant_pub,
-            const struct TALER_CoinSpendPublicKeyP *coin_pub,
-            const struct TALER_Amount *amount_with_fee,
-            const struct TALER_Amount *deposit_fee,
-            const struct TALER_PrivateContractHashP *h_contract_terms,
-            uint64_t wire_target,
-            const char *payto_uri)
-{
-  struct AggregationUnit *au = cls;
-  enum GNUNET_DB_QueryStatus qs;
-
-  au->merchant_pub = *merchant_pub;
-  au->coin_pub = *coin_pub;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Aggregator processing payment %s with amount %s\n",
-              TALER_B2S (coin_pub),
-              TALER_amount2s (amount_with_fee));
-  au->row_id = row_id;
-  au->total_amount = *amount_with_fee;
-  au->have_refund = false;
-  qs = db_plugin->select_refunds_by_coin (db_plugin->cls,
-                                          coin_pub,
-                                          &au->merchant_pub,
-                                          h_contract_terms,
-                                          &refund_by_coin_cb,
-                                          au);
-  if (0 > qs)
-  {
-    GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
-    return qs;
-  }
-  if (! au->have_refund)
-  {
-    struct TALER_Amount ntotal;
-
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Non-refunded transaction, subtracting deposit fee %s\n",
-                TALER_amount2s (deposit_fee));
-    if (0 >
-        TALER_amount_subtract (&ntotal,
-                               amount_with_fee,
-                               deposit_fee))
-    {
-      /* This should never happen, issue a warning, but continue processing
-         with an amount of zero, least we hang here for good. */
-      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                  "Fatally malformed record at row %llu over %s (deposit fee exceeds deposited value)\n",
-                  (unsigned long long) row_id,
-                  TALER_amount2s (amount_with_fee));
-      GNUNET_assert (GNUNET_OK ==
-                     TALER_amount_set_zero (au->total_amount.currency,
-                                            &au->total_amount));
-    }
-    else
-    {
-      au->total_amount = ntotal;
-    }
-  }
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Amount after fee is %s\n",
-              TALER_amount2s (&au->total_amount));
-
-  GNUNET_assert (NULL == au->payto_uri);
-  au->payto_uri = GNUNET_strdup (payto_uri);
-  TALER_payto_hash (payto_uri,
-                    &au->h_payto);
-  au->wire_target = wire_target;
-  GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
-                              &au->wtid,
-                              sizeof (au->wtid));
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Starting aggregation under H(WTID)=%s, starting amount %s at %llu\n",
-              TALER_B2S (&au->wtid),
-              TALER_amount2s (amount_with_fee),
-              (unsigned long long) row_id);
-  au->wa = TALER_EXCHANGEDB_find_account_by_payto_uri (payto_uri);
-  if (NULL == au->wa)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                "No exchange account configured for `%s', please fix your setup to continue!\n",
-                payto_uri);
-    return GNUNET_DB_STATUS_HARD_ERROR;
-  }
-
-  /* make sure we have current fees */
-  au->execution_time = GNUNET_TIME_timestamp_get ();
-  {
-    struct GNUNET_TIME_Timestamp start_date;
-    struct GNUNET_TIME_Timestamp end_date;
-    struct TALER_MasterSignatureP master_sig;
-    enum GNUNET_DB_QueryStatus qs;
-
-    qs = db_plugin->get_wire_fee (db_plugin->cls,
-                                  au->wa->method,
-                                  au->execution_time,
-                                  &start_date,
-                                  &end_date,
-                                  &au->fees,
-                                  &master_sig);
-    if (0 >= qs)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                  "Could not get wire fees for %s at %s. Aborting run.\n",
-                  au->wa->method,
-                  GNUNET_TIME_timestamp2s (au->execution_time));
-      return GNUNET_DB_STATUS_HARD_ERROR;
-    }
-  }
-
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Aggregator starts aggregation for deposit %llu to %s with wire fee %s\n",
-              (unsigned long long) row_id,
-              TALER_B2S (&au->wtid),
-              TALER_amount2s (&au->fees.wire));
-  qs = db_plugin->insert_aggregation_tracking (db_plugin->cls,
-                                               &au->wtid,
-                                               row_id);
-  if (qs <= 0)
-  {
-    GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
-    return qs;
-  }
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Aggregator marks deposit %llu as done\n",
-              (unsigned long long) row_id);
-  qs = db_plugin->mark_deposit_done (db_plugin->cls,
-                                     coin_pub,
-                                     row_id);
-  if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
-  {
-    GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
-    return qs;
-  }
-  return qs;
-}
-
-
-/**
- * Function called with details about another deposit we
- * can aggregate into an existing aggregation unit.
- *
- * @param cls a `struct AggregationUnit`
- * @param row_id identifies database entry
- * @param coin_pub public key of the coin
- * @param amount_with_fee amount that was deposited including fee
- * @param deposit_fee amount the exchange gets to keep as transaction fees
- * @param h_contract_terms hash of the proposal data known to merchant and customer
- * @return transaction status code
- */
-static enum GNUNET_DB_QueryStatus
-aggregate_cb (void *cls,
-              uint64_t row_id,
-              const struct TALER_CoinSpendPublicKeyP *coin_pub,
-              const struct TALER_Amount *amount_with_fee,
-              const struct TALER_Amount *deposit_fee,
-              const struct TALER_PrivateContractHashP *h_contract_terms)
-{
-  struct AggregationUnit *au = cls;
-  struct TALER_Amount old;
-  enum GNUNET_DB_QueryStatus qs;
-
-  if (row_id == au->row_id)
-    return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
-  if (au->rows_offset >= TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT)
-  {
-    /* Bug: we asked for at most #TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT results! */
-    GNUNET_break (0);
-    /* Skip this one, but keep going with the overall transaction */
-    return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
-  }
-
-  /* add to total */
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Adding transaction amount %s from row %llu to aggregation\n",
-              TALER_amount2s (amount_with_fee),
-              (unsigned long long) row_id);
-  /* save the existing total aggregate in 'old', for later */
-  old = au->total_amount;
-  /* we begin with the total contribution of the current coin */
-  au->total_amount = *amount_with_fee;
-  /* compute contribution of this coin (after fees) */
-  au->have_refund = false;
-  qs = db_plugin->select_refunds_by_coin (db_plugin->cls,
-                                          coin_pub,
-                                          &au->merchant_pub,
-                                          h_contract_terms,
-                                          &refund_by_coin_cb,
-                                          au);
-  if (0 > qs)
-  {
-    GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
-    return qs;
-  }
-  if (! au->have_refund)
-  {
-    struct TALER_Amount tmp;
-
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Subtracting deposit fee %s for non-refunded coin\n",
-                TALER_amount2s (deposit_fee));
-    if (0 >
-        TALER_amount_subtract (&tmp,
-                               &au->total_amount,
-                               deposit_fee))
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                  "Fatally malformed record at %llu over amount %s (deposit fee exceeds deposited value)\n",
-                  (unsigned long long) row_id,
-                  TALER_amount2s (&au->total_amount));
-      GNUNET_assert (GNUNET_OK ==
-                     TALER_amount_set_zero (old.currency,
-                                            &au->total_amount));
-    }
-    else
-    {
-      au->total_amount = tmp;
-    }
-  }
-
-  /* now add the au->total_amount with the (remaining) contribution of
-     the current coin to the 'old' value with the current aggregate value */
-  {
-    struct TALER_Amount tmp;
-
-    if (0 >
-        TALER_amount_add (&tmp,
-                          &au->total_amount,
-                          &old))
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                  "Overflow or currency incompatibility during aggregation at %llu\n",
-                  (unsigned long long) row_id);
-      /* Skip this one, but keep going! */
-      au->total_amount = old;
-      return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
-    }
-    au->total_amount = tmp;
-  }
-
-  /* "append" to our list of rows */
-  au->additional_rows[au->rows_offset].coin_pub = *coin_pub;
-  au->additional_rows[au->rows_offset].row = row_id;
-  au->rows_offset++;
-  /* insert into aggregation tracking table */
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Adding %llu to aggregate %s\n",
-              (unsigned long long) row_id,
-              TALER_B2S (&au->wtid));
-  qs = db_plugin->insert_aggregation_tracking (db_plugin->cls,
-                                               &au->wtid,
-                                               row_id);
-  if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                "Failed to add %llu to aggregate %s: %d\n",
-                (unsigned long long) row_id,
-                TALER_B2S (&au->wtid),
-                qs);
-    GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
-    return qs;
-  }
-  qs = db_plugin->mark_deposit_done (db_plugin->cls,
-                                     coin_pub,
-                                     row_id);
-  if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
-  {
-    GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
-    return qs;
-  }
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Aggregator marked deposit %llu as DONE\n",
-              (unsigned long long) row_id);
-  return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
-}
-
-
 /**
  * Perform a database commit. If it fails, print a warning.
  *
@@ -727,10 +359,17 @@ run_aggregation (void *cls)
   struct Shard *s = cls;
   struct AggregationUnit au_active;
   enum GNUNET_DB_QueryStatus qs;
+  struct TALER_Amount trans;
+  bool have_transient;
 
   task = NULL;
   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
               "Checking for ready deposits to aggregate\n");
+  /* make sure we have current fees */
+  memset (&au_active,
+          0,
+          sizeof (au_active));
+  au_active.execution_time = GNUNET_TIME_timestamp_get ();
   if (GNUNET_OK !=
       db_plugin->start_deferred_wire_out (db_plugin->cls))
   {
@@ -741,16 +380,13 @@ run_aggregation (void *cls)
     release_shard (s);
     return;
   }
-  memset (&au_active,
-          0,
-          sizeof (au_active));
   qs = db_plugin->get_ready_deposit (
     db_plugin->cls,
     s->shard_start,
     s->shard_end,
     kyc_off ? true : false,
-    &deposit_cb,
-    &au_active);
+    &au_active.merchant_pub,
+    &au_active.payto_uri);
   switch (qs)
   {
   case GNUNET_DB_STATUS_HARD_ERROR:
@@ -808,22 +444,98 @@ run_aggregation (void *cls)
     /* continued below */
     break;
   }
+  au_active.wa = TALER_EXCHANGEDB_find_account_by_payto_uri (
+    au_active.payto_uri);
+  if (NULL == au_active.wa)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "No exchange account configured for `%s', please fix your setup to continue!\n",
+                au_active.payto_uri);
+    global_ret = EXIT_FAILURE;
+    GNUNET_SCHEDULER_shutdown ();
+    release_shard (s);
+    return;
+  }
+
+  {
+    struct GNUNET_TIME_Timestamp start_date;
+    struct GNUNET_TIME_Timestamp end_date;
+    struct TALER_MasterSignatureP master_sig;
+
+    qs = db_plugin->get_wire_fee (db_plugin->cls,
+                                  au_active.wa->method,
+                                  au_active.execution_time,
+                                  &start_date,
+                                  &end_date,
+                                  &au_active.fees,
+                                  &master_sig);
+    if (0 >= qs)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                  "Could not get wire fees for %s at %s. Aborting run.\n",
+                  au_active.wa->method,
+                  GNUNET_TIME_timestamp2s (au_active.execution_time));
+      global_ret = EXIT_FAILURE;
+      GNUNET_SCHEDULER_shutdown ();
+      release_shard (s);
+      return;
+    }
+  }
+
 
   /* Now try to find other deposits to aggregate */
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Found ready deposit for %s, aggregating by target %llu\n",
+              "Found ready deposit for %s, aggregating by target %s\n",
               TALER_B2S (&au_active.merchant_pub),
-              (unsigned long long) au_active.wire_target);
-  qs = db_plugin->iterate_matching_deposits (db_plugin->cls,
-                                             &au_active.h_payto,
-                                             &au_active.merchant_pub,
-                                             &aggregate_cb,
-                                             &au_active,
-                                             TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT);
+              au_active.payto_uri);
+  TALER_payto_hash (au_active.payto_uri,
+                    &au_active.h_payto);
+
+  qs = db_plugin->select_aggregation_transient (db_plugin->cls,
+                                                &au_active.h_payto,
+                                                au_active.wa->section_name,
+                                                &au_active.wtid,
+                                                &trans);
+  switch (qs)
+  {
+  case GNUNET_DB_STATUS_HARD_ERROR:
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "Failed to lookup transient aggregates!\n");
+    cleanup_au (&au_active);
+    db_plugin->rollback (db_plugin->cls);
+    global_ret = EXIT_FAILURE;
+    GNUNET_SCHEDULER_shutdown ();
+    release_shard (s);
+    return;
+  case GNUNET_DB_STATUS_SOFT_ERROR:
+    /* serializiability issue, try again */
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Serialization issue, trying again later!\n");
+    db_plugin->rollback (db_plugin->cls);
+    cleanup_au (&au_active);
+    GNUNET_assert (NULL == task);
+    task = GNUNET_SCHEDULER_add_now (&run_aggregation,
+                                     s);
+    return;
+  case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+    GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
+                                &au_active.wtid,
+                                sizeof (au_active.wtid));
+    have_transient = false;
+    break;
+  case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+    have_transient = true;
+    break;
+  }
+  qs = db_plugin->aggregate (db_plugin->cls,
+                             &au_active.h_payto,
+                             &au_active.merchant_pub,
+                             &au_active.wtid,
+                             &au_active.total_amount);
   if (GNUNET_DB_STATUS_HARD_ERROR == qs)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                "Failed to execute deposit iteration!\n");
+                "Failed to execute aggregation!\n");
     cleanup_au (&au_active);
     db_plugin->rollback (db_plugin->cls);
     global_ret = EXIT_FAILURE;
@@ -844,13 +556,17 @@ run_aggregation (void *cls)
     return;
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Found %d other deposits to combine into wire transfer with fee %s.\n",
-              qs,
-              TALER_amount2s (&au_active.fees.wire));
+              "Aggregation total is %s.\n",
+              TALER_amount2s (&au_active.total_amount));
 
   /* Subtract wire transfer fee and round to the unit supported by the
      wire transfer method; Check if after rounding down, we still have
      an amount to transfer, and if not mark as 'tiny'. */
+  if (have_transient)
+    GNUNET_assert (0 <=
+                   TALER_amount_add (&au_active.total_amount,
+                                     &au_active.total_amount,
+                                     &trans));
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Rounding aggregate of %s\n",
               TALER_amount2s (&au_active.total_amount));
@@ -867,45 +583,17 @@ run_aggregation (void *cls)
                 "Aggregate value too low for transfer (%d/%s)\n",
                 qs,
                 TALER_amount2s (&au_active.final_amount));
-    /* Rollback ongoing transaction, as we will not use the respective
-       WTID and thus need to remove the tracking data */
-    db_plugin->rollback (db_plugin->cls);
-
-    /* There were results, just the value was too low.  Start another
-       transaction to mark all* of the selected deposits as minor! */
-    if (GNUNET_OK !=
-        db_plugin->start (db_plugin->cls,
-                          "aggregator mark tiny transactions"))
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                  "Failed to start database transaction!\n");
-      global_ret = EXIT_FAILURE;
-      cleanup_au (&au_active);
-      GNUNET_SCHEDULER_shutdown ();
-      release_shard (s);
-      return;
-    }
-    /* Mark transactions by row_id as minor */
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "Marking %s (%llu) as tiny\n",
-                TALER_B2S (&au_active.coin_pub),
-                (unsigned long long) au_active.row_id);
-    qs = db_plugin->mark_deposit_tiny (db_plugin->cls,
-                                       &au_active.coin_pub,
-                                       au_active.row_id);
-    if (0 < qs)
-    {
-      for (unsigned int i = 0; i<au_active.rows_offset; i++)
-      {
-        qs = db_plugin->mark_deposit_tiny (db_plugin->cls,
-                                           &au_active.additional_rows[i].
-                                           coin_pub,
-                                           au_active.additional_rows[i].row);
-        if (0 >= qs)
-          break;
-      }
-    }
-    GNUNET_break (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != qs);
+    if (have_transient)
+      qs = db_plugin->update_aggregation_transient (db_plugin->cls,
+                                                    &au_active.h_payto,
+                                                    &au_active.wtid,
+                                                    &au_active.total_amount);
+    else
+      qs = db_plugin->create_aggregation_transient (db_plugin->cls,
+                                                    &au_active.h_payto,
+                                                    au_active.wa->section_name,
+                                                    &au_active.wtid,
+                                                    &au_active.total_amount);
     if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
     {
       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
@@ -962,8 +650,7 @@ run_aggregation (void *cls)
                                               buf_size);
     GNUNET_free (buf);
   }
-  /* Commit the WTID data to 'wire_out' to finally satisfy aggregation
-     table constraints */
+  /* Commit the WTID data to 'wire_out'  */
   if (qs >= 0)
     qs = db_plugin->store_wire_transfer_out (db_plugin->cls,
                                              au_active.execution_time,
@@ -971,6 +658,12 @@ run_aggregation (void *cls)
                                              &au_active.h_payto,
                                              au_active.wa->section_name,
                                              &au_active.final_amount);
+
+  if ( (qs >= 0) &&
+       have_transient)
+    qs = db_plugin->delete_aggregation_transient (db_plugin->cls,
+                                                  &au_active.h_payto,
+                                                  &au_active.wtid);
   cleanup_au (&au_active);
 
   if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c
index 1709f17e..36a5e48b 100644
--- a/src/exchangedb/plugin_exchangedb_postgres.c
+++ b/src/exchangedb/plugin_exchangedb_postgres.c
@@ -1160,29 +1160,17 @@ prepare_statements (struct PostgresClosure *pg)
     GNUNET_PQ_make_prepare (
       "deposits_get_ready",
       "SELECT"
-      " dep.deposit_serial_id"
-      ",amount_with_fee_val"
-      ",amount_with_fee_frac"
-      ",denom.fee_deposit_val"
-      ",denom.fee_deposit_frac"
-      ",h_contract_terms"
-      ",payto_uri"
-      ",wire_target_serial_id"
+      " payto_uri"
       ",merchant_pub"
-      ",kc.coin_pub"
       " FROM deposits_by_ready dbr"
       "  JOIN deposits dep"
       "    ON (dbr.coin_pub = dep.coin_pub AND dbr.deposit_serial_id = dep.deposit_serial_id)"
-      "  JOIN wire_targets "
+      "  JOIN wire_targets wt"
       "    USING (wire_target_h_payto)"
-      "  JOIN known_coins kc"
-      "    ON (kc.coin_pub = dep.coin_pub)"
-      "  JOIN denominations denom"
-      "    USING (denominations_serial)"
       " WHERE dbr.wire_deadline<=$1"
       "   AND dbr.shard >= $2"
       "   AND dbr.shard <= $3"
-      "   AND (kyc_ok OR $4)"
+      "   AND (wt.kyc_ok OR $4)"
       " ORDER BY "
       "   dbr.wire_deadline ASC"
       "  ,dbr.shard ASC"
@@ -1218,22 +1206,23 @@ prepare_statements (struct PostgresClosure *pg)
     /* Used in #postgres_aggregate() */
     GNUNET_PQ_make_prepare (
       "aggregate",
-      "WITH rdy AS (" /* find deposits ready */
+      "WITH rdy AS (" /* find deposits ready by merchant */
       "  SELECT"
       "    coin_pub"
       "    FROM deposits_for_matching"
-      "    WHERE refund_deadline<$1"
-      "      AND merchant_pub=$2"
+      "    WHERE refund_deadline<$1" /* filter by shard, only actually executable deposits */
+      "      AND merchant_pub=$2" /* filter by target merchant */
       "    ORDER BY refund_deadline ASC" /* ordering is not critical */
       "    LIMIT "
-      TALER_QUOTE (TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT)
+      TALER_QUOTE (TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT) /* limits transaction size */
       " )"
-      " ,dep AS (" /* restrict to our merchant and account */
+      " ,dep AS (" /* restrict to our merchant and account and mark as done */
       "  UPDATE deposits"
       "     SET done=TRUE"
       "   WHERE coin_pub IN (SELECT coin_pub FROM rdy)"
-      "     AND merchant_pub=$2"
-      "     AND wire_target_h_payto=$3"
+      "     AND merchant_pub=$2" /* theoretically, same coin could be spent at another merchant */
+      "     AND wire_target_h_payto=$3" /* merchant could have a 2nd bank account */
+      "     AND done=FALSE" /* theoretically, same coin could be spend at the same merchant a 2nd time */
       "   RETURNING"
       "     deposit_serial_id"
       "    ,coin_pub"
@@ -1244,18 +1233,26 @@ prepare_statements (struct PostgresClosure *pg)
       "    amount_with_fee_val AS refund_val"
       "   ,amount_with_fee_frac AS refund_frac"
       "   ,coin_pub"
+      "   ,deposit_serial_id" /* theoretically, coin could be in multiple refunded transactions */
       "    FROM refunds"
       "   WHERE coin_pub IN (SELECT coin_pub FROM dep)"
       "     AND deposit_serial_id IN (SELECT deposit_serial_id FROM dep))"
+      " ,coins_with_fees AS (" /* find coins for which deposit fees apply */
+      "  SELECT"
+      "     coin_pub"
+      "    ,deposit_serial_id" /* ensures that if the same coin is deposited twice, it is in the list twice */
+      "    FROM dep"
+      "   WHERE deposit_serial_id NOT IN (SELECT deposit_serial_id FROM ref))"
       " ,fees AS (" /* find deposit fees for non-refunded deposits */
       "  SELECT"
       "    denom.fee_deposit_val AS fee_val"
       "   ,denom.fee_deposit_frac AS fee_frac"
-      "    FROM known_coins kc"
+      "   ,cs.deposit_serial_id" /* ensures we get the fee for each coin, not once per denomination */
+      "    FROM coins_with_fees cs"
+      "    JOIN known_coins kc"
+      "      USING (coin_pub)"
       "    JOIN denominations denom"
-      "      USING (denominations_serial)"
-      "    WHERE coin_pub IN (SELECT coin_pub FROM dep)"
-      "      AND coin_pub NOT IN (SELECT coin_pub FROM ref))"
+      "      USING (denominations_serial))"
       " ,dummy AS (" /* add deposits to aggregation_tracking */
       "    INSERT INTO aggregation_tracking"
       "    (deposit_serial_id"
@@ -1263,14 +1260,14 @@ prepare_statements (struct PostgresClosure *pg)
       "    SELECT deposit_serial_id,$4"
       "      FROM dep)"
       "SELECT" /* calculate totals (deposits, refunds and fees) */
-      "  CAST(COALESCE(SUM(dep.amount_val),0) AS INT8) AS sum_deposit_value"
-      " ,COALESCE(SUM(dep.amount_frac),0) AS sum_deposit_fraction"
+      "  CAST(COALESCE(SUM(dep.amount_val),0) AS INT8) AS sum_deposit_value" /* cast needed, otherwise we get NUMBER */
+      " ,COALESCE(SUM(dep.amount_frac),0) AS sum_deposit_fraction" /* SUM over INT returns INT8 */
       " ,CAST(COALESCE(SUM(ref.refund_val),0) AS INT8) AS sum_refund_value"
       " ,COALESCE(SUM(ref.refund_frac),0) AS sum_refund_fraction"
       " ,CAST(COALESCE(SUM(fees.fee_val),0) AS INT8) AS sum_fee_value"
       " ,COALESCE(SUM(fees.fee_frac),0) AS sum_fee_fraction"
       " FROM dep "
-      "   FULL OUTER JOIN ref ON (FALSE)"
+      "   FULL OUTER JOIN ref ON (FALSE)"    /* We just want all sums */
       "   FULL OUTER JOIN fees ON (FALSE);",
       4),
 
@@ -6270,8 +6267,8 @@ postgres_mark_deposit_done (void *cls,
  * @param end_shard_row maximum shard row to select (inclusive)
  * @param kyc_off true if we should not check the KYC status because
  *                this exchange does not need/support KYC checks.
- * @param deposit_cb function to call for ONE such deposit
- * @param deposit_cb_cls closure for @a deposit_cb
+ * @param[out] merchant_pub set to the public key of a merchant with a ready deposit
+ * @param[out] payto_uri set to the account of the merchant, to be freed by caller
  * @return transaction status code
  */
 static enum GNUNET_DB_QueryStatus
@@ -6279,8 +6276,8 @@ postgres_get_ready_deposit (void *cls,
                             uint64_t start_shard_row,
                             uint64_t end_shard_row,
                             bool kyc_off,
-                            TALER_EXCHANGEDB_DepositIterator deposit_cb,
-                            void *deposit_cb_cls)
+                            struct TALER_MerchantPublicKeyP *merchant_pub,
+                            char **payto_uri)
 {
   struct PostgresClosure *pg = cls;
   struct GNUNET_TIME_Absolute now = {0};
@@ -6291,34 +6288,13 @@ postgres_get_ready_deposit (void *cls,
     GNUNET_PQ_query_param_bool (kyc_off),
     GNUNET_PQ_query_param_end
   };
-  struct TALER_Amount amount_with_fee;
-  struct TALER_Amount deposit_fee;
-  struct TALER_PrivateContractHashP h_contract_terms;
-  struct TALER_MerchantPublicKeyP merchant_pub;
-  struct TALER_CoinSpendPublicKeyP coin_pub;
-  uint64_t serial_id;
-  uint64_t wire_target;
-  char *payto_uri;
   struct GNUNET_PQ_ResultSpec rs[] = {
-    GNUNET_PQ_result_spec_uint64 ("deposit_serial_id",
-                                  &serial_id),
-    GNUNET_PQ_result_spec_uint64 ("wire_target_serial_id",
-                                  &wire_target),
-    TALER_PQ_RESULT_SPEC_AMOUNT ("amount_with_fee",
-                                 &amount_with_fee),
-    TALER_PQ_RESULT_SPEC_AMOUNT ("fee_deposit",
-                                 &deposit_fee),
-    GNUNET_PQ_result_spec_auto_from_type ("h_contract_terms",
-                                          &h_contract_terms),
     GNUNET_PQ_result_spec_auto_from_type ("merchant_pub",
-                                          &merchant_pub),
-    GNUNET_PQ_result_spec_auto_from_type ("coin_pub",
-                                          &coin_pub),
+                                          merchant_pub),
     GNUNET_PQ_result_spec_string ("payto_uri",
-                                  &payto_uri),
+                                  payto_uri),
     GNUNET_PQ_result_spec_end
   };
-  enum GNUNET_DB_QueryStatus qs;
 
   now = GNUNET_TIME_absolute_round_down (GNUNET_TIME_absolute_get (),
                                          pg->aggregator_shift);
@@ -6328,25 +6304,10 @@ postgres_get_ready_deposit (void *cls,
               "Finding ready deposits by deadline %s (%llu)\n",
               GNUNET_TIME_absolute2s (now),
               (unsigned long long) now.abs_value_us);
-
-  qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
-                                                 "deposits_get_ready",
-                                                 params,
-                                                 rs);
-  if (qs <= 0)
-    return qs;
-
-  qs = deposit_cb (deposit_cb_cls,
-                   serial_id,
-                   &merchant_pub,
-                   &coin_pub,
-                   &amount_with_fee,
-                   &deposit_fee,
-                   &h_contract_terms,
-                   wire_target,
-                   payto_uri);
-  GNUNET_PQ_cleanup_result (rs);
-  return qs;
+  return GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
+                                                   "deposits_get_ready",
+                                                   params,
+                                                   rs);
 }
 
 
diff --git a/src/include/taler_exchangedb_plugin.h b/src/include/taler_exchangedb_plugin.h
index 4ca6905e..06810a7d 100644
--- a/src/include/taler_exchangedb_plugin.h
+++ b/src/include/taler_exchangedb_plugin.h
@@ -3097,8 +3097,8 @@ struct TALER_EXCHANGEDB_Plugin
    * @param end_shard_row maximum shard row to select (inclusive)
    * @param kyc_off true if we should not check the KYC status because
    *                this exchange does not need/support KYC checks.
-   * @param deposit_cb function to call for ONE such deposit
-   * @param deposit_cb_cls closure for @a deposit_cb
+   * @param[out] merchant_pub set to the public key of a merchant with a ready deposit
+   * @param[out] payto_uri set to the account of the merchant, to be freed by caller
    * @return transaction status code
    */
   enum GNUNET_DB_QueryStatus
@@ -3106,8 +3106,8 @@ struct TALER_EXCHANGEDB_Plugin
                        uint64_t start_shard_row,
                        uint64_t end_shard_row,
                        bool kyc_off,
-                       TALER_EXCHANGEDB_DepositIterator deposit_cb,
-                       void *deposit_cb_cls);
+                       struct TALER_MerchantPublicKeyP *merchant_pub,
+                       char **payto_uri);
 
 
 /**

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]