gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[taler-cashless2ecash] branch master updated: feat: retry mechanism


From: gnunet
Subject: [taler-cashless2ecash] branch master updated: feat: retry mechanism
Date: Mon, 15 Apr 2024 23:21:51 +0200

This is an automated email from the git hooks/post-receive script.

joel-haeberli pushed a commit to branch master
in repository cashless2ecash.

The following commit(s) were added to refs/heads/master by this push:
     new fd7a571  feat: retry mechanism
fd7a571 is described below

commit fd7a57115ed10b6c52a4bfb7316a3b290758039f
Author: Joel-Haeberli <haebu@rubigen.ch>
AuthorDate: Mon Apr 15 23:21:30 2024 +0200

    feat: retry mechanism
---
 c2ec/attestor.go                                   |  89 ++++++++++---
 c2ec/c2ec-config.yaml                              |   2 +
 c2ec/config.go                                     |   2 +
 c2ec/db.go                                         |  17 ++-
 c2ec/db/proc-c2ec_retry_listener.sql               |  31 +++++
 c2ec/main.go                                       |  70 ++++++----
 c2ec/postgres.go                                   | 145 ++++++++++++++++-----
 c2ec/retrier.go                                    | 103 +++++++++++++++
 c2ec/wallee-client.go                              |  25 ++++
 simulation/c2ec-simulation                         | Bin 7572527 -> 7571703 
bytes
 simulation/main.go                                 |   2 +-
 wallee-c2ec/.idea/deploymentTargetDropDown.xml     |  15 ++-
 wallee-c2ec/.idea/misc.xml                         |   1 -
 wallee-c2ec/.idea/vcs.xml                          |   6 +
 .../ch/bfh/habej2/wallee_c2ec/ExchangeActivity.kt  |  30 +++--
 .../java/ch/bfh/habej2/wallee_c2ec/MainActivity.kt |  30 ++++-
 .../ch/bfh/habej2/wallee_c2ec/PaymentActivity.kt   |  15 +++
 .../wallee_c2ec/WithdrawalCreationActivity.kt      |  54 +++++---
 .../habej2/wallee_c2ec/client/c2ec/C2ECClient.kt   |  24 +++-
 19 files changed, 546 insertions(+), 115 deletions(-)

diff --git a/c2ec/attestor.go b/c2ec/attestor.go
index 5b8a696..ea777b8 100644
--- a/c2ec/attestor.go
+++ b/c2ec/attestor.go
@@ -3,8 +3,10 @@ package main
 import (
        "context"
        "errors"
+       "fmt"
        "strconv"
        "strings"
+       "time"
 )
 
 const PAYMENT_NOTIFICATION_CHANNEL_BUFFER_SIZE = 10
@@ -96,45 +98,96 @@ func dispatch(notification *Notification, errs chan error) {
        }
 
        transaction, err := client.GetTransaction(providerTransactionId)
+       if err != nil {
+               LogError("attestor", err)
+               prepareRetryOrAbort(withdrawalRowId, errs)
+               return
+       }
+
        finaliseOrSetRetry(
                transaction,
-               providerName,
                withdrawalRowId,
-               providerTransactionId,
-               err,
+               errs,
        )
 }
 
 func finaliseOrSetRetry(
-       t ProviderTransaction,
-       providerName string,
+       transaction ProviderTransaction,
+       withdrawalRowId int,
+       errs chan error,
+) {
+
+       if transaction == nil {
+               err := errors.New("transaction was nil. will set retry or 
abort")
+               LogError("attestor", err)
+               errs <- err
+               prepareRetryOrAbort(withdrawalRowId, errs)
+               return
+       }
+
+       completionProof := transaction.Bytes()
+       if len(completionProof) > 0 {
+               // only allow finalization operation, when the completion
+               // proof of the transaction could be retrieved
+               if transaction.AllowWithdrawal() {
+
+                       err := DB.FinaliseWithdrawal(withdrawalRowId, 
CONFIRMED, completionProof)
+                       if err != nil {
+                               LogError("attestor", err)
+                               prepareRetryOrAbort(withdrawalRowId, errs)
+                       }
+               } else {
+                       // when the received transaction is not allowed, we 
first check if the
+                       // transaction is in a final state which will not allow 
the withdrawal
+                       // and therefore the operation can be aborted, without 
further retries.
+                       if transaction.AbortWithdrawal() {
+                               err := DB.FinaliseWithdrawal(withdrawalRowId, 
ABORTED, completionProof)
+                               if err != nil {
+                                       LogError("attestor", err)
+                                       prepareRetryOrAbort(withdrawalRowId, 
errs)
+                                       return
+                               }
+                       }
+                       prepareRetryOrAbort(withdrawalRowId, errs)
+               }
+               return
+       }
+       // when the transaction proof was not present (empty proof), retry.
+       prepareRetryOrAbort(withdrawalRowId, errs)
+}
+
+// Checks wether the maximal amount of retries was already
+// reached and the withdrawal operation shall be aborted or
+// triggers the next retry by setting the last_retry_ts field
+// which will trigger the stored procedure triggering the retry
+// process. The retry counter of the retries is handled by the
+// retrier logic and shall not be set here!
+func prepareRetryOrAbort(
        withdrawalRowId int,
-       providerTransactionId string,
-       err error,
+       errs chan error,
 ) {
 
+       withdrawal, err := DB.GetWithdrawalById(withdrawalRowId)
        if err != nil {
                LogError("attestor", err)
-               // set retry
+               errs <- err
+               return
        }
 
-       // TODO: call generic function with parameters. This function must 
trigger the retry flow when not (ABORTED | CONFIRMED)
-       if t.AllowWithdrawal() {
+       if withdrawal.RetryCounter > CONFIG.Server.MaxRetries {
 
-               err = DB.FinaliseWithdrawal(withdrawalRowId, CONFIRMED, 
t.Bytes())
+               LogInfo("attestor", fmt.Sprintf("max retries for withdrawal 
with id=%d was reached. withdrawal is aborted.", withdrawal.WithdrawalId))
+               err := DB.FinaliseWithdrawal(withdrawalRowId, ABORTED, 
make([]byte, 0))
                if err != nil {
                        LogError("attestor", err)
-                       // set retry
-                       //errs <- err
                }
        } else {
-               // TODO : this might be too early ?! What if the payment was 
not yet
-               //            processed by the Wallee backend? Needs testing.
-               err = DB.FinaliseWithdrawal(withdrawalRowId, ABORTED, t.Bytes())
+
+               lastRetryTs := time.Now().Unix()
+               err := DB.SetLastRetry(withdrawalRowId, lastRetryTs)
                if err != nil {
                        LogError("attestor", err)
-                       // set retry
-                       //errs <- err
                }
        }
+
 }
diff --git a/c2ec/c2ec-config.yaml b/c2ec/c2ec-config.yaml
index 6ae068e..7698a60 100644
--- a/c2ec/c2ec-config.yaml
+++ b/c2ec/c2ec-config.yaml
@@ -6,6 +6,8 @@ c2ec:
   unix-socket-path: "c2ec.sock"
   fail-on-missing-attestors: false # forced if prod=true
   credit-account: "payto://iban/CH50030202099498" # this account must be 
specified at the providers backends as well
+  max-retries: 3
+  retry-delay-ms: 1000
   wire-gateway:
     username: "wire"
     password: "secret"
diff --git a/c2ec/config.go b/c2ec/config.go
index d54cd76..25617ff 100644
--- a/c2ec/config.go
+++ b/c2ec/config.go
@@ -21,6 +21,8 @@ type C2ECServerConfig struct {
        UnixSocketPath      string                `yaml:"unix-socket-path"`
        StrictAttestors     bool                  
`yaml:"fail-on-missing-attestors"`
        CreditAccount       string                `yaml:"credit-account"`
+       MaxRetries          int32                 `yaml:"max-retries"`
+       RetryDelayMs        int                   `yaml:"retry-delay-ms"`
        WireGateway         C2ECWireGatewayConfig `yaml:"wire-gateway"`
 }
 
diff --git a/c2ec/db.go b/c2ec/db.go
index ff1e3ca..fed8cdb 100644
--- a/c2ec/db.go
+++ b/c2ec/db.go
@@ -45,7 +45,7 @@ type Provider struct {
 }
 
 type Terminal struct {
-       TerminalID  int64  `db:"terminal_id"`
+       TerminalId  int64  `db:"terminal_id"`
        AccessToken string `db:"access_token"`
        Active      bool   `db:"active"`
        Description string `db:"description"`
@@ -95,6 +95,9 @@ type C2ECDatabase interface {
                terminalId uint64,
        ) error
 
+       // Get the withdrawal associated with the given withdrawal identifier.
+       GetWithdrawalById(withdrawalId int) (*Withdrawal, error)
+
        // Get the withdrawal associated with the given wopid.
        GetWithdrawalByWopid(wopid string) (*Withdrawal, error)
 
@@ -128,6 +131,15 @@ type C2ECDatabase interface {
                completionProof []byte,
        ) error
 
+       // Set retry will set the last_retry_ts field
+       // on the database. A trigger will then start
+       // the retry process. The timestamp must be a
+       // unix timestamp
+       SetLastRetry(withdrawalId int, lastRetryTsUnix int64) error
+
+       // Sets the retry counter for the given withdrawal.
+       SetRetryCounter(withdrawalId int, retryCounter int) error
+
        // The wire gateway allows the exchange to retrieve transactions
        // starting at a certain starting point up until a certain delta
        // if the delta is negative, previous transactions relative to the
@@ -135,6 +147,9 @@ type C2ECDatabase interface {
        // id shall be used as starting point.
        GetConfirmedWithdrawals(start int, delta int) ([]*Withdrawal, error)
 
+       // Get the provider of a terminal by the terminals id
+       GetProviderByTerminal(terminalId int) (*Provider, error)
+
        // Get a provider entry by its name
        GetTerminalProviderByName(name string) (*Provider, error)
 
diff --git a/c2ec/db/proc-c2ec_retry_listener.sql 
b/c2ec/db/proc-c2ec_retry_listener.sql
new file mode 100644
index 0000000..e98f828
--- /dev/null
+++ b/c2ec/db/proc-c2ec_retry_listener.sql
@@ -0,0 +1,31 @@
+BEGIN;
+
+SELECT _v.register_patch('proc-c2ec-retry-listener', 
ARRAY['0001-c2ec-schema'], NULL);
+
+SET search_path TO c2ec;
+
+-- to create a function, the user needs USAGE privilege on arguments and 
return types
+CREATE OR REPLACE FUNCTION emit_retry_notification() 
+RETURNS TRIGGER AS $$
+BEGIN
+    PERFORM pg_notify('retry', NEW.withdrawal_id);
+    RETURN NULL;
+END;
+$$ LANGUAGE plpgsql;
+COMMENT ON FUNCTION emit_retry_notification 
+       IS 'The function emits the id of the withdrawal for which the last 
+    retry timestamp was updated. This shall trigger a retry operation.
+    How many retries are attempted is specified and handled by the 
application';
+
+-- for creating a trigger the user must have TRIGGER pivilege on the table.
+-- to execute the trigger, the user needs EXECUTE privilege on the trigger 
function.
+CREATE OR REPLACE TRIGGER c2ec_retry_notify 
+    AFTER UPDATE OF last_retry_ts
+    ON withdrawal
+    FOR EACH ROW
+    EXECUTE FUNCTION emit_retry_notification();
+COMMENT ON TRIGGER c2ec_retry_notify ON withdrawal
+    IS 'After setting the last retry timestamp on the withdrawal,
+    trigger the retry mechanism through the respective mechanism.';
+
+COMMIT;
diff --git a/c2ec/main.go b/c2ec/main.go
index f8e9029..839408f 100644
--- a/c2ec/main.go
+++ b/c2ec/main.go
@@ -69,14 +69,18 @@ func main() {
        }
        LogInfo("main", "provider clients are setup")
 
+       retryCtx, retryCancel := context.WithCancel(context.Background())
+       defer retryCancel()
+       retryErrs := make(chan error)
+       RunRetrier(retryCtx, retryErrs)
+       LogInfo("main", "retrier is running")
+
        attestorCtx, attestorCancel := context.WithCancel(context.Background())
-       defer attestorCancel() // run cancel anyway when main exits.
+       defer attestorCancel()
        attestorErrs := make(chan error)
        RunAttestor(attestorCtx, attestorErrs)
        LogInfo("main", "attestor is running")
 
-       // TODO run retry process here
-
        router := http.NewServeMux()
 
        setupBankIntegrationRoutes(router)
@@ -87,6 +91,7 @@ func main() {
                Handler: router,
        }
 
+       routerErrs := make(chan error)
        if CONFIG.Server.UseUnixDomainSocket {
 
                socket, err := net.Listen("unix", CONFIG.Server.UnixSocketPath)
@@ -103,23 +108,23 @@ func main() {
                        os.Exit(1)
                }()
 
-               // move this to goroutine
-               LogInfo("main", "serving at unix-domain-socket "+server.Addr)
-               if err = server.Serve(socket); err != nil {
-                       panic(err.Error())
-               }
+               go func() {
+                       LogInfo("main", "serving at unix-domain-socket 
"+server.Addr)
+                       if err = server.Serve(socket); err != nil {
+                               routerErrs <- err
+                       }
+               }()
        } else {
 
-               // move this to goroutine
-               server.Addr = fmt.Sprintf("%s:%d", CONFIG.Server.Host, 
CONFIG.Server.Port)
-               LogInfo("main", "serving at "+server.Addr)
-               if err = server.ListenAndServe(); err != nil {
-                       panic(err.Error())
-               }
+               go func() {
+                       server.Addr = fmt.Sprintf("%s:%d", CONFIG.Server.Host, 
CONFIG.Server.Port)
+                       LogInfo("main", "serving at "+server.Addr)
+                       if err = server.ListenAndServe(); err != nil {
+                               routerErrs <- err
+                       }
+               }()
        }
 
-       // TODO : do proper
-
        // since listening for incoming request, attesting payments and
        // retrying payments are three separated processes who can fail
        // we must take care of this here. The main process is used to
@@ -129,17 +134,28 @@ func main() {
        // or retry process fail, they will be restarted and the error is
        // written to the log. If some setup tasks are failing, the program
        // panics.
-       // for {
-       //      select {
-       //      case attestationError := <-attestorErrs:
-       //              LogError("main", attestationError)
-       //      case <-attestorCtx.Done():
-       //              // The attestation process died for some reason. let's 
restart it.
-       //              attestorCtx, attestorCancel = 
context.WithCancel(context.Background())
-       //              defer attestorCancel() // does this the right thing?
-       //              RunAttestor(attestorCtx, attestorErrs)
-       //      }
-       // }
+       for {
+               select {
+               case routerError := <-routerErrs:
+                       LogError("main", routerError)
+                       attestorCancel()
+                       retryCancel()
+                       panic(routerError)
+               case attestationError := <-attestorErrs:
+                       LogError("main from attestor", attestationError)
+               case <-attestorCtx.Done():
+                       // The attestation process died for some reason. let's 
restart it.
+                       attestorCancel() // first run old cancellation function
+                       attestorCtx, attestorCancel = 
context.WithCancel(context.Background())
+                       RunAttestor(attestorCtx, attestorErrs)
+               case retryError := <-retryErrs:
+                       LogError("main from retrier", retryError)
+               case <-retryCtx.Done():
+                       retryCancel() // first run old cancellation function
+                       retryCtx, retryCancel = 
context.WithCancel(context.Background())
+                       RunRetrier(retryCtx, retryErrs)
+               }
+       }
 }
 
 func setupDatabase(cfg *C2ECDatabseConfig) (C2ECDatabase, error) {
diff --git a/c2ec/postgres.go b/c2ec/postgres.go
index 4e8bfd2..a7264bf 100644
--- a/c2ec/postgres.go
+++ b/c2ec/postgres.go
@@ -42,17 +42,35 @@ const PS_FINALISE_PAYMENT = "UPDATE " + 
WITHDRAWAL_TABLE_NAME + " SET (" +
        " = ($1, $2)" +
        " WHERE " + WITHDRAWAL_FIELD_NAME_ID + "=$3"
 
+const PS_SET_LAST_RETRY = "UPDATE " + WITHDRAWAL_TABLE_NAME + " SET (" +
+       WITHDRAWAL_FIELD_NAME_LAST_RETRY + ")" +
+       " = ($1)" +
+       " WHERE " + WITHDRAWAL_FIELD_NAME_ID + "=$2"
+
+const PS_SET_RETRY_COUNTER = "UPDATE " + WITHDRAWAL_TABLE_NAME + " SET (" +
+       WITHDRAWAL_FIELD_NAME_RETRY_COUNTER + ")" +
+       " = ($1)" +
+       " WHERE " + WITHDRAWAL_FIELD_NAME_ID + "=$2"
+
 const PS_CONFIRMED_TRANSACTIONS = "SELECT * FROM " + WITHDRAWAL_TABLE_NAME +
        " LIMIT $1" +
        " OFFSET $2" +
        " ORDER BY " + WITHDRAWAL_FIELD_NAME_ID + " $3"
 
+const PS_GET_WITHDRAWAL_BY_ID = "SELECT * FROM " + WITHDRAWAL_TABLE_NAME +
+       " WHERE " + WITHDRAWAL_FIELD_NAME_ID + "=$1"
+
 const PS_GET_WITHDRAWAL_BY_WOPID = "SELECT * FROM " + WITHDRAWAL_TABLE_NAME +
        " WHERE " + WITHDRAWAL_FIELD_NAME_WOPID + "=$1"
 
 const PS_GET_WITHDRAWAL_BY_PTID = "SELECT * FROM " + WITHDRAWAL_TABLE_NAME +
        " WHERE " + WITHDRAWAL_FIELD_NAME_TRANSACTION_ID + "=$1"
 
+const PS_GET_PROVIDER_BY_TERMINAL = "SELECT * FROM " + PROVIDER_TABLE_NAME +
+       " WHERE " + PROVIDER_FIELD_NAME_ID +
+       " = (SELECT " + TERMINAL_FIELD_NAME_PROVIDER_ID + " FROM " + 
TERMINAL_TABLE_NAME +
+       " WHERE " + TERMINAL_FIELD_NAME_ID + "=$1)"
+
 const PS_GET_PROVIDER_BY_NAME = "SELECT * FROM " + PROVIDER_TABLE_NAME +
        " WHERE " + PROVIDER_FIELD_NAME_NAME + "=$1"
 
@@ -156,6 +174,33 @@ func (db *C2ECPostgres) RegisterWithdrawal(
        return nil
 }
 
+func (db *C2ECPostgres) GetWithdrawalById(withdrawalId int) (*Withdrawal, 
error) {
+
+       if row, err := db.pool.Query(
+               db.ctx,
+               PS_GET_WITHDRAWAL_BY_ID,
+               withdrawalId,
+       ); err != nil {
+               LogError("postgres", err)
+               if row != nil {
+                       row.Close()
+               }
+               return nil, err
+       } else {
+
+               defer row.Close()
+
+               withdrawal, err := pgx.CollectExactlyOneRow(row, 
pgx.RowToAddrOfStructByName[Withdrawal])
+               if err != nil {
+                       LogError("postgres", err)
+                       return nil, err
+               }
+
+               LogInfo("postgres", "query="+PS_GET_WITHDRAWAL_BY_ID)
+               return withdrawal, nil
+       }
+}
+
 func (db *C2ECPostgres) GetWithdrawalByWopid(wopid string) (*Withdrawal, 
error) {
 
        wopidBytes, err := base64.StdEncoding.DecodeString(string(wopid))
@@ -177,17 +222,14 @@ func (db *C2ECPostgres) GetWithdrawalByWopid(wopid 
string) (*Withdrawal, error)
 
                defer row.Close()
 
-               withdrawals, err := pgx.CollectRows(row, 
pgx.RowToAddrOfStructByName[Withdrawal])
+               withdrawal, err := pgx.CollectExactlyOneRow(row, 
pgx.RowToAddrOfStructByName[Withdrawal])
                if err != nil {
                        LogError("postgres", err)
                        return nil, err
                }
 
-               if len(withdrawals) < 1 {
-                       return nil, nil
-               }
                LogInfo("postgres", "query="+PS_GET_WITHDRAWAL_BY_WOPID)
-               return withdrawals[0], nil
+               return withdrawal, nil
        }
 }
 
@@ -288,7 +330,7 @@ func (db *C2ECPostgres) FinaliseWithdrawal(
                return errors.New("can only finalise payment when new status is 
either confirmed or aborted")
        }
 
-       res, err := db.pool.Query(
+       _, err := db.pool.Exec(
                db.ctx,
                PS_FINALISE_PAYMENT,
                confirmOrAbort,
@@ -299,11 +341,42 @@ func (db *C2ECPostgres) FinaliseWithdrawal(
                LogError("postgres", err)
                return err
        }
-       res.Close()
        LogInfo("postgres", "query="+PS_FINALISE_PAYMENT)
        return nil
 }
 
+func (db *C2ECPostgres) SetLastRetry(withdrawalId int, lastRetryTsUnix int64) 
error {
+
+       _, err := db.pool.Exec(
+               db.ctx,
+               PS_SET_LAST_RETRY,
+               lastRetryTsUnix,
+               withdrawalId,
+       )
+       if err != nil {
+               LogError("postgres", err)
+               return err
+       }
+       LogInfo("postgres", "query="+PS_SET_LAST_RETRY)
+       return nil
+}
+
+func (db *C2ECPostgres) SetRetryCounter(withdrawalId int, retryCounter int) 
error {
+
+       _, err := db.pool.Exec(
+               db.ctx,
+               PS_SET_RETRY_COUNTER,
+               retryCounter,
+               withdrawalId,
+       )
+       if err != nil {
+               LogError("postgres", err)
+               return err
+       }
+       LogInfo("postgres", "query="+PS_SET_RETRY_COUNTER)
+       return nil
+}
+
 // The query at the postgres database works as specified by the
 // wire gateway api.
 func (db *C2ECPostgres) GetConfirmedWithdrawals(start int, delta int) 
([]*Withdrawal, error) {
@@ -366,6 +439,33 @@ func (db *C2ECPostgres) GetConfirmedWithdrawals(start int, 
delta int) ([]*Withdr
        }
 }
 
+func (db *C2ECPostgres) GetProviderByTerminal(terminalId int) (*Provider, 
error) {
+
+       if row, err := db.pool.Query(
+               db.ctx,
+               PS_GET_PROVIDER_BY_TERMINAL,
+               terminalId,
+       ); err != nil {
+               LogError("postgres", err)
+               if row != nil {
+                       row.Close()
+               }
+               return nil, err
+       } else {
+
+               defer row.Close()
+
+               provider, err := pgx.CollectExactlyOneRow(row, 
pgx.RowToAddrOfStructByName[Provider])
+               if err != nil {
+                       LogError("postgres", err)
+                       return nil, err
+               }
+
+               LogInfo("postgres", "query="+PS_GET_PROVIDER_BY_TERMINAL)
+               return provider, nil
+       }
+}
+
 func (db *C2ECPostgres) GetTerminalProviderByName(name string) (*Provider, 
error) {
 
        if row, err := db.pool.Query(
@@ -382,18 +482,14 @@ func (db *C2ECPostgres) GetTerminalProviderByName(name 
string) (*Provider, error
 
                defer row.Close()
 
-               provider, err := pgx.CollectRows(row, 
pgx.RowToAddrOfStructByName[Provider])
+               provider, err := pgx.CollectExactlyOneRow(row, 
pgx.RowToAddrOfStructByName[Provider])
                if err != nil {
                        LogError("postgres", err)
                        return nil, err
                }
 
-               if len(provider) < 1 {
-                       return nil, nil
-               }
-
                LogInfo("postgres", "query="+PS_GET_PROVIDER_BY_NAME)
-               return provider[0], nil
+               return provider, nil
        }
 }
 
@@ -413,18 +509,14 @@ func (db *C2ECPostgres) 
GetTerminalProviderByPaytoTargetType(paytoTargetType str
 
                defer row.Close()
 
-               provider, err := pgx.CollectRows(row, 
pgx.RowToAddrOfStructByName[Provider])
+               provider, err := pgx.CollectExactlyOneRow(row, 
pgx.RowToAddrOfStructByName[Provider])
                if err != nil {
                        LogError("postgres", err)
                        return nil, err
                }
 
-               if len(provider) < 1 {
-                       return nil, nil
-               }
-
                LogInfo("postgres", 
"query="+PS_GET_PROVIDER_BY_PAYTO_TARGET_TYPE)
-               return provider[0], nil
+               return provider, nil
        }
 }
 
@@ -444,18 +536,14 @@ func (db *C2ECPostgres) GetTerminalById(id int) 
(*Terminal, error) {
 
                defer row.Close()
 
-               terminals, err := pgx.CollectRows(row, 
pgx.RowToAddrOfStructByName[Terminal])
+               terminal, err := pgx.CollectExactlyOneRow(row, 
pgx.RowToAddrOfStructByName[Terminal])
                if err != nil {
                        LogError("postgres", err)
                        return nil, err
                }
 
-               if len(terminals) < 1 {
-                       return nil, nil
-               }
-
                LogInfo("postgres", "query="+PS_GET_TERMINAL_BY_ID)
-               return terminals[0], nil
+               return terminal, nil
        }
 }
 
@@ -475,17 +563,14 @@ func (db *C2ECPostgres) GetTransferById(requestUid 
HashCode) (*Transfer, error)
 
                defer row.Close()
 
-               transfers, err := pgx.CollectRows(row, 
pgx.RowToAddrOfStructByName[Transfer])
+               transfer, err := pgx.CollectExactlyOneRow(row, 
pgx.RowToAddrOfStructByName[Transfer])
                if err != nil {
                        LogError("postgres", err)
                        return nil, err
                }
 
-               if len(transfers) < 1 {
-                       return nil, nil
-               }
                LogInfo("postgres", "query="+PS_GET_TRANSFER_BY_ID)
-               return transfers[0], nil
+               return transfer, nil
        }
 
 }
diff --git a/c2ec/retrier.go b/c2ec/retrier.go
new file mode 100644
index 0000000..8124a29
--- /dev/null
+++ b/c2ec/retrier.go
@@ -0,0 +1,103 @@
+package main
+
+import (
+       "context"
+       "errors"
+       "strconv"
+       "time"
+)
+
+const RETRY_CHANNEL_BUFFER_SIZE = 10
+const PS_RETRY_CHANNEL = "retry"
+
+func RunRetrier(ctx context.Context, errs chan error) {
+
+       for _, p := range CONFIG.Providers {
+               if PROVIDER_CLIENTS[p.Name] == nil {
+                       err := errors.New("no provider client initialized for 
provider " + p.Name)
+                       LogError("retrier", err)
+                       errs <- err
+               }
+       }
+
+       notifications := make(chan *Notification, RETRY_CHANNEL_BUFFER_SIZE)
+       go retryCallback(ctx, notifications, errs)
+}
+
+func retryCallback(ctx context.Context, notifications chan *Notification, errs 
chan error) {
+
+       listener, err := NewListener(PS_PAYMENT_NOTIFICATION_CHANNEL, 
notifications)
+       if err != nil {
+               LogError("retrier", err)
+               errs <- errors.New("retrier needs to be setup first")
+       }
+
+       go func() {
+               LogInfo("retrier", "retrier starts listening for retry 
notifications at the db")
+               err := listener.Listen(ctx)
+               if err != nil {
+                       LogError("retry-listener", err)
+                       errs <- err
+               }
+               close(notifications)
+               close(errs)
+       }()
+
+       // Listen is started async. We can therefore block here and must
+       // not run the retrieval logic in own goroutine
+       for {
+               select {
+               case notification := <-notifications:
+                       // the dispatching and setup of the retry process can
+                       // be kicked off asynchronically, thus not blocking
+                       // further incoming notifications.
+                       go dispatchRetry(notification, errs)
+               case <-ctx.Done():
+                       errs <- ctx.Err()
+                       return
+               }
+       }
+}
+
+func dispatchRetry(n *Notification, errs chan error) {
+
+       withdrawalId, err := strconv.Atoi(n.Payload)
+       if err != nil {
+               LogError("retrier", err)
+               errs <- err
+               return
+       }
+
+       withdrawal, err := DB.GetWithdrawalById(withdrawalId)
+       if err != nil {
+               LogError("retrier", err)
+               errs <- err
+               return
+       }
+
+       provider, err := DB.GetProviderByTerminal(int(withdrawal.TerminalId))
+       if err != nil {
+               LogError("retrier", err)
+               errs <- err
+               return
+       }
+
+       err = DB.SetRetryCounter(withdrawalId, int(withdrawal.RetryCounter)+1)
+       if err != nil {
+               LogError("retrier", err)
+               errs <- err
+               return
+       }
+
+       time.Sleep(time.Duration(CONFIG.Server.RetryDelayMs) * time.Millisecond)
+
+       client := PROVIDER_CLIENTS[provider.Name]
+       transaction, err := 
client.GetTransaction(*withdrawal.ProviderTransactionId)
+       if err != nil {
+               LogError("retrier", err)
+               errs <- err
+               return
+       }
+
+       finaliseOrSetRetry(transaction, withdrawalId, errs)
+}
diff --git a/c2ec/wallee-client.go b/c2ec/wallee-client.go
index 87b48cc..2f304d0 100644
--- a/c2ec/wallee-client.go
+++ b/c2ec/wallee-client.go
@@ -7,6 +7,7 @@ import (
        "encoding/base64"
        "errors"
        "fmt"
+       "io"
        "strconv"
        "strings"
        "time"
@@ -42,6 +43,30 @@ func (wt *WalleeTransaction) AllowWithdrawal() bool {
        return strings.EqualFold(string(wt.State), string(StateFulfill))
 }
 
+func (wt *WalleeTransaction) AbortWithdrawal() bool {
+       // guaranteed abortion is given when the state of
+       // the transaction is a final state but not the
+       // success case (which is FULFILL)
+       return strings.EqualFold(string(wt.State), string(StateFailed)) ||
+               strings.EqualFold(string(wt.State), string(StateVoided)) ||
+               strings.EqualFold(string(wt.State), string(StateDecline))
+}
+
+func (wt *WalleeTransaction) Bytes() []byte {
+
+       reader, err := NewJsonCodec[WalleeTransaction]().Encode(wt)
+       if err != nil {
+               LogError("wallee-client", err)
+               return make([]byte, 0)
+       }
+       bytes, err := io.ReadAll(reader)
+       if err != nil {
+               LogError("wallee-client", err)
+               return make([]byte, 0)
+       }
+       return bytes
+}
+
 func (w *WalleeClient) SetupClient(p *Provider) error {
 
        cfg, err := ConfigForProvider(p.Name)
diff --git a/simulation/c2ec-simulation b/simulation/c2ec-simulation
index df914de..44f4bbb 100755
Binary files a/simulation/c2ec-simulation and b/simulation/c2ec-simulation 
differ
diff --git a/simulation/main.go b/simulation/main.go
index 2118cb0..0a0b8b6 100644
--- a/simulation/main.go
+++ b/simulation/main.go
@@ -5,7 +5,7 @@ import (
        "os"
 )
 
-const DISABLE_DELAYS = true
+const DISABLE_DELAYS = false
 
 const C2EC_BASE_URL = "http://localhost:8082";
 const C2EC_BANK_BASE_URL = C2EC_BASE_URL + "/c2ec"
diff --git a/wallee-c2ec/.idea/deploymentTargetDropDown.xml 
b/wallee-c2ec/.idea/deploymentTargetDropDown.xml
index 0c0c338..24adc84 100644
--- a/wallee-c2ec/.idea/deploymentTargetDropDown.xml
+++ b/wallee-c2ec/.idea/deploymentTargetDropDown.xml
@@ -3,7 +3,20 @@
   <component name="deploymentTargetDropDown">
     <value>
       <entry key="app">
-        <State />
+        <State>
+          <runningDeviceTargetSelectedWithDropDown>
+            <Target>
+              <type value="RUNNING_DEVICE_TARGET" />
+              <deviceKey>
+                <Key>
+                  <type value="VIRTUAL_DEVICE_PATH" />
+                  <value 
value="$USER_HOME$/.android/avd/Pixel_3a_API_34_extension_level_7_x86_64.avd" />
+                </Key>
+              </deviceKey>
+            </Target>
+          </runningDeviceTargetSelectedWithDropDown>
+          <timeTargetWasSelectedWithDropDown 
value="2024-04-14T20:44:26.962703844Z" />
+        </State>
       </entry>
     </value>
   </component>
diff --git a/wallee-c2ec/.idea/misc.xml b/wallee-c2ec/.idea/misc.xml
index 0ad17cb..8978d23 100644
--- a/wallee-c2ec/.idea/misc.xml
+++ b/wallee-c2ec/.idea/misc.xml
@@ -1,4 +1,3 @@
-<?xml version="1.0" encoding="UTF-8"?>
 <project version="4">
   <component name="ExternalStorageConfigurationManager" enabled="true" />
   <component name="ProjectRootManager" version="2" languageLevel="JDK_17" 
default="true" project-jdk-name="jbr-17" project-jdk-type="JavaSDK">
diff --git a/wallee-c2ec/.idea/vcs.xml b/wallee-c2ec/.idea/vcs.xml
new file mode 100644
index 0000000..6c0b863
--- /dev/null
+++ b/wallee-c2ec/.idea/vcs.xml
@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="VcsDirectoryMappings">
+    <mapping directory="$PROJECT_DIR$/.." vcs="Git" />
+  </component>
+</project>
\ No newline at end of file
diff --git 
a/wallee-c2ec/app/src/main/java/ch/bfh/habej2/wallee_c2ec/ExchangeActivity.kt 
b/wallee-c2ec/app/src/main/java/ch/bfh/habej2/wallee_c2ec/ExchangeActivity.kt
index 6aebf8b..134115e 100644
--- 
a/wallee-c2ec/app/src/main/java/ch/bfh/habej2/wallee_c2ec/ExchangeActivity.kt
+++ 
b/wallee-c2ec/app/src/main/java/ch/bfh/habej2/wallee_c2ec/ExchangeActivity.kt
@@ -4,12 +4,15 @@ import android.content.Intent
 import android.os.Bundle
 import androidx.activity.ComponentActivity
 import androidx.activity.compose.setContent
+import androidx.compose.foundation.layout.Column
 import androidx.compose.foundation.layout.fillMaxSize
 import androidx.compose.material3.Button
 import androidx.compose.material3.MaterialTheme
 import androidx.compose.material3.Surface
 import androidx.compose.material3.Text
+import androidx.compose.ui.Alignment
 import androidx.compose.ui.Modifier
+import androidx.compose.ui.platform.LocalContext
 import ch.bfh.habej2.wallee_c2ec.config.EXCHANGES
 import ch.bfh.habej2.wallee_c2ec.ui.theme.Walleec2ecTheme
 
@@ -24,19 +27,26 @@ class ExchangeActivity : ComponentActivity() {
                     modifier = Modifier.fillMaxSize(),
                     color = MaterialTheme.colorScheme.background
                 ) {
-                    Text(text = "Choose the exchange to withdraw from")
 
-                    // TODO let user select exchanges from config here
-                    //  config must contain display name, credentials 
(generated by cli)
-                    //  and the base url of the c2ec bank-integration api
-                    EXCHANGES.forEach { Text(text = it.displayName) }
+                    Column(
+                        horizontalAlignment = Alignment.CenterHorizontally
+                    ) {
 
-                    Button(onClick = { Intent(this.parent, 
WithdrawalCreationActivity::class.java) }) {
-                        title = "withdraw"
-                    }
+                        Text(text = "Choose the exchange to withdraw from")
+
+                        // TODO let user select exchanges from config here
+                        //  config must contain display name, credentials 
(generated by cli)
+                        //  and the base url of the c2ec bank-integration api
+                        EXCHANGES.forEach { Text(text = it.displayName) }
+
+                        val ctx = LocalContext.current
+                        Button(onClick = { 
ctx.startActivity(Intent(this@ExchangeActivity.parent, 
WithdrawalCreationActivity::class.java)) }) {
+                            Text(text = "withdraw")
+                        }
 
-                    Button(onClick = { finish() }) {
-                        title = "back"
+                        Button(onClick = { finish() }) {
+                            Text(text = "back")
+                        }
                     }
                 }
             }
diff --git 
a/wallee-c2ec/app/src/main/java/ch/bfh/habej2/wallee_c2ec/MainActivity.kt 
b/wallee-c2ec/app/src/main/java/ch/bfh/habej2/wallee_c2ec/MainActivity.kt
index f91c419..413a579 100644
--- a/wallee-c2ec/app/src/main/java/ch/bfh/habej2/wallee_c2ec/MainActivity.kt
+++ b/wallee-c2ec/app/src/main/java/ch/bfh/habej2/wallee_c2ec/MainActivity.kt
@@ -4,21 +4,32 @@ import android.content.Intent
 import android.os.Bundle
 import androidx.activity.ComponentActivity
 import androidx.activity.compose.setContent
+import androidx.compose.foundation.layout.Column
+import androidx.compose.foundation.layout.Row
 import androidx.compose.foundation.layout.fillMaxSize
 import androidx.compose.material3.Button
 import androidx.compose.material3.MaterialTheme
 import androidx.compose.material3.Surface
 import androidx.compose.material3.Text
 import androidx.compose.runtime.Composable
+import androidx.compose.ui.Alignment
 import androidx.compose.ui.Modifier
+import androidx.compose.ui.platform.LocalContext
 import androidx.compose.ui.tooling.preview.Preview
 import ch.bfh.habej2.wallee_c2ec.config.loadConfiguredExchanges
 import ch.bfh.habej2.wallee_c2ec.ui.theme.Walleec2ecTheme
 
 class MainActivity : ComponentActivity() {
+
+    init {
+
+        // TODO this crashes somehow
+        //loadConfiguredExchanges()
+
+    }
+
     override fun onCreate(savedInstanceState: Bundle?) {
         super.onCreate(savedInstanceState)
-        loadConfiguredExchanges()
         setContent {
             Walleec2ecTheme {
                 // A surface container using the 'background' color from the 
theme
@@ -26,12 +37,17 @@ class MainActivity : ComponentActivity() {
                     modifier = Modifier.fillMaxSize(),
                     color = MaterialTheme.colorScheme.background
                 ) {
-                    Text(text = "Withdraw Taler using Wallee")
-                    Button(onClick = { Intent(this, 
WithdrawalCreationActivity::class.java) }) {
-                        title = "Start Withdrawal"
-                    }
-                    Button(onClick = { Intent(this, 
ExchangeActivity::class.java) }) {
-                        title = "Choose Exchange"
+                    val ctx = LocalContext.current
+                    Column(
+                        horizontalAlignment = Alignment.CenterHorizontally
+                    ) {
+                        Text(text = "Withdraw Taler using Wallee")
+                        Button(onClick = { 
ctx.startActivity(Intent(this@MainActivity, 
WithdrawalCreationActivity::class.java)) }) {
+                            Text(text = "Start Withdrawal")
+                        }
+                        Button(onClick = { 
ctx.startActivity(Intent(this@MainActivity, ExchangeActivity::class.java)) }) {
+                            Text(text = "Choose Exchange")
+                        }
                     }
                 }
             }
diff --git 
a/wallee-c2ec/app/src/main/java/ch/bfh/habej2/wallee_c2ec/PaymentActivity.kt 
b/wallee-c2ec/app/src/main/java/ch/bfh/habej2/wallee_c2ec/PaymentActivity.kt
index 06f1908..b3bbc5e 100644
--- a/wallee-c2ec/app/src/main/java/ch/bfh/habej2/wallee_c2ec/PaymentActivity.kt
+++ b/wallee-c2ec/app/src/main/java/ch/bfh/habej2/wallee_c2ec/PaymentActivity.kt
@@ -3,9 +3,13 @@ package ch.bfh.habej2.wallee_c2ec
 import android.os.Bundle
 import androidx.activity.ComponentActivity
 import androidx.activity.compose.setContent
+import androidx.compose.foundation.layout.Column
 import androidx.compose.foundation.layout.fillMaxSize
+import androidx.compose.material3.Button
 import androidx.compose.material3.MaterialTheme
 import androidx.compose.material3.Surface
+import androidx.compose.material3.Text
+import androidx.compose.ui.Alignment
 import androidx.compose.ui.Modifier
 import ch.bfh.habej2.wallee_c2ec.ui.theme.Walleec2ecTheme
 
@@ -20,6 +24,17 @@ class PaymentActivity : ComponentActivity() {
                     color = MaterialTheme.colorScheme.background
                 ) {
                     // TODO use wallee sdk here for payment.
+                    Column(
+                        horizontalAlignment = Alignment.CenterHorizontally
+                    ) {
+
+                        Text(text = "present card, trigger payment")
+                        
+                        Button(onClick = { finish() }) {
+                            // TODO: abort payment here
+                            Text(text = "back")
+                        }
+                    }
                 }
             }
         }
diff --git 
a/wallee-c2ec/app/src/main/java/ch/bfh/habej2/wallee_c2ec/WithdrawalCreationActivity.kt
 
b/wallee-c2ec/app/src/main/java/ch/bfh/habej2/wallee_c2ec/WithdrawalCreationActivity.kt
index f789907..e07e400 100644
--- 
a/wallee-c2ec/app/src/main/java/ch/bfh/habej2/wallee_c2ec/WithdrawalCreationActivity.kt
+++ 
b/wallee-c2ec/app/src/main/java/ch/bfh/habej2/wallee_c2ec/WithdrawalCreationActivity.kt
@@ -3,27 +3,46 @@ package ch.bfh.habej2.wallee_c2ec
 import android.os.Bundle
 import androidx.activity.ComponentActivity
 import androidx.activity.compose.setContent
+import androidx.compose.foundation.layout.Column
 import androidx.compose.foundation.layout.fillMaxSize
 import androidx.compose.material3.Button
 import androidx.compose.material3.MaterialTheme
 import androidx.compose.material3.Surface
 import androidx.compose.material3.Text
+import androidx.compose.runtime.LaunchedEffect
+import androidx.compose.runtime.rememberCoroutineScope
+import androidx.compose.ui.Alignment
 import androidx.compose.ui.Modifier
+import ch.bfh.habej2.wallee_c2ec.client.c2ec.C2ECClient
 import ch.bfh.habej2.wallee_c2ec.config.TERMINAL_CONFIG
 import ch.bfh.habej2.wallee_c2ec.ui.theme.Walleec2ecTheme
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.launch
 import java.security.SecureRandom
 import java.util.Base64
 
 class WithdrawalCreationActivity : ComponentActivity() {
+
     override fun onCreate(savedInstanceState: Bundle?) {
         super.onCreate(savedInstanceState)
 
         val encodedWopid = encodeWopid(createWopid())
 
-        // TODO Start long polling here somehow in the background.
-        //  when response arrives, send intent to start PaymentActivity for 
the withdrawal
+        val client = C2ECClient()
 
         setContent {
+
+            LaunchedEffect(key1 = "") {
+                this.launch {
+
+                    val withdrawal = 
client.retrieveWithdrawalStatus(encodedWopid, 30000)
+
+                    // TODO launch payment activity when selected state is 
returned,
+                    //  when response arrives, send intent to start 
PaymentActivity for the withdrawal
+                    //  otherwise show error and leave.
+                }
+            }
+
             Walleec2ecTheme {
                 // A surface container using the 'background' color from the 
theme
                 Surface(
@@ -31,32 +50,31 @@ class WithdrawalCreationActivity : ComponentActivity() {
                     color = MaterialTheme.colorScheme.background
                 ) {
 
-                    Text(text = "Generated Random WOPID=$encodedWopid")
+                    Column(
+                        horizontalAlignment = Alignment.CenterHorizontally
+                    ) {
+
+                        Text(text = "Generated Random WOPID=$encodedWopid")
 
-                    Text(text = "QR-Code content: 
${formatTalerUri(encodedWopid)}")
+                        Text(text = "QR-Code content: 
${formatTalerUri(encodedWopid)}")
 
-                    Button(onClick = { finish() }) {
-                        // TODO: abort payment here
-                        title = "back"
+                        Button(onClick = { finish() }) {
+                            // TODO: abort payment here
+                           Text(text = "back")
+                        }
                     }
                 }
             }
         }
     }
 
-    fun formatTalerUri(encodedWopid: String): String {
-
-        return 
"taler://withdraw/$encodedWopid?terminal_id=${TERMINAL_CONFIG.terminalId}"
-    }
-
-    fun encodeWopid(wopid: ByteArray): String {
+    private fun formatTalerUri(encodedWopid: String) =
+        
"taler://withdraw/$encodedWopid?terminal_id=${TERMINAL_CONFIG.terminalId}"
 
-        return Base64.getUrlEncoder()
-            .encode(wopid)
-            .toString()
-    }
+    private fun encodeWopid(wopid: ByteArray) =
+        String(Base64.getUrlEncoder().encode(wopid))
 
-    fun createWopid(): ByteArray {
+    private fun createWopid(): ByteArray {
 
         val wopid = ByteArray(32)
         val rand = SecureRandom()
diff --git 
a/wallee-c2ec/app/src/main/java/ch/bfh/habej2/wallee_c2ec/client/c2ec/C2ECClient.kt
 
b/wallee-c2ec/app/src/main/java/ch/bfh/habej2/wallee_c2ec/client/c2ec/C2ECClient.kt
index 5868899..71cbc16 100644
--- 
a/wallee-c2ec/app/src/main/java/ch/bfh/habej2/wallee_c2ec/client/c2ec/C2ECClient.kt
+++ 
b/wallee-c2ec/app/src/main/java/ch/bfh/habej2/wallee_c2ec/client/c2ec/C2ECClient.kt
@@ -1,6 +1,9 @@
 package ch.bfh.habej2.wallee_c2ec.client.c2ec
 
+import ch.bfh.habej2.wallee_c2ec.config.CURRENT_EXCHANGE
+import okhttp3.Interceptor
 import okhttp3.OkHttpClient
+import okhttp3.Response
 
 class C2ECClient {
 
@@ -11,7 +14,9 @@ class C2ECClient {
     }
 
     init {
-        var client = OkHttpClient.Builder().build();
+        var client = OkHttpClient.Builder()
+            .addInterceptor(C2ECBasicAuthInterceptor())
+            .build();
     }
 
     fun retrieveWithdrawalStatus(wopid: String, longPollMs: Int): 
C2ECWithdrawalOperationStatus {
@@ -23,4 +28,21 @@ class C2ECClient {
     fun sendPaymentNotification(payment: C2ECPaymentNotification) {
         println("sending payment notification...")
     }
+
+    private class C2ECBasicAuthInterceptor : Interceptor {
+
+        override fun intercept(chain: Interceptor.Chain): Response {
+
+            val base64EncodedCredentials = java.util.Base64
+                .getUrlEncoder()
+                
.encode("${CURRENT_EXCHANGE!!.terminalId}:${CURRENT_EXCHANGE!!.accessToken}".toByteArray())
+                .toString()
+
+            return chain.proceed(
+                chain.request().newBuilder()
+                    .header("Authorization", base64EncodedCredentials)
+                    .build()
+            )
+        }
+    }
 }
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]