gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[taler-wallet-core] branch master updated: wallet-core: unify handling of run-until-done, simplify waiter implementation


From: gnunet
Subject: [taler-wallet-core] branch master updated: wallet-core: unify handling of run-until-done, simplify waiter implementation
Date: Mon, 22 Apr 2024 23:29:14 +0200

This is an automated email from the git hooks/post-receive script.

dold pushed a commit to branch master
in repository wallet-core.

The following commit(s) were added to refs/heads/master by this push:
     new a181ee06e wallet-core: unify handling of run-until-done, simplify waiter implementation
a181ee06e is described below

commit a181ee06e4b52cb35e00ff8c86acff315135faf2
Author: Florian Dold <florian@dold.me>
AuthorDate: Mon Apr 22 23:29:07 2024 +0200

    wallet-core: unify handling of run-until-done, simplify waiter implementation
---
 packages/taler-harness/src/bench1.ts               |   9 +-
 packages/taler-harness/src/bench3.ts               |   8 +-
 packages/taler-util/src/notifications.ts           |   8 +-
 packages/taler-util/src/wallet-types.ts            |  10 -
 packages/taler-wallet-cli/src/index.ts             |  31 +--
 packages/taler-wallet-core/src/common.ts           |  52 ++++
 .../taler-wallet-core/src/observable-wrappers.ts   |  15 +-
 packages/taler-wallet-core/src/shepherd.ts         |  54 ++--
 packages/taler-wallet-core/src/testing.ts          | 297 +++++++++------------
 packages/taler-wallet-core/src/wallet-api-types.ts |  11 +
 packages/taler-wallet-core/src/wallet.ts           |  15 +-
 11 files changed, 247 insertions(+), 263 deletions(-)

diff --git a/packages/taler-harness/src/bench1.ts b/packages/taler-harness/src/bench1.ts
index 428114e0e..216760260 100644
--- a/packages/taler-harness/src/bench1.ts
+++ b/packages/taler-harness/src/bench1.ts
@@ -29,7 +29,6 @@ import {
 } from "@gnu-taler/taler-util";
 import {
   AccessStats,
-  applyRunConfigDefaults,
   createNativeWalletHost2,
   Wallet,
   WalletApiOperation,
@@ -105,9 +104,7 @@ export async function runBench1(configJson: any): Promise<void> {
       exchangeBaseUrl: b1conf.exchange,
     });
 
-    await wallet.runTaskLoop({
-      stopWhenDone: true,
-    });
+    await wallet.client.call(WalletApiOperation.TestingWaitTasksDone, {});
 
     logger.info(
       `Finished withdrawal amount=${withdrawAmount} time=${Date.now() - start}`,
@@ -123,9 +120,7 @@ export async function runBench1(configJson: any): Promise<void> {
           depositPaytoUri: b1conf.payto,
         });
 
-        await wallet.runTaskLoop({
-          stopWhenDone: true,
-        });
+        await wallet.client.call(WalletApiOperation.TestingWaitTasksDone, {});
 
         logger.info(`Finished deposit amount=10 time=${Date.now() - start}`);
       }
diff --git a/packages/taler-harness/src/bench3.ts b/packages/taler-harness/src/bench3.ts
index f138dff68..a5bc094df 100644
--- a/packages/taler-harness/src/bench3.ts
+++ b/packages/taler-harness/src/bench3.ts
@@ -115,9 +115,7 @@ export async function runBench3(configJson: any): Promise<void> {
       exchangeBaseUrl: b3conf.exchange,
     });
 
-    await wallet.runTaskLoop({
-      stopWhenDone: true,
-    });
+    await wallet.client.call(WalletApiOperation.TestingWaitTasksDone, {});
 
     logger.info(
       `Finished withdrawal amount=${withdrawAmount} time=${Date.now() - start}`,
@@ -135,9 +133,7 @@ export async function runBench3(configJson: any): Promise<void> {
         depositPaytoUri: payto,
       });
 
-      await wallet.runTaskLoop({
-        stopWhenDone: true,
-      });
+      await wallet.client.call(WalletApiOperation.TestingWaitTasksDone, {});
 
       logger.info(`Finished deposit amount=10 time=${Date.now() - start}`);
     }
diff --git a/packages/taler-util/src/notifications.ts b/packages/taler-util/src/notifications.ts
index 1c6ca4b85..b60fb267c 100644
--- a/packages/taler-util/src/notifications.ts
+++ b/packages/taler-util/src/notifications.ts
@@ -32,6 +32,7 @@ export enum NotificationType {
   TransactionStateTransition = "transaction-state-transition",
   WithdrawalOperationTransition = "withdrawal-operation-transition",
   ExchangeStateTransition = "exchange-state-transition",
+  Idle = "idle",
   TaskObservabilityEvent = "task-observability-event",
   RequestObservabilityEvent = "request-observability-event",
 }
@@ -230,6 +231,10 @@ export interface WithdrawalOperationTransitionNotification {
   uri: string;
 }
 
+export interface IdleNotification {
+  type: NotificationType.Idle;
+}
+
 export type WalletNotification =
   | BalanceChangeNotification
   | WithdrawalOperationTransitionNotification
@@ -237,4 +242,5 @@ export type WalletNotification =
   | ExchangeStateTransitionNotification
   | TransactionStateTransitionNotification
   | TaskProgressNotification
-  | RequestProgressNotification;
+  | RequestProgressNotification
+  | IdleNotification;
diff --git a/packages/taler-util/src/wallet-types.ts b/packages/taler-util/src/wallet-types.ts
index 0653bc473..d39eb3ce9 100644
--- a/packages/taler-util/src/wallet-types.ts
+++ b/packages/taler-util/src/wallet-types.ts
@@ -3213,16 +3213,6 @@ export const codecForRemoveGlobalCurrencyAuditorRequest =
       .property("auditorPub", codecForString())
       .build("RemoveGlobalCurrencyAuditorRequest");
 
-export interface RetryLoopOpts {
-  /**
-   * Stop the retry loop when all lifeness-giving pending operations
-   * are done.
-   *
-   * Defaults to false.
-   */
-  stopWhenDone?: boolean;
-}
-
 /**
  * Information about one provider.
  *
diff --git a/packages/taler-wallet-cli/src/index.ts b/packages/taler-wallet-cli/src/index.ts
index 8c4760223..f0ff02903 100644
--- a/packages/taler-wallet-cli/src/index.ts
+++ b/packages/taler-wallet-cli/src/index.ts
@@ -358,25 +358,6 @@ async function withWallet<T>(
   }
 }
 
-/**
- * Run a function with a local wallet.
- *
- * Stops the wallet after the function is done.
- */
-async function withLocalWallet<T>(
-  walletCliArgs: WalletCliArgsType,
-  f: (w: { client: WalletCoreApiClient; ws: Wallet }) => Promise<T>,
-): Promise<T> {
-  const wh = await createLocalWallet(walletCliArgs, (notif) => {
-    writeObservabilityLog(notif);
-  });
-  const w = wh.wallet;
-  const res = await f({ client: w.client, ws: w });
-  logger.info("Work done, stopping wallet.");
-  w.stop();
-  return res;
-}
-
 walletCli
   .subcommand("balance", "balance", { help: "Show wallet balance." })
   .flag("json", ["--json"], {
@@ -584,12 +565,8 @@ walletCli
     help: "Run until no more work is left.",
   })
   .action(async (args) => {
-    await withLocalWallet(args, async (wallet) => {
-      logger.info("running until pending operations are finished");
-      await wallet.ws.runTaskLoop({
-        stopWhenDone: true,
-      });
-      wallet.ws.stop();
+    await withWallet(args, async (ctx) => {
+      await ctx.client.call(WalletApiOperation.TestingWaitTasksDone, {});
     });
   });
 
@@ -1330,9 +1307,7 @@ advancedCli
       exchangeBaseUrl: "http://localhost:8081/",
       merchantBaseUrl: "http://localhost:8083/",
     });
-    await wallet.runTaskLoop({
-      stopWhenDone: true,
-    });
+    await wallet.client.call(WalletApiOperation.TestingWaitTasksDone, {});
     wallet.stop();
   });
 
diff --git a/packages/taler-wallet-core/src/common.ts b/packages/taler-wallet-core/src/common.ts
index 6d116c47e..edaba5ba4 100644
--- a/packages/taler-wallet-core/src/common.ts
+++ b/packages/taler-wallet-core/src/common.ts
@@ -21,6 +21,7 @@ import {
   AbsoluteTime,
   AmountJson,
   Amounts,
+  AsyncFlag,
   CoinRefreshRequest,
   CoinStatus,
   Duration,
@@ -35,6 +36,7 @@ import {
   TalerProtocolTimestamp,
   TombstoneIdStr,
   TransactionIdStr,
+  WalletNotification,
   assertUnreachable,
   checkDbInvariant,
   checkLogicInvariant,
@@ -769,3 +771,53 @@ export enum PendingTaskType {
 
 declare const __taskIdStr: unique symbol;
 export type TaskIdStr = string & { [__taskIdStr]: true };
+
+/**
+ * Wait until the wallet is in a particular state.
+ *
+ * Two functions must be provided:
+ * 1. checkState, which checks if the wallet is in the
+ *    desired state.
+ * 2. filterNotification, which checks whether a notification
+ *    might have lead to a state change.
+ */
+export async function genericWaitForState(
+  wex: WalletExecutionContext,
+  args: {
+    checkState: () => Promise<boolean>;
+    filterNotification: (notif: WalletNotification) => boolean;
+  },
+): Promise<void> {
+  await wex.taskScheduler.ensureRunning();
+
+  // FIXME: Clean up using the new JS "using" / Symbol.dispose syntax.
+  const flag = new AsyncFlag();
+  // Raise purchaseNotifFlag whenever we get a notification
+  // about our refresh.
+  const cancelNotif = wex.ws.addNotificationListener((notif) => {
+    if (args.filterNotification(notif)) {
+      flag.raise();
+    }
+  });
+  const unregisterOnCancelled = wex.cancellationToken.onCancelled(() => {
+    cancelNotif();
+    flag.raise();
+  });
+
+  try {
+    while (true) {
+      if (wex.cancellationToken.isCancelled) {
+        throw Error("cancelled");
+      }
+      if (await args.checkState()) {
+        return;
+      }
+      // Wait for the next transition
+      await flag.wait();
+      flag.reset();
+    }
+  } catch (e) {
+    unregisterOnCancelled();
+    cancelNotif();
+  }
+}
diff --git a/packages/taler-wallet-core/src/observable-wrappers.ts b/packages/taler-wallet-core/src/observable-wrappers.ts
index b36f41611..7cd65f38e 100644
--- a/packages/taler-wallet-core/src/observable-wrappers.ts
+++ b/packages/taler-wallet-core/src/observable-wrappers.ts
@@ -25,7 +25,6 @@ import { IDBDatabase } from "@gnu-taler/idb-bridge";
 import {
   ObservabilityContext,
   ObservabilityEventType,
-  RetryLoopOpts,
 } from "@gnu-taler/taler-util";
 import { TaskIdStr } from "./common.js";
 import { TalerCryptoInterface } from "./index.js";
@@ -65,13 +64,14 @@ export class ObservableTaskScheduler implements TaskScheduler {
     return this.impl.getActiveTasks();
   }
 
-  ensureRunning(): void {
-    return this.impl.ensureRunning();
+  isIdle(): boolean {
+    return this.impl.isIdle();
   }
 
-  run(opts?: RetryLoopOpts | undefined): Promise<void> {
-    return this.impl.run(opts);
+  ensureRunning(): Promise<void> {
+    return this.impl.ensureRunning();
   }
+
   startShepherdTask(taskId: TaskIdStr): void {
     this.declareDep(taskId);
     this.oc.observe({
@@ -80,6 +80,7 @@ export class ObservableTaskScheduler implements TaskScheduler {
     });
     return this.impl.startShepherdTask(taskId);
   }
+
   stopShepherdTask(taskId: TaskIdStr): void {
     this.declareDep(taskId);
     this.oc.observe({
@@ -88,6 +89,7 @@ export class ObservableTaskScheduler implements TaskScheduler {
     });
     return this.impl.stopShepherdTask(taskId);
   }
+
   resetTaskRetries(taskId: TaskIdStr): Promise<void> {
     this.declareDep(taskId);
     if (this.taskDepCache.size > 500) {
@@ -99,7 +101,8 @@ export class ObservableTaskScheduler implements TaskScheduler {
     });
     return this.impl.resetTaskRetries(taskId);
   }
-  reload(): void {
+
+  async reload(): Promise<void> {
     return this.impl.reload();
   }
 }
diff --git a/packages/taler-wallet-core/src/shepherd.ts b/packages/taler-wallet-core/src/shepherd.ts
index 58bdcf0dd..aae6d5a18 100644
--- a/packages/taler-wallet-core/src/shepherd.ts
+++ b/packages/taler-wallet-core/src/shepherd.ts
@@ -27,7 +27,6 @@ import {
   NotificationType,
   ObservabilityContext,
   ObservabilityEventType,
-  RetryLoopOpts,
   TalerErrorDetail,
   TaskThrottler,
   TransactionIdStr,
@@ -142,13 +141,13 @@ function taskGivesLiveness(taskId: string): boolean {
 }
 
 export interface TaskScheduler {
-  ensureRunning(): void;
-  run(opts?: RetryLoopOpts): Promise<void>;
+  ensureRunning(): Promise<void>;
   startShepherdTask(taskId: TaskIdStr): void;
   stopShepherdTask(taskId: TaskIdStr): void;
   resetTaskRetries(taskId: TaskIdStr): Promise<void>;
-  reload(): void;
+  reload(): Promise<void>;
   getActiveTasks(): TaskIdStr[];
+  isIdle(): boolean;
 }
 
 export class TaskSchedulerImpl implements TaskScheduler {
@@ -176,10 +175,11 @@ export class TaskSchedulerImpl implements TaskScheduler {
     return [...this.sheps.keys()];
   }
 
-  ensureRunning(): void {
+  async ensureRunning(): Promise<void> {
     if (this.isRunning) {
       return;
     }
+    await this.loadTasksFromDb();
     this.run()
       .catch((e) => {
         logger.error("error running task loop");
@@ -190,7 +190,22 @@ export class TaskSchedulerImpl implements TaskScheduler {
       });
   }
 
-  async run(opts: RetryLoopOpts = {}): Promise<void> {
+  isIdle(): boolean {
+    let alive = false;
+    const taskIds = [...this.sheps.keys()];
+    logger.info(`current task IDs: ${j2s(taskIds)}`);
+    logger.info(`sheps: ${this.sheps.size}`);
+    for (const taskId of taskIds) {
+      if (taskGivesLiveness(taskId)) {
+        alive = true;
+        break;
+      }
+    }
+    // We're idle if no task is alive anymore.
+    return !alive;
+  }
+
+  private async run(): Promise<void> {
     if (this.isRunning) {
       throw Error("task loop already running");
     }
@@ -200,26 +215,17 @@ export class TaskSchedulerImpl implements TaskScheduler {
     logger.info("loaded!");
     logger.info(`sheps: ${this.sheps.size}`);
     while (true) {
-      if (opts.stopWhenDone) {
-        let alive = false;
-        const taskIds = [...this.sheps.keys()];
-        logger.info(`current task IDs: ${j2s(taskIds)}`);
-        logger.info(`sheps: ${this.sheps.size}`);
-        for (const taskId of taskIds) {
-          if (taskGivesLiveness(taskId)) {
-            alive = true;
-            break;
-          }
-        }
-        if (!alive) {
-          logger.info("Breaking out of task loop (no more work).");
-          break;
-        }
-      }
       if (this.ws.stopped) {
         logger.info("Breaking out of task loop (wallet stopped).");
         break;
       }
+
+      if (this.isIdle()) {
+        this.ws.notify({
+          type: NotificationType.Idle,
+        });
+      }
+
       await this.iterCond.wait();
     }
     this.isRunning = false;
@@ -237,8 +243,8 @@ export class TaskSchedulerImpl implements TaskScheduler {
    *
    * Mostly useful to interrupt all waits when time-travelling.
    */
-  reload(): void {
-    this.ensureRunning();
+  async reload(): Promise<void> {
+    await this.ensureRunning();
     const tasksIds = [...this.sheps.keys()];
     logger.info(`reloading sheperd with ${tasksIds.length} tasks`);
     for (const taskId of tasksIds) {
diff --git a/packages/taler-wallet-core/src/testing.ts b/packages/taler-wallet-core/src/testing.ts
index 32c0765b4..2f149cfa8 100644
--- a/packages/taler-wallet-core/src/testing.ts
+++ b/packages/taler-wallet-core/src/testing.ts
@@ -39,8 +39,6 @@ import {
   j2s,
   Logger,
   NotificationType,
-  OpenedPromise,
-  openPromise,
   parsePaytoUri,
   PreparePayResultType,
   TalerCorebankApiClient,
@@ -58,6 +56,7 @@ import {
   readSuccessResponseJsonOrThrow,
 } from "@gnu-taler/taler-util/http";
 import { getBalances } from "./balance.js";
+import { genericWaitForState } from "./common.js";
 import { createDepositGroup } from "./deposits.js";
 import { fetchFreshExchange } from "./exchanges.js";
 import {
@@ -402,52 +401,56 @@ export async function waitUntilAllTransactionsFinal(
   wex: WalletExecutionContext,
 ): Promise<void> {
   logger.info("waiting until all transactions are in a final state");
-  wex.taskScheduler.ensureRunning();
-  let p: OpenedPromise<void> | undefined = undefined;
-  const cancelNotifs = wex.ws.addNotificationListener((notif) => {
-    if (!p) {
-      return;
-    }
-    if (notif.type === NotificationType.TransactionStateTransition) {
+  await wex.taskScheduler.ensureRunning();
+  await genericWaitForState(wex, {
+    filterNotification(notif) {
+      if (notif.type !== NotificationType.TransactionStateTransition) {
+        return false;
+      }
       switch (notif.newTxState.major) {
         case TransactionMajorState.Pending:
         case TransactionMajorState.Aborting:
-          break;
+          return false;
         default:
-          p.resolve();
+          return true;
       }
-    }
-  });
-  while (1) {
-    p = openPromise();
-    const txs = await getTransactions(wex, {
-      includeRefreshes: true,
-      filterByState: "nonfinal",
-    });
-    let finished = true;
-    for (const tx of txs.transactions) {
-      switch (tx.txState.major) {
-        case TransactionMajorState.Pending:
-        case TransactionMajorState.Aborting:
-        case TransactionMajorState.Suspended:
-        case TransactionMajorState.SuspendedAborting:
-          finished = false;
-          logger.info(
-            `continuing waiting, ${tx.transactionId} in ${tx.txState.major}(${tx.txState.minor})`,
-          );
-          break;
+    },
+    async checkState() {
+      const txs = await getTransactions(wex, {
+        includeRefreshes: true,
+        filterByState: "nonfinal",
+      });
+      for (const tx of txs.transactions) {
+        switch (tx.txState.major) {
+          case TransactionMajorState.Pending:
+          case TransactionMajorState.Aborting:
+          case TransactionMajorState.Suspended:
+          case TransactionMajorState.SuspendedAborting:
+            logger.info(
+              `continuing waiting, ${tx.transactionId} in ${tx.txState.major}(${tx.txState.minor})`,
+            );
+            return false;
+        }
       }
-    }
-    if (finished) {
-      break;
-    }
-    // Wait until transaction state changed
-    await p.promise;
-  }
-  cancelNotifs();
+      return true;
+    },
+  });
   logger.info("done waiting until all transactions are in a final state");
 }
 
+export async function waitTasksDone(
+  wex: WalletExecutionContext,
+): Promise<void> {
+  await genericWaitForState(wex, {
+    async checkState() {
+      return wex.taskScheduler.isIdle();
+    },
+    filterNotification(notif) {
+      return notif.type === NotificationType.Idle;
+    },
+  });
+}
+
 /**
  * Wait until all chosen transactions are in a final state.
  */
@@ -462,59 +465,51 @@ export async function waitUntilGivenTransactionsFinal(
   if (transactionIds.length === 0) {
     return;
   }
-  wex.taskScheduler.ensureRunning();
+
   const txIdSet = new Set(transactionIds);
-  let p: OpenedPromise<void> | undefined = undefined;
-  const cancelNotifs = wex.ws.addNotificationListener((notif) => {
-    if (!p) {
-      return;
-    }
-    if (notif.type === NotificationType.TransactionStateTransition) {
+
+  await genericWaitForState(wex, {
+    filterNotification(notif) {
+      if (notif.type !== NotificationType.TransactionStateTransition) {
+        return false;
+      }
       if (!txIdSet.has(notif.transactionId)) {
-        return;
+        return false;
       }
       switch (notif.newTxState.major) {
         case TransactionMajorState.Pending:
         case TransactionMajorState.Aborting:
         case TransactionMajorState.Suspended:
         case TransactionMajorState.SuspendedAborting:
-          break;
-        default:
-          p.resolve();
-      }
-    }
-  });
-  while (1) {
-    p = openPromise();
-    const txs = await getTransactions(wex, {
-      includeRefreshes: true,
-      filterByState: "nonfinal",
-    });
-    let finished = true;
-    for (const tx of txs.transactions) {
-      if (!txIdSet.has(tx.transactionId)) {
-        // Don't look at this transaction, we're not interested in it.
-        continue;
+          return false;
       }
-      switch (tx.txState.major) {
-        case TransactionMajorState.Pending:
-        case TransactionMajorState.Aborting:
-        case TransactionMajorState.Suspended:
-        case TransactionMajorState.SuspendedAborting:
-          finished = false;
-          logger.info(
-            `continuing waiting, ${tx.transactionId} in ${tx.txState.major}(${tx.txState.minor})`,
-          );
-          break;
+      return true;
+    },
+    async checkState() {
+      const txs = await getTransactions(wex, {
+        includeRefreshes: true,
+        filterByState: "nonfinal",
+      });
+      for (const tx of txs.transactions) {
+        if (!txIdSet.has(tx.transactionId)) {
+          // Don't look at this transaction, we're not interested in it.
+          continue;
+        }
+        switch (tx.txState.major) {
+          case TransactionMajorState.Pending:
+          case TransactionMajorState.Aborting:
+          case TransactionMajorState.Suspended:
+          case TransactionMajorState.SuspendedAborting:
+            logger.info(
+              `continuing waiting, ${tx.transactionId} in ${tx.txState.major}(${tx.txState.minor})`,
+            );
+            return false;
+        }
       }
-    }
-    if (finished) {
-      break;
-    }
-    // Wait until transaction state changed
-    await p.promise;
-  }
-  cancelNotifs();
+      // No transaction is pending, we're done waiting!
+      return true;
+    },
+  });
   logger.info("done waiting until given transactions are in a final state");
 }
 
@@ -522,52 +517,43 @@ export async function waitUntilRefreshesDone(
   wex: WalletExecutionContext,
 ): Promise<void> {
   logger.info("waiting until all refresh transactions are in a final state");
-  wex.taskScheduler.ensureRunning();
-  let p: OpenedPromise<void> | undefined = undefined;
-  const cancelNotifs = wex.ws.addNotificationListener((notif) => {
-    if (!p) {
-      return;
-    }
-    if (notif.type === NotificationType.TransactionStateTransition) {
+
+  await genericWaitForState(wex, {
+    filterNotification(notif) {
+      if (notif.type !== NotificationType.TransactionStateTransition) {
+        return false;
+      }
       switch (notif.newTxState.major) {
         case TransactionMajorState.Pending:
         case TransactionMajorState.Aborting:
-          break;
+          return false;
         default:
-          p.resolve();
-      }
-    }
-  });
-  while (1) {
-    p = openPromise();
-    const txs = await getTransactions(wex, {
-      includeRefreshes: true,
-      filterByState: "nonfinal",
-    });
-    let finished = true;
-    for (const tx of txs.transactions) {
-      if (tx.type !== TransactionType.Refresh) {
-        continue;
+          return true;
       }
-      switch (tx.txState.major) {
-        case TransactionMajorState.Pending:
-        case TransactionMajorState.Aborting:
-        case TransactionMajorState.Suspended:
-        case TransactionMajorState.SuspendedAborting:
-          finished = false;
-          logger.info(
-            `continuing waiting, ${tx.transactionId} in ${tx.txState.major}(${tx.txState.minor})`,
-          );
-          break;
+    },
+    async checkState() {
+      const txs = await getTransactions(wex, {
+        includeRefreshes: true,
+        filterByState: "nonfinal",
+      });
+      for (const tx of txs.transactions) {
+        if (tx.type !== TransactionType.Refresh) {
+          continue;
+        }
+        switch (tx.txState.major) {
+          case TransactionMajorState.Pending:
+          case TransactionMajorState.Aborting:
+          case TransactionMajorState.Suspended:
+          case TransactionMajorState.SuspendedAborting:
+            logger.info(
+              `continuing waiting, ${tx.transactionId} in ${tx.txState.major}(${tx.txState.minor})`,
+            );
+            return false;
+        }
       }
-    }
-    if (finished) {
-      break;
-    }
-    // Wait until transaction state changed
-    await p.promise;
-  }
-  cancelNotifs();
+      return true;
+    },
+  });
   logger.info("done waiting until all refreshes are in a final state");
 }
 
@@ -575,33 +561,10 @@ async function waitUntilTransactionPendingReady(
   wex: WalletExecutionContext,
   transactionId: string,
 ): Promise<void> {
-  logger.info(`starting waiting for ${transactionId} to be in pending(ready)`);
-  wex.taskScheduler.ensureRunning();
-  let p: OpenedPromise<void> | undefined = undefined;
-  const cancelNotifs = wex.ws.addNotificationListener((notif) => {
-    if (!p) {
-      return;
-    }
-    if (notif.type === NotificationType.TransactionStateTransition) {
-      p.resolve();
-    }
+  return await waitTransactionState(wex, transactionId, {
+    major: TransactionMajorState.Pending,
+    minor: TransactionMinorState.Ready,
   });
-  while (1) {
-    p = openPromise();
-    const tx = await getTransactionById(wex, {
-      transactionId,
-    });
-    if (
-      tx.txState.major == TransactionMajorState.Pending &&
-      tx.txState.minor === TransactionMinorState.Ready
-    ) {
-      break;
-    }
-    // Wait until transaction state changed
-    await p.promise;
-  }
-  logger.info(`done waiting for ${transactionId} to be in pending(ready)`);
-  cancelNotifs();
 }
 
 /**
@@ -617,34 +580,22 @@ export async function waitTransactionState(
       txState,
     )})`,
   );
-  wex.taskScheduler.ensureRunning();
-  let p: OpenedPromise<void> | undefined = undefined;
-  const cancelNotifs = wex.ws.addNotificationListener((notif) => {
-    if (!p) {
-      return;
-    }
-    if (notif.type === NotificationType.TransactionStateTransition) {
-      p.resolve();
-    }
+  await genericWaitForState(wex, {
+    async checkState() {
+      const tx = await getTransactionById(wex, {
+        transactionId,
+      });
+      return (
+        tx.txState.major === txState.major && tx.txState.minor === txState.minor
+      );
+    },
+    filterNotification(notif) {
+      return notif.type === NotificationType.TransactionStateTransition;
+    },
   });
-  while (1) {
-    p = openPromise();
-    const tx = await getTransactionById(wex, {
-      transactionId,
-    });
-    if (
-      tx.txState.major === txState.major &&
-      tx.txState.minor === txState.minor
-    ) {
-      break;
-    }
-    // Wait until transaction state changed
-    await p.promise;
-  }
   logger.info(
     `done waiting for ${transactionId} to be in ${JSON.stringify(txState)}`,
   );
-  cancelNotifs();
 }
 
 export async function waitUntilTransactionWithAssociatedRefreshesFinal(
@@ -669,7 +620,7 @@ export async function runIntegrationTest2(
   wex: WalletExecutionContext,
   args: IntegrationTestV2Args,
 ): Promise<void> {
-  wex.taskScheduler.ensureRunning();
+  await wex.taskScheduler.ensureRunning();
   logger.info("running test with arguments", args);
 
   const exchangeInfo = await fetchFreshExchange(wex, args.exchangeBaseUrl);
diff --git a/packages/taler-wallet-core/src/wallet-api-types.ts b/packages/taler-wallet-core/src/wallet-api-types.ts
index 15803ce8d..f493a6b8b 100644
--- a/packages/taler-wallet-core/src/wallet-api-types.ts
+++ b/packages/taler-wallet-core/src/wallet-api-types.ts
@@ -238,6 +238,7 @@ export enum WalletApiOperation {
   TestingWaitTransactionsFinal = "testingWaitTransactionsFinal",
   TestingWaitRefreshesFinal = "testingWaitRefreshesFinal",
   TestingWaitTransactionState = "testingWaitTransactionState",
+  TestingWaitTasksDone = "testingWaitTasksDone",
   TestingSetTimetravel = "testingSetTimetravel",
   GetCurrencySpecification = "getCurrencySpecification",
   ListStoredBackups = "listStoredBackups",
@@ -1112,6 +1113,15 @@ export type TestingWaitTransactionsFinalOp = {
   response: EmptyObject;
 };
 
+/**
+ * Wait until all transactions are in a final state.
+ */
+export type TestingWaitTasksDoneOp = {
+  op: WalletApiOperation.TestingWaitTasksDone;
+  request: EmptyObject;
+  response: EmptyObject;
+};
+
 /**
  * Wait until all refresh transactions are in a final state.
  */
@@ -1253,6 +1263,7 @@ export type WalletOperations = {
   [WalletApiOperation.TestingWaitRefreshesFinal]: TestingWaitRefreshesFinalOp;
   [WalletApiOperation.TestingSetTimetravel]: TestingSetTimetravelOp;
   [WalletApiOperation.TestingWaitTransactionState]: TestingWaitTransactionStateOp;
+  [WalletApiOperation.TestingWaitTasksDone]: TestingWaitTasksDoneOp;
   [WalletApiOperation.GetCurrencySpecification]: GetCurrencySpecificationOp;
   [WalletApiOperation.CreateStoredBackup]: CreateStoredBackupsOp;
   [WalletApiOperation.ListStoredBackups]: ListStoredBackupsOp;
diff --git a/packages/taler-wallet-core/src/wallet.ts b/packages/taler-wallet-core/src/wallet.ts
index e3aca1ac5..45f9e6078 100644
--- a/packages/taler-wallet-core/src/wallet.ts
+++ b/packages/taler-wallet-core/src/wallet.ts
@@ -55,7 +55,6 @@ import {
   PrepareWithdrawExchangeRequest,
   PrepareWithdrawExchangeResponse,
   RecoverStoredBackupRequest,
-  RetryLoopOpts,
   StoredBackupList,
   TalerError,
   TalerErrorCode,
@@ -256,6 +255,7 @@ import {
   runIntegrationTest,
   runIntegrationTest2,
   testPay,
+  waitTasksDone,
   waitTransactionState,
   waitUntilAllTransactionsFinal,
   waitUntilRefreshesDone,
@@ -734,7 +734,7 @@ async function dispatchRequestInternal<Op extends WalletApiOperation>(
       };
 
       // After initialization, task loop should run.
-      wex.taskScheduler.ensureRunning();
+      await wex.taskScheduler.ensureRunning();
 
       wex.ws.initCalled = true;
       return resp;
@@ -1325,6 +1325,10 @@ async function dispatchRequestInternal<Op extends WalletApiOperation>(
       });
       return {};
     }
+    case WalletApiOperation.TestingWaitTasksDone: {
+      await waitTasksDone(wex);
+      return {};
+    }
     case WalletApiOperation.RemoveGlobalCurrencyAuditor: {
       const req = codecForRemoveGlobalCurrencyAuditorRequest().decode(payload);
       await wex.db.runReadWriteTx(["globalCurrencyAuditors"], async (tx) => {
@@ -1394,7 +1398,7 @@ async function dispatchRequestInternal<Op extends WalletApiOperation>(
     case WalletApiOperation.TestingSetTimetravel: {
       const req = codecForTestingSetTimetravelRequest().decode(payload);
       setDangerousTimetravel(req.offsetMs);
-      wex.taskScheduler.reload();
+      await wex.taskScheduler.reload();
       return {};
     }
     case WalletApiOperation.DeleteExchange: {
@@ -1656,11 +1660,6 @@ export class Wallet {
     this.ws.stop();
   }
 
-  async runTaskLoop(opts?: RetryLoopOpts): Promise<void> {
-    await this.ws.ensureWalletDbOpen();
-    return this.ws.taskScheduler.run(opts);
-  }
-
   async handleCoreApiRequest(
     operation: string,
     id: string,

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]