gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[libeufin] branch master updated: Improve the 409 Conflict detection, address DB concurrency.


From: gnunet
Subject: [libeufin] branch master updated: Improve the 409 Conflict detection, address DB concurrency.
Date: Wed, 22 Sep 2021 14:41:33 +0200

This is an automated email from the git hooks/post-receive script.

ms pushed a commit to branch master
in repository libeufin.

The following commit(s) were added to refs/heads/master by this push:
     new d446df5  Improve the 409 Conflict detection, address DB concurrency.
d446df5 is described below

commit d446df589a1ce8f2364c933a839c437783700e09
Author: ms <ms@taler.net>
AuthorDate: Wed Sep 22 14:32:51 2021 +0200

    Improve the 409 Conflict detection, address DB concurrency.
    
    The conflict now happens only if, under the same withdraw
    operation ID, the wallet tries to select two different exchanges
    or reserve public keys.
    
    As for DB concurrency, there is now one thread (named "DB")
    that should run all the database operations, in order to avoid
    conflicts on the disk.  At this moment, and mostly to see where
    the current implementation fails with regard to concurrent DB
    access, not all the database operations have been migrated to that thread.
---
 .../src/main/kotlin/tech/libeufin/sandbox/DB.kt    |  4 +++
 .../src/main/kotlin/tech/libeufin/sandbox/Main.kt  | 35 +++++++++++++++++-----
 .../kotlin/tech/libeufin/sandbox/bankAccount.kt    | 10 +++++--
 3 files changed, 39 insertions(+), 10 deletions(-)

diff --git a/sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt 
b/sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt
index f1fbe56..ef6d370 100644
--- a/sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt
+++ b/sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt
@@ -422,6 +422,8 @@ object TalerWithdrawalsTable : LongIdTable() {
      * the payment arrived at the exchange's bank yet.
      */
     val transferDone = bool("transferDone").default(false)
+    val reservePub = text("reservePub").nullable()
+    val selectedExchangePayto = text("selectedExchangePayto").nullable()
 
 }
 class TalerWithdrawalEntity(id: EntityID<Long>) : LongEntity(id) {
@@ -429,6 +431,8 @@ class TalerWithdrawalEntity(id: EntityID<Long>) : 
LongEntity(id) {
     var wopid by TalerWithdrawalsTable.wopid
     var selectionDone by TalerWithdrawalsTable.selectionDone
     var transferDone by TalerWithdrawalsTable.transferDone
+    var reservePub by TalerWithdrawalsTable.reservePub
+    var selectedExchangePayto by TalerWithdrawalsTable.selectedExchangePayto
 }
 
 object BankAccountReportsTable : IntIdTable() {
diff --git a/sandbox/src/main/kotlin/tech/libeufin/sandbox/Main.kt 
b/sandbox/src/main/kotlin/tech/libeufin/sandbox/Main.kt
index bc82621..ad90770 100644
--- a/sandbox/src/main/kotlin/tech/libeufin/sandbox/Main.kt
+++ b/sandbox/src/main/kotlin/tech/libeufin/sandbox/Main.kt
@@ -70,6 +70,8 @@ import io.ktor.http.*
 import io.ktor.http.content.*
 import io.ktor.request.*
 import io.ktor.util.date.*
+import kotlinx.coroutines.newSingleThreadContext
+import 
org.jetbrains.exposed.sql.transactions.experimental.newSuspendedTransaction
 import tech.libeufin.util.*
 import tech.libeufin.util.ebics_h004.EbicsResponse
 import tech.libeufin.util.ebics_h004.EbicsTypes
@@ -364,6 +366,8 @@ suspend inline fun <reified T : Any> 
ApplicationCall.receiveJson(): T {
     }
 }
 
+val singleThreadContext = newSingleThreadContext("DB")
+
 fun serverMain(dbName: String, port: Int) {
     execThrowableOrTerminate { dbCreateTables(dbName) }
     val myLogger = logger
@@ -983,6 +987,7 @@ fun serverMain(dbName: String, port: Int) {
                     // At this point, the three actors exist and a new 
withdraw operation can be created.
                     TalerWithdrawalEntity.new {
                         // wopid is autogenerated, and momentarily the only 
column
+
                     }
                 }
                 /**
@@ -1042,30 +1047,46 @@ fun serverMain(dbName: String, port: Int) {
                 val wopid: String = ensureNonNull(call.parameters["wopid"])
                 val body = call.receiveJson<TalerWithdrawalConfirmation>()
 
-                transaction {
+                newSuspendedTransaction(context = singleThreadContext) {
                     var wo = TalerWithdrawalEntity.find {
                         TalerWithdrawalsTable.wopid eq UUID.fromString(wopid)
                     }.firstOrNull() ?: throw SandboxError(
                         HttpStatusCode.NotFound, "Withdrawal operation $wopid 
not found."
                     )
-                    if (wo.transferDone) {
-                        throw SandboxError(
+                    if (wo.selectionDone) {
+                        if (wo.transferDone) {
+                            logger.info("Wallet performs again this operation 
that was paid out earlier: idempotent")
+                            return@newSuspendedTransaction
+                        }
+                        // reservePub+exchange selected but not payed: check 
consistency
+                        if (body.reserve_pub != wo.reservePub) throw 
SandboxError(
                             HttpStatusCode.Conflict,
-                            "This withdraw operation was already funded.  
Aborting"
+                            "Selecting a different reserve from the one 
already selected"
+                        )
+                        if (body.selected_exchange != 
wo.selectedExchangePayto) throw SandboxError(
+                            HttpStatusCode.Conflict,
+                            "Selecting a different exchange from the one 
already selected"
                         )
                     }
-                    if (wo.selectionDone) {
-                        logger.warn("This withdraw operation was already 
confirmed, but not funded.  Trying again")
-                    }
+                    // here only if (1) no selection done or (2) _only_ 
selection done:
+                    // both ways no transfer must have happened.
+                    SandboxAssert(!wo.transferDone, "Sandbox allowed paid but 
unselected reserve")
+
                     wireTransfer(
                         "sandbox-account-customer",
                         "sandbox-account-exchange",
                         "$currencyEnv:5",
                         body.reserve_pub
                     )
+                    wo.reservePub = body.reserve_pub
+                    wo.selectedExchangePayto = body.selected_exchange
                     wo.selectionDone = true
                     wo.transferDone = true
                 }
+                /**
+                 * NOTE: is this always guaranteed to run AFTER the suspended
+                 * transaction block above?
+                 */
                 call.respond(object {
                     val transfer_done = true
                 })
diff --git a/sandbox/src/main/kotlin/tech/libeufin/sandbox/bankAccount.kt 
b/sandbox/src/main/kotlin/tech/libeufin/sandbox/bankAccount.kt
index 0a43245..3d5236a 100644
--- a/sandbox/src/main/kotlin/tech/libeufin/sandbox/bankAccount.kt
+++ b/sandbox/src/main/kotlin/tech/libeufin/sandbox/bankAccount.kt
@@ -1,14 +1,14 @@
 package tech.libeufin.sandbox
 
 import io.ktor.http.*
-import org.apache.http.HttpStatus
 import org.jetbrains.exposed.sql.and
+import 
org.jetbrains.exposed.sql.transactions.experimental.newSuspendedTransaction
+import 
org.jetbrains.exposed.sql.transactions.experimental.suspendedTransactionAsync
 import org.jetbrains.exposed.sql.transactions.transaction
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
 import tech.libeufin.util.*
 import java.math.BigDecimal
-import kotlin.system.exitProcess
 
 private val logger: Logger = LoggerFactory.getLogger("tech.libeufin.sandbox")
 
@@ -117,12 +117,16 @@ fun historyForAccount(bankAccount: BankAccountEntity): 
MutableList<RawPayment> {
     return history
 }
 
+/**
+ * 
https://github.com/JetBrains/Exposed/wiki/Transactions#working-with-coroutines
+ * 
https://medium.com/androiddevelopers/threading-models-in-coroutines-and-android-sqlite-api-6cab11f7eb90
+ */
 fun wireTransfer(
     debitAccount: String, creditAccount: String,
     amount: String, subjectArg: String
 ) {
-    // check accounts exist
     transaction {
+        // check accounts exist
         val credit = BankAccountEntity.find {
             BankAccountsTable.label eq creditAccount
         }.firstOrNull() ?: run {

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]