gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[taler-wallet-core] 01/02: wallet-core: refresh cleanup, preparations fo


From: gnunet
Subject: [taler-wallet-core] 01/02: wallet-core: refresh cleanup, preparations for #8568
Date: Tue, 02 Apr 2024 18:40:56 +0200

This is an automated email from the git hooks/post-receive script.

dold pushed a commit to branch master
in repository wallet-core.

commit 651db75296bfe7c35dc7d29e39f25e6dacb72930
Author: Florian Dold <florian@dold.me>
AuthorDate: Tue Apr 2 18:19:11 2024 +0200

    wallet-core: refresh cleanup, preparations for #8568
---
 .../test-wallet-refresh-blocked.ts                 |  66 ++++
 .../src/integrationtests/testrunner.ts             |   2 +
 packages/taler-wallet-core/src/common.ts           |  10 +-
 packages/taler-wallet-core/src/db.ts               |   5 +
 packages/taler-wallet-core/src/deposits.ts         |   8 +-
 packages/taler-wallet-core/src/exchanges.ts        |   1 +
 packages/taler-wallet-core/src/pay-merchant.ts     |   4 +
 .../taler-wallet-core/src/pay-peer-pull-debit.ts   |   2 +
 .../taler-wallet-core/src/pay-peer-push-debit.ts   |   3 +
 packages/taler-wallet-core/src/recoup.ts           |   1 +
 packages/taler-wallet-core/src/refresh.ts          | 407 +++++++++++----------
 11 files changed, 306 insertions(+), 203 deletions(-)

diff --git 
a/packages/taler-harness/src/integrationtests/test-wallet-refresh-blocked.ts 
b/packages/taler-harness/src/integrationtests/test-wallet-refresh-blocked.ts
new file mode 100644
index 000000000..806802612
--- /dev/null
+++ b/packages/taler-harness/src/integrationtests/test-wallet-refresh-blocked.ts
@@ -0,0 +1,66 @@
+/*
+ This file is part of GNU Taler
+ (C) 2020 Taler Systems S.A.
+
+ GNU Taler is free software; you can redistribute it and/or modify it under the
+ terms of the GNU General Public License as published by the Free Software
+ Foundation; either version 3, or (at your option) any later version.
+
+ GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License along with
+ GNU Taler; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
+ */
+
+/**
+ * Imports.
+ */
+import { WalletApiOperation } from "@gnu-taler/taler-wallet-core";
+import { GlobalTestState } from "../harness/harness.js";
+import {
+  createSimpleTestkudosEnvironmentV2,
+  createWalletDaemonWithClient,
+  withdrawViaBankV2,
+} from "../harness/helpers.js";
+
+/**
+ * Run test for refreshe after a payment.
+ */
+export async function runWalletRefreshBlockedTest(t: GlobalTestState) {
+  // Set up test environment
+
+  const { walletClient, bank, exchange, merchant } =
+    await createSimpleTestkudosEnvironmentV2(t);
+
+  // Withdraw digital cash into the wallet.
+
+  const { walletClient: w1 } = await createWalletDaemonWithClient(t, {
+    name: "w1",
+    config: {
+      testing: {
+        devModeActive: true,
+      },
+    },
+  });
+
+  await withdrawViaBankV2(t, {
+    walletClient: w1,
+    bank,
+    exchange,
+    amount: "TESTKUDOS:20",
+  });
+
+  await w1.call(WalletApiOperation.TestingWaitTransactionsFinal, {});
+
+  // Prevent the wallet from doing refreshes by injecting a 5xx
+  // status for all refresh requests.
+  await w1.call(WalletApiOperation.ApplyDevExperiment, {
+    devExperimentUri: "taler://dev-experiment/start-block-refresh",
+  });
+
+  // FIXME: Now force a refresh, check balance
+}
+
+runWalletRefreshBlockedTest.suites = ["wallet"];
diff --git a/packages/taler-harness/src/integrationtests/testrunner.ts 
b/packages/taler-harness/src/integrationtests/testrunner.ts
index 380251b76..2bca91e45 100644
--- a/packages/taler-harness/src/integrationtests/testrunner.ts
+++ b/packages/taler-harness/src/integrationtests/testrunner.ts
@@ -101,6 +101,7 @@ import { runWalletGenDbTest } from "./test-wallet-gendb.js";
 import { runWalletInsufficientBalanceTest } from 
"./test-wallet-insufficient-balance.js";
 import { runWalletNotificationsTest } from "./test-wallet-notifications.js";
 import { runWalletObservabilityTest } from "./test-wallet-observability.js";
+import { runWalletRefreshBlockedTest } from "./test-wallet-refresh-blocked.js";
 import { runWalletRefreshTest } from "./test-wallet-refresh.js";
 import { runWalletWirefeesTest } from "./test-wallet-wirefees.js";
 import { runWallettestingTest } from "./test-wallettesting.js";
@@ -212,6 +213,7 @@ const allTests: TestMainFunction[] = [
   runWalletWirefeesTest,
   runDenomLostTest,
   runWalletDenomExpireTest,
+  runWalletRefreshBlockedTest,
 ];
 
 export interface TestRunSpec {
diff --git a/packages/taler-wallet-core/src/common.ts 
b/packages/taler-wallet-core/src/common.ts
index eb06b8eb0..5b7ceeead 100644
--- a/packages/taler-wallet-core/src/common.ts
+++ b/packages/taler-wallet-core/src/common.ts
@@ -42,8 +42,10 @@ import {
 } from "@gnu-taler/taler-util";
 import {
   BackupProviderRecord,
+  CoinAvailabilityRecord,
   CoinRecord,
   DbPreciseTimestamp,
+  DenominationRecord,
   DepositGroupRecord,
   ExchangeEntryDbRecordStatus,
   ExchangeEntryDbUpdateStatus,
@@ -145,7 +147,13 @@ export async function makeCoinAvailable(
 export async function spendCoins(
   wex: WalletExecutionContext,
   tx: WalletDbReadWriteTransaction<
-    ["coins", "coinAvailability", "refreshGroups", "denominations"]
+    [
+      "coins",
+      "coinAvailability",
+      "refreshGroups",
+      "refreshSessions",
+      "denominations",
+    ]
   >,
   csi: CoinsSpendInfo,
 ): Promise<void> {
diff --git a/packages/taler-wallet-core/src/db.ts 
b/packages/taler-wallet-core/src/db.ts
index 487927b8f..de22d78a8 100644
--- a/packages/taler-wallet-core/src/db.ts
+++ b/packages/taler-wallet-core/src/db.ts
@@ -2198,6 +2198,11 @@ export interface CoinAvailabilityRecord {
    * a final state.
    */
   visibleCoinCount: number;
+
+  /**
+   * Number of coins that we expect to obtain via a pending refresh.
+   */
+  pendingRefreshOutputCount?: number;
 }
 
 export interface ContractTermsRecord {
diff --git a/packages/taler-wallet-core/src/deposits.ts 
b/packages/taler-wallet-core/src/deposits.ts
index 6c04b20de..a8612744f 100644
--- a/packages/taler-wallet-core/src/deposits.ts
+++ b/packages/taler-wallet-core/src/deposits.ts
@@ -489,6 +489,7 @@ async function refundDepositGroup(
     [
       "depositGroups",
       "refreshGroups",
+      "refreshSessions",
       "coins",
       "denominations",
       "coinAvailability",
@@ -755,9 +756,9 @@ async function processDepositGroupPendingTrack(
     let updatedTxStatus: DepositElementStatus | undefined = undefined;
     let newWiredCoin:
       | {
-        id: string;
-        value: DepositTrackingInfo;
-      }
+          id: string;
+          value: DepositTrackingInfo;
+        }
       | undefined;
 
     if (depositGroup.statusPerCoin[i] !== DepositElementStatus.Wired) {
@@ -1448,6 +1449,7 @@ export async function createDepositGroup(
       "recoupGroups",
       "denominations",
       "refreshGroups",
+      "refreshSessions",
       "coinAvailability",
       "contractTerms",
     ],
diff --git a/packages/taler-wallet-core/src/exchanges.ts 
b/packages/taler-wallet-core/src/exchanges.ts
index 7fb387a9e..0fbe7297c 100644
--- a/packages/taler-wallet-core/src/exchanges.ts
+++ b/packages/taler-wallet-core/src/exchanges.ts
@@ -1614,6 +1614,7 @@ export async function updateExchangeFromUrlHandler(
         "denominations",
         "coinAvailability",
         "refreshGroups",
+        "refreshSessions",
         "exchanges",
       ],
       async (tx) => {
diff --git a/packages/taler-wallet-core/src/pay-merchant.ts 
b/packages/taler-wallet-core/src/pay-merchant.ts
index 812d32429..3b58c1e0a 100644
--- a/packages/taler-wallet-core/src/pay-merchant.ts
+++ b/packages/taler-wallet-core/src/pay-merchant.ts
@@ -272,6 +272,7 @@ export class PayMerchantTransactionContext implements 
TransactionContext {
       [
         "purchases",
         "refreshGroups",
+        "refreshSessions",
         "denominations",
         "coinAvailability",
         "coins",
@@ -1175,6 +1176,7 @@ async function handleInsufficientFunds(
       "coinAvailability",
       "denominations",
       "refreshGroups",
+      "refreshSessions",
     ],
     async (tx) => {
       const p = await tx.purchases.get(proposalId);
@@ -1854,6 +1856,7 @@ export async function confirmPay(
       "purchases",
       "coins",
       "refreshGroups",
+      "refreshSessions",
       "denominations",
       "coinAvailability",
     ],
@@ -3047,6 +3050,7 @@ async function storeRefunds(
       "coins",
       "coinAvailability",
       "refreshGroups",
+      "refreshSessions",
     ],
     async (tx) => {
       const myPurchase = await tx.purchases.get(purchase.proposalId);
diff --git a/packages/taler-wallet-core/src/pay-peer-pull-debit.ts 
b/packages/taler-wallet-core/src/pay-peer-pull-debit.ts
index 6cc552714..da68d7839 100644
--- a/packages/taler-wallet-core/src/pay-peer-pull-debit.ts
+++ b/packages/taler-wallet-core/src/pay-peer-pull-debit.ts
@@ -234,6 +234,7 @@ export class PeerPullDebitTransactionContext implements 
TransactionContext {
           "coinAvailability",
           "denominations",
           "refreshGroups",
+          "refreshSessions",
           "coins",
           "coinAvailability",
         ],
@@ -609,6 +610,7 @@ export async function confirmPeerPullDebit(
       "coins",
       "denominations",
       "refreshGroups",
+      "refreshSessions",
       "peerPullDebit",
       "coinAvailability",
     ],
diff --git a/packages/taler-wallet-core/src/pay-peer-push-debit.ts 
b/packages/taler-wallet-core/src/pay-peer-push-debit.ts
index ab80888eb..20001e040 100644
--- a/packages/taler-wallet-core/src/pay-peer-push-debit.ts
+++ b/packages/taler-wallet-core/src/pay-peer-push-debit.ts
@@ -588,6 +588,7 @@ async function processPeerPushDebitAbortingDeletePurse(
     [
       "peerPushDebit",
       "refreshGroups",
+      "refreshSessions",
       "denominations",
       "coinAvailability",
       "coins",
@@ -821,6 +822,7 @@ async function processPeerPushDebitReady(
       [
         "peerPushDebit",
         "refreshGroups",
+        "refreshSessions",
         "denominations",
         "coinAvailability",
         "coins",
@@ -971,6 +973,7 @@ export async function initiatePeerPushDebit(
       "coinAvailability",
       "denominations",
       "refreshGroups",
+      "refreshSessions",
       "peerPushDebit",
     ],
     async (tx) => {
diff --git a/packages/taler-wallet-core/src/recoup.ts 
b/packages/taler-wallet-core/src/recoup.ts
index b8b2cf808..758ba106d 100644
--- a/packages/taler-wallet-core/src/recoup.ts
+++ b/packages/taler-wallet-core/src/recoup.ts
@@ -390,6 +390,7 @@ export async function processRecoupGroup(
       "coinAvailability",
       "denominations",
       "refreshGroups",
+      "refreshSessions",
       "coins",
     ],
     async (tx) => {
diff --git a/packages/taler-wallet-core/src/refresh.ts 
b/packages/taler-wallet-core/src/refresh.ts
index 516d5e3da..490b1b5f5 100644
--- a/packages/taler-wallet-core/src/refresh.ts
+++ b/packages/taler-wallet-core/src/refresh.ts
@@ -66,7 +66,6 @@ import {
 } from "@gnu-taler/taler-util/http";
 import {
   constructTaskIdentifier,
-  makeCoinAvailable,
   makeCoinsVisible,
   PendingTaskType,
   TaskIdStr,
@@ -82,6 +81,7 @@ import {
 } from "./crypto/cryptoTypes.js";
 import { CryptoApiStoppedError } from "./crypto/workers/crypto-dispatcher.js";
 import {
+  CoinAvailabilityRecord,
   CoinRecord,
   CoinSourceType,
   DenominationRecord,
@@ -305,192 +305,130 @@ export function getTotalRefreshCost(
   return totalCost;
 }
 
-function updateGroupStatus(rg: RefreshGroupRecord): { final: boolean } {
-  const allFinal = fnutil.all(
-    rg.statusPerCoin,
-    (x) => x === RefreshCoinStatus.Finished || x === RefreshCoinStatus.Failed,
-  );
-  const anyFailed = fnutil.any(
-    rg.statusPerCoin,
-    (x) => x === RefreshCoinStatus.Failed,
-  );
-  if (allFinal) {
-    if (anyFailed) {
-      rg.timestampFinished = timestampPreciseToDb(TalerPreciseTimestamp.now());
-      rg.operationStatus = RefreshOperationStatus.Failed;
-    } else {
-      rg.timestampFinished = timestampPreciseToDb(TalerPreciseTimestamp.now());
-      rg.operationStatus = RefreshOperationStatus.Finished;
-    }
-    return { final: true };
+export async function getCoinAvailabilityForDenom(
+  wex: WalletExecutionContext,
+  tx: WalletDbReadWriteTransaction<
+    ["coins", "coinAvailability", "denominations"]
+  >,
+  denom: DenominationInfo,
+  ageRestriction: number,
+): Promise<CoinAvailabilityRecord> {
+  checkDbInvariant(!!denom);
+  let car = await tx.coinAvailability.get([
+    denom.exchangeBaseUrl,
+    denom.denomPubHash,
+    ageRestriction,
+  ]);
+  if (!car) {
+    car = {
+      maxAge: ageRestriction,
+      value: denom.value,
+      currency: Amounts.currencyOf(denom.value),
+      denomPubHash: denom.denomPubHash,
+      exchangeBaseUrl: denom.exchangeBaseUrl,
+      freshCoinCount: 0,
+      visibleCoinCount: 0,
+    };
   }
-  return { final: false };
+  return car;
 }
 
 /**
  * Create a refresh session for one particular coin inside a refresh group.
- *
- * If the session already exists, return the existing one.
- *
- * If the session doesn't need to be created (refresh group gone or session 
already
- * finished), return undefined.
  */
-async function provideRefreshSession(
+async function initRefreshSession(
   wex: WalletExecutionContext,
-  refreshGroupId: string,
+  tx: WalletDbReadWriteTransaction<
+    ["refreshSessions", "coinAvailability", "coins", "denominations"]
+  >,
+  refreshGroup: RefreshGroupRecord,
   coinIndex: number,
-): Promise<RefreshSessionRecord | undefined> {
+): Promise<void> {
+  const refreshGroupId = refreshGroup.refreshGroupId;
   logger.trace(
     `creating refresh session for coin ${coinIndex} in refresh group 
${refreshGroupId}`,
   );
-
-  const d = await wex.db.runReadWriteTx(
-    ["coins", "refreshGroups", "refreshSessions"],
-    async (tx) => {
-      const refreshGroup = await tx.refreshGroups.get(refreshGroupId);
-      if (!refreshGroup) {
-        return;
-      }
-      if (
-        refreshGroup.statusPerCoin[coinIndex] === RefreshCoinStatus.Finished
-      ) {
-        return;
-      }
-      const existingRefreshSession = await tx.refreshSessions.get([
-        refreshGroupId,
-        coinIndex,
-      ]);
-      const oldCoinPub = refreshGroup.oldCoinPubs[coinIndex];
-      const coin = await tx.coins.get(oldCoinPub);
-      if (!coin) {
-        throw Error("Can't refresh, coin not found");
-      }
-      return { refreshGroup, coin, existingRefreshSession };
-    },
-  );
-
-  if (!d) {
-    return undefined;
-  }
-
-  if (d.existingRefreshSession) {
-    return d.existingRefreshSession;
+  const oldCoinPub = refreshGroup.oldCoinPubs[coinIndex];
+  const oldCoin = await tx.coins.get(oldCoinPub);
+  if (!oldCoin) {
+    throw Error("Can't refresh, coin not found");
   }
 
-  const { refreshGroup, coin } = d;
-
-  const exch = await fetchFreshExchange(wex, coin.exchangeBaseUrl);
+  const exchangeBaseUrl = oldCoin.exchangeBaseUrl;
 
-  // FIXME: use helper functions from withdraw.ts
-  // to update and filter withdrawable denoms.
+  const sessionSecretSeed = encodeCrock(getRandomBytes(64));
 
-  const { availableAmount, availableDenoms } = await wex.db.runReadOnlyTx(
-    ["denominations"],
-    async (tx) => {
-      const oldDenom = await getDenomInfo(
-        wex,
-        tx,
-        exch.exchangeBaseUrl,
-        coin.denomPubHash,
-      );
+  const oldDenom = await getDenomInfo(
+    wex,
+    tx,
+    exchangeBaseUrl,
+    oldCoin.denomPubHash,
+  );
 
-      if (!oldDenom) {
-        throw Error("db inconsistent: denomination for coin not found");
-      }
+  if (!oldDenom) {
+    throw Error("db inconsistent: denomination for coin not found");
+  }
 
-      // FIXME: Use denom groups instead of querying all denominations!
-      const availableDenoms: DenominationRecord[] =
-        await tx.denominations.indexes.byExchangeBaseUrl.getAll(
-          exch.exchangeBaseUrl,
-        );
+  const currency = refreshGroup.currency;
 
-      const availableAmount = Amounts.sub(
-        refreshGroup.inputPerCoin[coinIndex],
-        oldDenom.feeRefresh,
-      ).amount;
-      return { availableAmount, availableDenoms };
-    },
+  const availableDenoms = await getCandidateWithdrawalDenomsTx(
+    wex,
+    tx,
+    exchangeBaseUrl,
+    currency,
   );
 
+  const availableAmount = Amounts.sub(
+    refreshGroup.inputPerCoin[coinIndex],
+    oldDenom.feeRefresh,
+  ).amount;
+
   const newCoinDenoms = selectWithdrawalDenominations(
     availableAmount,
     availableDenoms,
     wex.ws.config.testing.denomselAllowLate,
   );
 
-  const transactionId = constructTransactionIdentifier({
-    tag: TransactionType.Refresh,
-    refreshGroupId,
-  });
-
   if (newCoinDenoms.selectedDenoms.length === 0) {
     logger.trace(
       `not refreshing, available amount ${amountToPretty(
         availableAmount,
       )} too small`,
     );
-    const transitionInfo = await wex.db.runReadWriteTx(
-      ["refreshGroups", "coins", "coinAvailability"],
-      async (tx) => {
-        const rg = await tx.refreshGroups.get(refreshGroupId);
-        if (!rg) {
-          return;
-        }
-        const oldTxState = computeRefreshTransactionState(rg);
-        rg.statusPerCoin[coinIndex] = RefreshCoinStatus.Finished;
-        const updateRes = updateGroupStatus(rg);
-        if (updateRes.final) {
-          await makeCoinsVisible(wex, tx, transactionId);
-        }
-        await tx.refreshGroups.put(rg);
-        const newTxState = computeRefreshTransactionState(rg);
-        return { oldTxState, newTxState };
-      },
-    );
-    wex.ws.notify({
-      type: NotificationType.BalanceChange,
-      hintTransactionId: transactionId,
-    });
-    notifyTransition(wex, transactionId, transitionInfo);
-    return;
+    refreshGroup.statusPerCoin[coinIndex] = RefreshCoinStatus.Finished;
   }
 
-  const sessionSecretSeed = encodeCrock(getRandomBytes(64));
+  for (let i = 0; i < newCoinDenoms.selectedDenoms.length; i++) {
+    const dph = newCoinDenoms.selectedDenoms[i].denomPubHash;
+    const denom = await getDenomInfo(wex, tx, oldDenom.exchangeBaseUrl, dph);
+    if (!denom) {
+      logger.error(`denom ${dph} not in DB`);
+      continue;
+    }
+    const car = await getCoinAvailabilityForDenom(
+      wex,
+      tx,
+      denom,
+      oldCoin.maxAge,
+    );
+    car.pendingRefreshOutputCount =
+      (car.pendingRefreshOutputCount ?? 0) +
+      newCoinDenoms.selectedDenoms[i].count;
+    await tx.coinAvailability.put(car);
+  }
 
-  // Store refresh session for this coin in the database.
-  const mySession = await wex.db.runReadWriteTx(
-    ["refreshGroups", "refreshSessions"],
-    async (tx) => {
-      const rg = await tx.refreshGroups.get(refreshGroupId);
-      if (!rg) {
-        return;
-      }
-      const existingSession = await tx.refreshSessions.get([
-        refreshGroupId,
-        coinIndex,
-      ]);
-      if (existingSession) {
-        return existingSession;
-      }
-      const newSession: RefreshSessionRecord = {
-        coinIndex,
-        refreshGroupId,
-        norevealIndex: undefined,
-        sessionSecretSeed: sessionSecretSeed,
-        newDenoms: newCoinDenoms.selectedDenoms.map((x) => ({
-          count: x.count,
-          denomPubHash: x.denomPubHash,
-        })),
-        amountRefreshOutput: Amounts.stringify(newCoinDenoms.totalCoinValue),
-      };
-      await tx.refreshSessions.put(newSession);
-      return newSession;
-    },
-  );
-  logger.trace(
-    `found/created refresh session for coin #${coinIndex} in 
${refreshGroupId}`,
-  );
-  return mySession;
+  const newSession: RefreshSessionRecord = {
+    coinIndex,
+    refreshGroupId,
+    norevealIndex: undefined,
+    sessionSecretSeed: sessionSecretSeed,
+    newDenoms: newCoinDenoms.selectedDenoms.map((x) => ({
+      count: x.count,
+      denomPubHash: x.denomPubHash,
+    })),
+    amountRefreshOutput: Amounts.stringify(newCoinDenoms.totalCoinValue),
+  };
+  await tx.refreshSessions.put(newSession);
 }
 
 function getRefreshRequestTimeout(rg: RefreshGroupRecord): Duration {
@@ -499,6 +437,14 @@ function getRefreshRequestTimeout(rg: RefreshGroupRecord): 
Duration {
   });
 }
 
+/**
+ * Run the melt step of a refresh session.
+ *
+ * If the melt step succeeds or fails permanently,
+ * the status in the refresh group is updated.
+ *
+ * When a transient error occurs, an exception is thrown.
+ */
 async function refreshMelt(
   wex: WalletExecutionContext,
   refreshGroupId: string,
@@ -627,7 +573,7 @@ async function refreshMelt(
 
   if (resp.status === HttpStatusCode.NotFound) {
     const errDetails = await readUnexpectedResponseDetails(resp);
-    const transitionInfo = await wex.db.runReadWriteTx(
+    await wex.db.runReadWriteTx(
       ["refreshGroups", "refreshSessions", "coins", "coinAvailability"],
       async (tx) => {
         const rg = await tx.refreshGroups.get(refreshGroupId);
@@ -640,7 +586,6 @@ async function refreshMelt(
         if (rg.statusPerCoin[coinIndex] !== RefreshCoinStatus.Pending) {
           return;
         }
-        const oldTxState = computeRefreshTransactionState(rg);
         rg.statusPerCoin[coinIndex] = RefreshCoinStatus.Failed;
         const refreshSession = await tx.refreshSessions.get([
           refreshGroupId,
@@ -652,24 +597,10 @@ async function refreshMelt(
           );
         }
         refreshSession.lastError = errDetails;
-        const updateRes = updateGroupStatus(rg);
-        if (updateRes.final) {
-          await makeCoinsVisible(wex, tx, transactionId);
-        }
         await tx.refreshGroups.put(rg);
         await tx.refreshSessions.put(refreshSession);
-        const newTxState = computeRefreshTransactionState(rg);
-        return {
-          oldTxState,
-          newTxState,
-        };
       },
     );
-    wex.ws.notify({
-      type: NotificationType.BalanceChange,
-      hintTransactionId: transactionId,
-    });
-    notifyTransition(wex, transactionId, transitionInfo);
     return;
   }
 
@@ -718,6 +649,7 @@ async function refreshMelt(
             const input = Amounts.parseOrThrow(rg.inputPerCoin[rs.coinIndex]);
             const newSel = selectWithdrawalDenominations(input, candidates);
             rs.amountRefreshOutput = newSel.totalCoinValue;
+            // FIXME: This is wrong! When denoms are re-selected, the melt 
commitment breaks.
             rs.newDenoms = newSel.selectedDenoms.map((x) => ({
               count: x.count,
               denomPubHash: x.denomPubHash,
@@ -1043,7 +975,7 @@ async function refreshReveal(
     }
   }
 
-  const transitionInfo = await wex.db.runReadWriteTx(
+  await wex.db.runReadWriteTx(
     [
       "coins",
       "denominations",
@@ -1061,19 +993,32 @@ async function refreshReveal(
       if (!rs) {
         return;
       }
-      const oldTxState = computeRefreshTransactionState(rg);
       rg.statusPerCoin[coinIndex] = RefreshCoinStatus.Finished;
-      updateGroupStatus(rg);
       for (const coin of coins) {
-        await makeCoinAvailable(wex, tx, coin);
+        const denomInfo = await getDenomInfo(
+          wex,
+          tx,
+          coin.exchangeBaseUrl,
+          coin.denomPubHash,
+        );
+        checkDbInvariant(!!denomInfo);
+        const car = await getCoinAvailabilityForDenom(
+          wex,
+          tx,
+          denomInfo,
+          coin.maxAge,
+        );
+        checkDbInvariant(
+          car.pendingRefreshOutputCount != null &&
+            car.pendingRefreshOutputCount > 0,
+        );
+        car.pendingRefreshOutputCount--;
+        car.freshCoinCount++;
+        await tx.coinAvailability.put(car);
       }
-      await makeCoinsVisible(wex, tx, transactionId);
       await tx.refreshGroups.put(rg);
-      const newTxState = computeRefreshTransactionState(rg);
-      return { oldTxState, newTxState };
     },
   );
-  notifyTransition(wex, transactionId, transitionInfo);
   logger.trace("refresh finished (end of reveal)");
 }
 
@@ -1121,17 +1066,71 @@ export async function processRefreshGroup(
       errors.push(getErrorDetailFromException(x));
     }),
   );
-  try {
-    logger.info("waiting for refreshes");
-    await Promise.all(ps);
-    logger.info("refresh group finished");
-  } catch (e) {
-    logger.warn("process refresh sessions got exception");
-    logger.warn(`exception: ${e}`);
-  }
+  await Promise.all(ps);
   if (inShutdown) {
-    return TaskRunResult.backoff();
+    return TaskRunResult.finished();
   }
+
+  const ctx = new RefreshTransactionContext(wex, refreshGroupId);
+
+  // We've processed all refresh session and can now update the
+  // status of the whole refresh group.
+
+  const transitionInfo = await wex.db.runReadWriteTx(
+    ["coins", "coinAvailability", "refreshGroups"],
+    async (tx) => {
+      const rg = await tx.refreshGroups.get(refreshGroupId);
+      if (!rg) {
+        return;
+      }
+      switch (rg.operationStatus) {
+        case RefreshOperationStatus.Pending:
+          break;
+        default:
+          return undefined;
+      }
+      const oldTxState = computeRefreshTransactionState(rg);
+      const allFinal = fnutil.all(
+        rg.statusPerCoin,
+        (x) =>
+          x === RefreshCoinStatus.Finished || x === RefreshCoinStatus.Failed,
+      );
+      const anyFailed = fnutil.any(
+        rg.statusPerCoin,
+        (x) => x === RefreshCoinStatus.Failed,
+      );
+      if (allFinal) {
+        if (anyFailed) {
+          rg.timestampFinished = timestampPreciseToDb(
+            TalerPreciseTimestamp.now(),
+          );
+          rg.operationStatus = RefreshOperationStatus.Failed;
+        } else {
+          rg.timestampFinished = timestampPreciseToDb(
+            TalerPreciseTimestamp.now(),
+          );
+          rg.operationStatus = RefreshOperationStatus.Finished;
+        }
+      }
+      if (allFinal) {
+        await makeCoinsVisible(wex, tx, ctx.transactionId);
+        await tx.refreshGroups.put(rg);
+        const newTxState = computeRefreshTransactionState(rg);
+        return {
+          oldTxState,
+          newTxState,
+        };
+      } else {
+        return undefined;
+      }
+    },
+  );
+
+  if (transitionInfo) {
+    notifyTransition(wex, ctx.transactionId, transitionInfo);
+    return TaskRunResult.progress();
+  }
+
   if (errors.length > 0) {
     return {
       type: TaskRunResultType.Error,
@@ -1174,16 +1173,7 @@ async function processRefreshSession(
     return;
   }
   if (!refreshSession) {
-    refreshSession = await provideRefreshSession(
-      wex,
-      refreshGroupId,
-      coinIndex,
-    );
-  }
-  if (!refreshSession) {
-    // We tried to create the refresh session, but didn't get a result back.
-    // This means that either the session is finished, or that creating
-    // one isn't necessary.
+    // No refresh session for that coin.
     return;
   }
   if (refreshSession.norevealIndex === undefined) {
@@ -1268,7 +1258,7 @@ export async function calculateRefreshOutput(
   };
 }
 
-async function applyRefresh(
+async function applyRefreshToOldCoins(
   wex: WalletExecutionContext,
   tx: WalletDbReadWriteTransaction<
     ["denominations", "coins", "refreshGroups", "coinAvailability"]
@@ -1347,20 +1337,29 @@ export interface CreateRefreshGroupResult {
 export async function createRefreshGroup(
   wex: WalletExecutionContext,
   tx: WalletDbReadWriteTransaction<
-    ["denominations", "coins", "refreshGroups", "coinAvailability"]
+    [
+      "denominations",
+      "coins",
+      "refreshGroups",
+      "refreshSessions",
+      "coinAvailability",
+    ]
   >,
   currency: string,
   oldCoinPubs: CoinRefreshRequest[],
   refreshReason: RefreshReason,
   originatingTransactionId: string | undefined,
 ): Promise<CreateRefreshGroupResult> {
+  // FIXME: Check that involved exchanges are reasonably up-to-date.
+  // Otherwise, error out.
+
   const refreshGroupId = encodeCrock(getRandomBytes(32));
 
   const outInfo = await calculateRefreshOutput(wex, tx, currency, oldCoinPubs);
 
   const estimatedOutputPerCoin = outInfo.outputPerCoin;
 
-  await applyRefresh(wex, tx, oldCoinPubs, refreshGroupId);
+  await applyRefreshToOldCoins(wex, tx, oldCoinPubs, refreshGroupId);
 
   const refreshGroup: RefreshGroupRecord = {
     operationStatus: RefreshOperationStatus.Pending,
@@ -1387,6 +1386,10 @@ export async function createRefreshGroup(
     refreshGroup.operationStatus = RefreshOperationStatus.Finished;
   }
 
+  for (let i = 0; i < oldCoinPubs.length; i++) {
+    await initRefreshSession(wex, tx, refreshGroup, i);
+  }
+
   await tx.refreshGroups.put(refreshGroup);
 
   const newTxState = computeRefreshTransactionState(refreshGroup);
@@ -1487,7 +1490,13 @@ export async function forceRefresh(
     throw Error("refusing to create empty refresh group");
   }
   const res = await wex.db.runReadWriteTx(
-    ["refreshGroups", "coinAvailability", "denominations", "coins"],
+    [
+      "refreshGroups",
+      "coinAvailability",
+      "refreshSessions",
+      "denominations",
+      "coins",
+    ],
     async (tx) => {
       let coinPubs: CoinRefreshRequest[] = [];
       for (const c of req.coinPubList) {

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]