gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[taler-wallet-core] branch master updated: wallet-core: safer long-polling


From: gnunet
Subject: [taler-wallet-core] branch master updated: wallet-core: safer long-polling
Date: Mon, 19 Feb 2024 21:12:59 +0100

This is an automated email from the git hooks/post-receive script.

dold pushed a commit to branch master
in repository wallet-core.

The following commit(s) were added to refs/heads/master by this push:
     new 862ac9a16 wallet-core: safer long-polling
862ac9a16 is described below

commit 862ac9a16aa891c26355f9ad5858283c3aa029d6
Author: Florian Dold <florian@dold.me>
AuthorDate: Mon Feb 19 21:13:00 2024 +0100

    wallet-core: safer long-polling
    
    We now wait for some time if long-polling returns too early
---
 packages/taler-wallet-core/src/common.ts   | 15 ++++++++
 packages/taler-wallet-core/src/shepherd.ts | 57 +++++++++++++++++++-----------
 packages/taler-wallet-core/src/withdraw.ts | 28 ++++++++++-----
 3 files changed, 71 insertions(+), 29 deletions(-)

diff --git a/packages/taler-wallet-core/src/common.ts b/packages/taler-wallet-core/src/common.ts
index 9d7f2e763..45351f680 100644
--- a/packages/taler-wallet-core/src/common.ts
+++ b/packages/taler-wallet-core/src/common.ts
@@ -350,6 +350,7 @@ export enum TaskRunResultType {
   Backoff = "backoff",
   Progress = "progress",
   Error = "error",
+  LongpollReturnedPending = "longpoll-returned-pending",
   ScheduleLater = "schedule-later",
 }
 
@@ -358,6 +359,7 @@ export type TaskRunResult =
   | TaskRunErrorResult
   | TaskRunBackoffResult
   | TaskRunProgressResult
+  | TaskRunLongpollReturnedPendingResult
   | TaskRunScheduleLaterResult;
 
 export namespace TaskRunResult {
@@ -396,6 +398,15 @@ export namespace TaskRunResult {
       runAt,
     };
   }
+  /**
+   * Longpolling returned, but what we're waiting for
+   * is still pending on the other side.
+   */
+  export function longpollReturnedPending(): TaskRunLongpollReturnedPendingResult {
+    return {
+      type: TaskRunResultType.LongpollReturnedPending,
+    };
+  }
 }
 
 export interface TaskRunFinishedResult {
@@ -415,6 +426,10 @@ export interface TaskRunScheduleLaterResult {
   runAt: AbsoluteTime;
 }
 
+export interface TaskRunLongpollReturnedPendingResult {
+  type: TaskRunResultType.LongpollReturnedPending;
+}
+
 export interface TaskRunErrorResult {
   type: TaskRunResultType.Error;
   errorDetail: TalerErrorDetail;
diff --git a/packages/taler-wallet-core/src/shepherd.ts b/packages/taler-wallet-core/src/shepherd.ts
index d6fc604e8..0639b7976 100644
--- a/packages/taler-wallet-core/src/shepherd.ts
+++ b/packages/taler-wallet-core/src/shepherd.ts
@@ -227,6 +227,18 @@ export class TaskScheduler {
     this.startShepherdTask(taskId);
   }
 
+  private async wait(
+    taskId: TaskId,
+    info: ShepherdInfo,
+    delay: Duration,
+  ): Promise<void> {
+    try {
+      await info.cts.token.racePromise(this.ws.timerGroup.resolveAfter(delay));
+    } catch (e) {
+      logger.info(`waiting for ${taskId} interrupted`);
+    }
+  }
+
   private async internalShepherdTask(
     taskId: TaskId,
     info: ShepherdInfo,
@@ -250,6 +262,7 @@ export class TaskScheduler {
           Duration.fromSpec({ seconds: 60 }),
         );
       }
+      const startTime = AbsoluteTime.now();
       logger.trace(`Shepherd for ${taskId} will call handler`);
       // FIXME: This should already return the retry record.
       const res = await runTaskWithErrorReporting(this.ws, taskId, async () => {
@@ -273,13 +286,7 @@ export class TaskScheduler {
             const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry);
             delay = AbsoluteTime.remaining(t);
             logger.trace(`Waiting for ${delay.d_ms} ms`);
-            try {
-              await info.cts.token.racePromise(
-                this.ws.timerGroup.resolveAfter(delay),
-              );
-            } catch (e) {
-              logger.info(`waiting for ${taskId} interrupted`);
-            }
+            await this.wait(taskId, info, delay);
           } else {
             logger.trace("Retrying immediately.");
           }
@@ -292,13 +299,7 @@ export class TaskScheduler {
             const t = timestampAbsoluteFromDb(retryRecord.retryInfo.nextRetry);
             delay = AbsoluteTime.remaining(t);
             logger.trace(`Waiting for ${delay.d_ms} ms`);
-            try {
-              await info.cts.token.racePromise(
-                this.ws.timerGroup.resolveAfter(delay),
-              );
-            } catch (e) {
-              logger.info(`waiting for ${taskId} interrupted`);
-            }
+            await this.wait(taskId, info, delay);
           } else {
             logger.trace("Retrying immediately.");
           }
@@ -314,17 +315,27 @@ export class TaskScheduler {
           logger.trace(`Shepherd for ${taskId} got schedule-later result.`);
           const delay = AbsoluteTime.remaining(res.runAt);
           logger.trace(`Waiting for ${delay.d_ms} ms`);
-          try {
-            await info.cts.token.racePromise(
-              this.ws.timerGroup.resolveAfter(delay),
-            );
-          } catch (e) {
-            logger.info(`waiting for ${taskId} interrupted`);
-          }
+          await this.wait(taskId, info, delay);
           break;
         case TaskRunResultType.Finished:
           logger.trace(`Shepherd for ${taskId} got finished result.`);
           return;
+        case TaskRunResultType.LongpollReturnedPending: {
+          // Make sure that we are waiting a bit if long-polling returned too early.
+          const endTime = AbsoluteTime.now();
+          const taskDuration = AbsoluteTime.difference(endTime, startTime);
+          if (
+            Duration.cmp(taskDuration, Duration.fromSpec({ seconds: 20 })) < 0
+          ) {
+            logger.info(
+              `long-poller for ${taskId} returned unexpectedly early (${taskDuration.d_ms} ms), waiting 10 seconds`,
+            );
+            await this.wait(taskId, info, Duration.fromSpec({ seconds: 10 }));
+          } else {
+            logger.info(`task ${taskId} will long-poll again`);
+          }
+          break;
+        }
         default:
           assertUnreachable(res);
       }
@@ -435,6 +446,10 @@ async function runTaskWithErrorReporting(
       case TaskRunResultType.Progress:
         await storeTaskProgress(ws, opId);
         return resp;
+      case TaskRunResultType.LongpollReturnedPending:
+        // Longpoll should be run again immediately.
+        await storeTaskProgress(ws, opId);
+        return resp;
     }
   } catch (e) {
     if (e instanceof CryptoApiStoppedError) {
diff --git a/packages/taler-wallet-core/src/withdraw.ts b/packages/taler-wallet-core/src/withdraw.ts
index 9cf1ad36d..bfcf23588 100644
--- a/packages/taler-wallet-core/src/withdraw.ts
+++ b/packages/taler-wallet-core/src/withdraw.ts
@@ -911,6 +911,7 @@ async function processPlanchetExchangeBatchRequest(
   ws: InternalWalletState,
   wgContext: WithdrawalGroupContext,
   args: WithdrawalRequestBatchArgs,
+  cancellationToken: CancellationToken,
 ): Promise<WithdrawalBatchResult> {
   const withdrawalGroup: WithdrawalGroupRecord = wgContext.wgRecord;
   logger.info(
@@ -997,6 +998,8 @@ async function processPlanchetExchangeBatchRequest(
     const resp = await ws.http.fetch(reqUrl, {
       method: "POST",
       body: batchReq,
+      cancellationToken,
+      timeout: Duration.fromSpec({ seconds: 40 }),
     });
     if (resp.status === HttpStatusCode.UnavailableForLegalReasons) {
       await handleKycRequired(ws, withdrawalGroup, resp, 0, requestCoinIdxs);
@@ -1300,7 +1303,7 @@ async function processQueryReserve(
       `got reserve status error, EC=${result.talerErrorResponse.code}`,
     );
     if (resp.status === HttpStatusCode.NotFound) {
-      return TaskRunResult.backoff();
+      return TaskRunResult.longpollReturnedPending();
     } else {
       throwUnexpectedRequestError(resp, result.talerErrorResponse);
     }
@@ -1491,6 +1494,7 @@ async function processWithdrawalGroupPendingKyc(
 async function processWithdrawalGroupPendingReady(
   ws: InternalWalletState,
   withdrawalGroup: WithdrawalGroupRecord,
+  cancellationToken: CancellationToken,
 ): Promise<TaskRunResult> {
   const { withdrawalGroupId } = withdrawalGroup;
   const transactionId = constructTransactionIdentifier({
@@ -1553,10 +1557,15 @@ async function processWithdrawalGroupPendingReady(
   const maxBatchSize = 100;
 
   for (let i = 0; i < numTotalCoins; i += maxBatchSize) {
-    const resp = await processPlanchetExchangeBatchRequest(ws, wgContext, {
-      batchSize: maxBatchSize,
-      coinStartIndex: i,
-    });
+    const resp = await processPlanchetExchangeBatchRequest(
+      ws,
+      wgContext,
+      {
+        batchSize: maxBatchSize,
+        coinStartIndex: i,
+      },
+      cancellationToken,
+    );
     let work: Promise<void>[] = [];
     work = [];
     for (let j = 0; j < resp.coinIdxs.length; j++) {
@@ -1688,7 +1697,11 @@ export async function processWithdrawalGroup(
       );
     case WithdrawalGroupStatus.PendingReady:
       // Continue with the actual withdrawal!
-      return await processWithdrawalGroupPendingReady(ws, withdrawalGroup);
+      return await processWithdrawalGroupPendingReady(
+        ws,
+        withdrawalGroup,
+        cancellationToken,
+      );
     case WithdrawalGroupStatus.AbortingBank:
       return await processWithdrawalGroupAbortingBank(ws, withdrawalGroup);
     case WithdrawalGroupStatus.AbortedBank:
@@ -2265,8 +2278,7 @@ async function processReserveBankStatus(
   }
 
   if (!status.transfer_done) {
-    // FIXME: This is a long-poll result
-    return TaskRunResult.backoff();
+    return TaskRunResult.longpollReturnedPending();
   }
 
   const transitionInfo = await ws.db.runReadWriteTx(

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]