gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[taler-depolymerization] branch master updated (ddd627b -> 8880a7d)


From: gnunet
Subject: [taler-depolymerization] branch master updated (ddd627b -> 8880a7d)
Date: Wed, 02 Mar 2022 18:51:59 +0100

This is an automated email from the git hooks/post-receive script.

antoine pushed a change to branch master
in repository depolymerization.

    from ddd627b  eth-wire: catch rpc EOF
     new e778b15  eth-wire: bump stuck transactions
     new 8880a7d  Repair test

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 Cargo.lock                                        |  16 +--
 btc-wire/src/lib.rs                               |  11 +-
 btc-wire/src/loops/analysis.rs                    |   2 +-
 btc-wire/src/loops/worker.rs                      |   7 +-
 btc-wire/src/rpc.rs                               |   2 +-
 common/src/lib.rs                                 |   6 +-
 db/eth.sql                                        |   3 +-
 eth-wire/src/bin/eth-wire-utils.rs                |  52 ++++++++-
 eth-wire/src/lib.rs                               |  38 ++++---
 eth-wire/src/loops/analysis.rs                    |   2 +-
 eth-wire/src/loops/worker.rs                      | 131 +++++++++++++++-------
 eth-wire/src/rpc.rs                               |  24 ++--
 instrumentation/src/eth.rs                        |  22 ++--
 makefile                                          |   1 +
 test/btc/conflict.sh                              |   2 +-
 test/common.sh                                    |  16 +--
 test/conf/{taler_eth.conf => taler_eth_bump.conf} |   3 +-
 test/eth/bumpfee.sh                               | 110 ++++++++++++++++++
 test/eth/reorg.sh                                 |   1 +
 19 files changed, 334 insertions(+), 115 deletions(-)
 copy test/conf/{taler_eth.conf => taler_eth_bump.conf} (85%)
 create mode 100644 test/eth/bumpfee.sh

diff --git a/Cargo.lock b/Cargo.lock
index 0376ba5..4ddd828 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -124,7 +124,7 @@ dependencies = [
  "base64",
  "bech32",
  "bitcoin",
- "clap 3.1.2",
+ "clap 3.1.3",
  "common",
  "criterion",
  "hex",
@@ -206,9 +206,9 @@ dependencies = [
 
 [[package]]
 name = "clap"
-version = "3.1.2"
+version = "3.1.3"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "5177fac1ab67102d8989464efd043c6ff44191b1557ec1ddd489b4f7e1447e77"
+checksum = "86f8c0e2a6b902acc18214e24a6935cdaf8a8e34231913d4404dcaee659f65a1"
 dependencies = [
  "atty",
  "bitflags",
@@ -497,7 +497,7 @@ dependencies = [
 name = "eth-wire"
 version = "0.1.0"
 dependencies = [
- "clap 3.1.2",
+ "clap 3.1.3",
  "common",
  "ethereum-types",
  "hex",
@@ -890,7 +890,7 @@ version = "0.1.0"
 dependencies = [
  "bitcoin",
  "btc-wire",
- "clap 3.1.2",
+ "clap 3.1.3",
  "common",
  "eth-wire",
  "ethereum-types",
@@ -1408,9 +1408,9 @@ dependencies = [
 
 [[package]]
 name = "redox_syscall"
-version = "0.2.10"
+version = "0.2.11"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "8383f39639269cde97d255a32bdb68c047337295414940c68bdd30c2e13203ff"
+checksum = "8380fe0152551244f0747b1bf41737e0f8a74f97a14ccefd1148187271634f3c"
 dependencies = [
  "bitflags",
 ]
@@ -2171,7 +2171,7 @@ name = "wire-gateway"
 version = "0.1.0"
 dependencies = [
  "bitcoin",
- "clap 3.1.2",
+ "clap 3.1.3",
  "common",
  "deadpool-postgres",
  "ethereum-types",
diff --git a/btc-wire/src/lib.rs b/btc-wire/src/lib.rs
index 0dd1b14..b7be536 100644
--- a/btc-wire/src/lib.rs
+++ b/btc-wire/src/lib.rs
@@ -13,8 +13,7 @@
   You should have received a copy of the GNU Affero General Public License 
along with
   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
 */
-use std::str::FromStr;
-use std::sync::atomic::AtomicU16;
+use std::{str::FromStr, sync::atomic::AtomicU32};
 
 use bitcoin::{hashes::hex::FromHex, Address, Amount, Network, Txid};
 use common::api_common::Amount as TalerAmount;
@@ -153,8 +152,8 @@ fn config_bounce_fee(config: &BtcConfig) -> Amount {
 }
 
 pub struct WireState {
-    pub confirmation: AtomicU16,
-    pub max_confirmation: u16,
+    pub confirmation: AtomicU32,
+    pub max_confirmation: u32,
     pub config: BtcConfig,
     pub btc_config: BitcoinConfig,
     pub bounce_fee: Amount,
@@ -170,9 +169,9 @@ impl WireState {
             .unwrap_or_else(default_data_dir);
         let btc_config =
             BitcoinConfig::load(&data_dir).expect("Failed to read bitcoin 
configuration file");
-        let init_confirmation = 
config.confirmation.unwrap_or(DEFAULT_CONFIRMATION);
+        let init_confirmation = 
config.confirmation.unwrap_or(DEFAULT_CONFIRMATION) as u32;
         Self {
-            confirmation: AtomicU16::new(init_confirmation),
+            confirmation: AtomicU32::new(init_confirmation),
             max_confirmation: init_confirmation * 2,
             bounce_fee: config_bounce_fee(&config),
             config,
diff --git a/btc-wire/src/loops/analysis.rs b/btc-wire/src/loops/analysis.rs
index f036a73..423e780 100644
--- a/btc-wire/src/loops/analysis.rs
+++ b/btc-wire/src/loops/analysis.rs
@@ -45,7 +45,7 @@ pub fn analysis(mut rpc: AutoRpcCommon, mut db: 
AutoReconnectDb, state: &WireSta
                         (t.status == ChainTipsStatus::ValidFork).then(|| 
t.length)
                     })
                     .max()
-                    .unwrap_or(0) as u16;
+                    .unwrap_or(0) as u32;
                 // The first time we see a fork that big
                 if fork > max_seen {
                     max_seen = fork;
diff --git a/btc-wire/src/loops/worker.rs b/btc-wire/src/loops/worker.rs
index b868b41..0e6b303 100644
--- a/btc-wire/src/loops/worker.rs
+++ b/btc-wire/src/loops/worker.rs
@@ -378,7 +378,7 @@ fn sync_chain_withdraw(
     } else {
         // Get previous out tx
         let row = db.query_opt(
-            "SELECT id, status, txid FROM tx_out WHERE wtid=$1 FOR UPDATE",
+            "SELECT id,status,txid FROM tx_out WHERE wtid=$1 FOR UPDATE",
             &[&wtid.as_ref()],
         )?;
         if let Some(row) = row {
@@ -445,6 +445,7 @@ fn sync_chain_withdraw(
             }
         }
 
+        // Check if stuck
         if let Some(delay) = state.config.bump_delay {
             if confirmations == 0 && full.replaced_by_txid.is_none() {
                 let now = SystemTime::now()
@@ -544,7 +545,7 @@ fn sync_chain_outgoing(
     Ok(false)
 }
 
-/// Send a withdraw transaction on the blockchain, return false if no more 
requested transaction are found
+/// Send a withdraw transaction on the blockchain, return false if no more 
requested transactions are found
 fn withdraw(db: &mut Client, rpc: &mut Rpc) -> LoopResult<bool> {
     // We rely on the advisory lock to ensure we are the only one sending 
transactions
     let row = db.query_opt(
@@ -571,7 +572,7 @@ fn withdraw(db: &mut Client, rpc: &mut Rpc) -> 
LoopResult<bool> {
     Ok(row.is_some())
 }
 
-/// Bounce a transaction on the blockchain, return false if no more requested 
transaction are found
+/// Bounce a transaction on the blockchain, return false if no more requested 
transactions are found
 fn bounce(db: &mut Client, rpc: &mut Rpc, fee: &BtcAmount) -> LoopResult<bool> 
{
     // We rely on the advisory lock to ensure we are the only one sending 
transactions
     let row = db.query_opt(
diff --git a/btc-wire/src/rpc.rs b/btc-wire/src/rpc.rs
index 89123ef..195989a 100644
--- a/btc-wire/src/rpc.rs
+++ b/btc-wire/src/rpc.rs
@@ -394,7 +394,7 @@ impl Rpc {
     pub fn list_since_block(
         &mut self,
         hash: Option<&BlockHash>,
-        confirmation: u16,
+        confirmation: u32,
     ) -> Result<ListSinceBlock> {
         self.call("listsinceblock", &(hash, confirmation.max(1), (), true))
     }
diff --git a/common/src/lib.rs b/common/src/lib.rs
index 99a815d..fb6c2f5 100644
--- a/common/src/lib.rs
+++ b/common/src/lib.rs
@@ -1,6 +1,3 @@
-use std::{process::exit, thread::JoinHandle};
-
-use ::log::error;
 /*
   This file is part of TALER
   Copyright (C) 2022 Taler Systems SA
@@ -16,6 +13,9 @@ use ::log::error;
   You should have received a copy of the GNU Affero General Public License 
along with
   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
 */
+use std::{process::exit, thread::JoinHandle};
+
+use ::log::error;
 use rand::{rngs::OsRng, RngCore};
 use zeroize::Zeroizing;
 
diff --git a/db/eth.sql b/db/eth.sql
index ad28991..5c6f29d 100644
--- a/db/eth.sql
+++ b/db/eth.sql
@@ -25,7 +25,8 @@ CREATE TABLE tx_out (
     exchange_url TEXT NOT NULL,
     request_uid BYTEA UNIQUE,
     status SMALLINT NOT NULL DEFAULT 0,
-    txid BYTEA UNIQUE
+    txid BYTEA UNIQUE,
+    sent TIMESTAMP DEFAULT NULL
 );
 
 -- Bounced transaction
diff --git a/eth-wire/src/bin/eth-wire-utils.rs 
b/eth-wire/src/bin/eth-wire-utils.rs
index 16a3588..7578ced 100644
--- a/eth-wire/src/bin/eth-wire-utils.rs
+++ b/eth-wire/src/bin/eth-wire-utils.rs
@@ -15,7 +15,9 @@
 */
 use std::{
     path::{Path, PathBuf},
+    process::exit,
     str::FromStr,
+    time::{Duration, Instant},
 };
 
 use clap::StructOpt;
@@ -29,7 +31,7 @@ use common::{
 };
 use eth_wire::{
     rpc::{hex::Hex, Rpc, TransactionRequest},
-    taler_util::taler_to_eth,
+    taler_util::{taler_to_eth, TRUNC},
     SyncState,
 };
 use ethereum_types::{H160, U256};
@@ -81,6 +83,13 @@ enum Cmd {
         /// account address
         addr: String,
     },
+    // Check client and wire balance
+    CheckBalance {
+        client_addr: String,
+        client: u64,
+        wire_addr: String,
+        wire: u64,
+    },
     /// Add a peer
     Connect {
         /// peer datadir
@@ -188,7 +197,32 @@ fn main() {
         Cmd::Balance { addr } => {
             let addr = H160::from_str(&addr).unwrap();
             let balance = rpc.get_balance(&addr).unwrap();
-            println!("{}", (balance / 10_000_000_000u64).as_u64());
+            println!("{}", (balance / TRUNC).as_u64());
+        }
+        Cmd::CheckBalance {
+            client_addr,
+            client,
+            wire_addr,
+            wire,
+        } => {
+            let start = Instant::now();
+            let client_addr = H160::from_str(&client_addr).unwrap();
+            let wire_addr = H160::from_str(&wire_addr).unwrap();
+            loop {
+                let client_balance = (rpc.get_balance(&client_addr).unwrap() / 
TRUNC).as_u64();
+                let wire_balance = (rpc.get_balance(&wire_addr).unwrap() / 
TRUNC).as_u64();
+                if client_balance == client && wire_balance == wire {
+                    break;
+                } else if start.elapsed() > Duration::from_secs(60) {
+                    println!(
+                        "Expected {} {} got {} {}",
+                        client, wire, client_balance, wire_balance
+                    );
+                    exit(1);
+                } else {
+                    std::thread::sleep(Duration::from_secs(1))
+                }
+            }
         }
         Cmd::Connect { datadir } => {
             let mut peer = Rpc::new(datadir.join("geth.ipc")).unwrap();
@@ -196,6 +230,13 @@ fn main() {
             // Replace ip with localhost because it is broken
             enode.set_host(Some("127.0.0.1")).unwrap();
             assert!(rpc.add_peer(&enode).unwrap());
+            let start = Instant::now();
+            while rpc.count_peer().unwrap() == 0 {
+                if start.elapsed() > Duration::from_secs(60) {
+                    panic!("Connect timeout");
+                }
+                std::thread::sleep(Duration::from_secs(1))
+            }
         }
         Cmd::Disconnect { datadir } => {
             let mut peer = Rpc::new(datadir.join("geth.ipc")).unwrap();
@@ -203,6 +244,13 @@ fn main() {
             // Replace ip with localhost because it is broken
             enode.set_host(Some("127.0.0.1")).unwrap();
             assert!(rpc.remove_peer(&enode).unwrap());
+            let start = Instant::now();
+            while rpc.count_peer().unwrap() != 0 {
+                if start.elapsed() > Duration::from_secs(60) {
+                    panic!("Disconnect timeout");
+                }
+                std::thread::sleep(Duration::from_secs(1))
+            }
         }
         Cmd::Abandon { from } => {
             let from = H160::from_str(&from).unwrap();
diff --git a/eth-wire/src/lib.rs b/eth-wire/src/lib.rs
index 923bdaf..3ee6870 100644
--- a/eth-wire/src/lib.rs
+++ b/eth-wire/src/lib.rs
@@ -14,7 +14,7 @@
   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
 */
 
-use std::{str::FromStr, sync::atomic::AtomicU16};
+use std::{str::FromStr, sync::atomic::AtomicU32};
 
 use common::{api_common::Amount, config::EthConfig, url::Url};
 use ethereum_types::{Address, H160, H256, U256, U64};
@@ -92,16 +92,17 @@ impl rpc::Rpc {
     }
 
     /// List new and removed transaction since the last sync state, returning 
a new sync state
-    pub fn list_since_sync_state(
+    pub fn list_since_sync(
         &mut self,
         address: &Address,
         state: SyncState,
-        min_confirmation: u16,
-    ) -> rpc::Result<(Vec<(Transaction, u16)>, Vec<(Transaction, u16)>, 
SyncState)> {
-        let match_tx = |txs: Vec<Transaction>, conf: u16| -> Vec<(Transaction, 
u16)> {
+        min_confirmation: u32,
+    ) -> rpc::Result<ListSinceSync> {
+        let match_tx = |txs: Vec<Transaction>, confirmations: u32| -> 
Vec<SyncTransaction> {
             txs.into_iter()
                 .filter_map(|tx| {
-                    (tx.from == Some(*address) || tx.to == 
Some(*address)).then(|| (tx, conf))
+                    (tx.from == Some(*address) || tx.to == Some(*address))
+                        .then(|| SyncTransaction { tx, confirmations })
                 })
                 .collect()
         };
@@ -144,10 +145,10 @@ impl rpc::Rpc {
             confirmation += 1;
         }
 
-        Ok((
+        Ok(ListSinceSync {
             txs,
             removed,
-            SyncState {
+            state: SyncState {
                 tip_hash: latest.hash.unwrap(),
                 tip_height: latest.number.unwrap(),
                 conf_height: latest
@@ -155,10 +156,21 @@ impl rpc::Rpc {
                     .unwrap()
                     .saturating_sub(U64::from(min_confirmation)),
             },
-        ))
+        })
     }
 }
 
+pub struct SyncTransaction {
+    pub tx: Transaction,
+    pub confirmations: u32,
+}
+
+pub struct ListSinceSync {
+    pub txs: Vec<SyncTransaction>,
+    pub removed: Vec<SyncTransaction>,
+    pub state: SyncState,
+}
+
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub struct SyncState {
     pub tip_hash: H256,
@@ -187,8 +199,8 @@ impl SyncState {
 const DEFAULT_CONFIRMATION: u16 = 24;
 const DEFAULT_BOUNCE_FEE: &'static str = "ETH:0.00001";
 pub struct WireState {
-    pub confirmation: AtomicU16,
-    pub max_confirmations: u16,
+    pub confirmation: AtomicU32,
+    pub max_confirmations: u32,
     pub address: H160,
     pub config: EthConfig,
     pub bounce_fee: U256,
@@ -196,9 +208,9 @@ pub struct WireState {
 
 impl WireState {
     pub fn from_taler_config(config: EthConfig) -> Self {
-        let init_confirmation = 
config.confirmation.unwrap_or(DEFAULT_CONFIRMATION);
+        let init_confirmation = 
config.confirmation.unwrap_or(DEFAULT_CONFIRMATION) as u32;
         Self {
-            confirmation: AtomicU16::new(init_confirmation),
+            confirmation: AtomicU32::new(init_confirmation),
             max_confirmations: init_confirmation * 2,
             address: eth_payto_addr(&config.payto).unwrap(),
             bounce_fee: config_bounce_fee(&config),
diff --git a/eth-wire/src/loops/analysis.rs b/eth-wire/src/loops/analysis.rs
index 4e742cd..cd06d25 100644
--- a/eth-wire/src/loops/analysis.rs
+++ b/eth-wire/src/loops/analysis.rs
@@ -72,7 +72,7 @@ pub fn analysis(mut rpc: AutoRpcCommon, mut db: 
AutoReconnectDb, state: &WireSta
 }
 
 /// Return fork size and new tip
-pub fn check_fork(rpc: &mut Rpc, tip: &Option<(U64, H256)>) -> 
LoopResult<(u16, (U64, H256))> {
+pub fn check_fork(rpc: &mut Rpc, tip: &Option<(U64, H256)>) -> 
LoopResult<(u32, (U64, H256))> {
     let mut size = 0;
     let latest = rpc.latest_block()?;
     if let Some((number, hash)) = tip {
diff --git a/eth-wire/src/loops/worker.rs b/eth-wire/src/loops/worker.rs
index 4f8b659..cb5990e 100644
--- a/eth-wire/src/loops/worker.rs
+++ b/eth-wire/src/loops/worker.rs
@@ -25,9 +25,9 @@ use common::{
 };
 use eth_wire::{
     metadata::{InMetadata, OutMetadata},
-    rpc::{self, AutoRpcWallet, Rpc, Transaction},
+    rpc::{self, AutoRpcWallet, Rpc, Transaction, TransactionRequest},
     taler_util::{eth_payto_url, eth_to_taler},
-    SyncState,
+    SyncState, SyncTransaction,
 };
 use ethereum_types::{Address, H256, U256};
 
@@ -97,6 +97,9 @@ pub fn worker(mut rpc: AutoRpcWallet, mut db: 
AutoReconnectDb, state: &WireState
                 // Send requested withdraws
                 while withdraw(db, rpc, state)? {}
 
+                // Bump stuck transactions
+                while bump(db, rpc, state)? {}
+
                 // Send requested bounce
                 while bounce(db, rpc, state.bounce_fee)? {}
             }
@@ -131,11 +134,16 @@ fn sync_chain(
     let block = SyncState::from_bytes(slice.try_into().unwrap());
     let min_confirmations = state.confirmation.load(Ordering::SeqCst);
 
-    let (txs, removed, next_state) =
-        rpc.list_since_sync_state(&state.address, block, min_confirmations)?;
+    let list = rpc.list_since_sync(&state.address, block, min_confirmations)?;
 
     // Check if a confirmed incoming transaction have been removed by a 
blockchain reorganisation
-    let new_status = sync_chain_removed(&txs, &removed, db, &state.address, 
min_confirmations)?;
+    let new_status = sync_chain_removed(
+        &list.txs,
+        &list.removed,
+        db,
+        &state.address,
+        min_confirmations,
+    )?;
 
     // Sync status with database
     if *status != new_status {
@@ -155,28 +163,29 @@ fn sync_chain(
         return Ok(false);
     }
 
-    for (tx, confirmation) in txs {
-        if tx.to == Some(state.address) && confirmation >= min_confirmations {
-            sync_chain_incoming_confirmed(&tx, db, state)?;
+    for sync_tx in list.txs {
+        let tx = &sync_tx.tx;
+        if tx.to == Some(state.address) && sync_tx.confirmations >= 
min_confirmations {
+            sync_chain_incoming_confirmed(tx, db, state)?;
         } else if tx.from == Some(state.address) {
-            sync_chain_outgoing(&tx, db, state)?;
+            sync_chain_outgoing(&sync_tx, db, state)?;
         }
     }
 
     db.execute(
         "UPDATE state SET value=$1 WHERE name='sync'",
-        &[&next_state.to_bytes().as_ref()],
+        &[&list.state.to_bytes().as_ref()],
     )?;
     Ok(true)
 }
 
 /// Sync database with removed transactions, return false if bitcoin backing 
is compromised
 fn sync_chain_removed(
-    txs: &[(Transaction, u16)],
-    removed: &[(Transaction, u16)],
+    txs: &[SyncTransaction],
+    removed: &[SyncTransaction],
     db: &mut Client,
     addr: &Address,
-    min_confirmation: u16,
+    min_confirmation: u32,
 ) -> LoopResult<bool> {
     // Removed transactions are correctness issues in only two cases:
     // - An incoming valid transaction considered confirmed in the database
@@ -189,11 +198,12 @@ fn sync_chain_removed(
 
     // Only keep incoming transaction that are not reconfirmed
     // TODO study risk of accepting only mined transactions for faster recovery
-    for (tx, _) in removed.iter().filter(|(tx, _)| {
-        tx.to == Some(*addr)
+    for tx in removed.iter().filter_map(|sync_tx| {
+        (sync_tx.tx.to == Some(*addr)
             && txs
                 .iter()
-                .all(|(t, conf)| t.hash != tx.hash || *conf < min_confirmation)
+                .all(|it| it.tx.hash != sync_tx.tx.hash || it.confirmations < 
min_confirmation))
+        .then(|| &sync_tx.tx)
     }) {
         match InMetadata::decode(&tx.input) {
             Ok(metadata) => match metadata {
@@ -291,7 +301,8 @@ fn sync_chain_incoming_confirmed(
 }
 
 /// Sync database with an outgoing transaction
-fn sync_chain_outgoing(tx: &Transaction, db: &mut Client, state: &WireState) 
-> LoopResult<()> {
+fn sync_chain_outgoing(tx: &SyncTransaction, db: &mut Client, state: 
&WireState) -> LoopResult<()> {
+    let SyncTransaction { tx, confirmations } = tx;
     match OutMetadata::decode(&tx.input) {
         Ok(metadata) => match metadata {
             OutMetadata::Withdraw { wtid, .. } => {
@@ -299,35 +310,41 @@ fn sync_chain_outgoing(tx: &Transaction, db: &mut Client, 
state: &WireState) ->
                 let credit_addr = tx.to.unwrap();
                 // Get previous out tx
                 let row = db.query_opt(
-                    "SELECT id, status, txid FROM tx_out WHERE wtid=$1 FOR 
UPDATE",
+                    "SELECT id, status, sent FROM tx_out WHERE wtid=$1 FOR 
UPDATE",
                     &[&wtid.as_ref()],
                 )?;
                 if let Some(row) = row {
                     // If already in database, sync status
                     let row_id: i32 = row.get(0);
                     let status: i16 = row.get(1);
-                    match WithdrawStatus::try_from(status as u8).unwrap() {
-                        WithdrawStatus::Requested => {
-                            let nb_row = db.execute(
-                                "UPDATE tx_out SET status=$1, txid=$2 WHERE 
id=$3 AND status=$4",
-                                &[
-                                    &(WithdrawStatus::Sent as i16),
-                                    &tx.hash.as_ref(),
-                                    &row_id,
-                                    &status,
-                                ],
-                            )?;
-                            if nb_row > 0 {
-                                warn!(
-                                    ">> (recovered) {} {} in {} to {}",
-                                    amount,
-                                    base32(&wtid),
-                                    hex::encode(tx.hash),
-                                    hex::encode(credit_addr)
-                                );
+                    let sent: Option<SystemTime> = row.get(2);
+
+                    let expected_status = WithdrawStatus::Sent as i16;
+                    let expected_send = sent.filter(|_| *confirmations == 0);
+                    if status != expected_status || sent != expected_send {
+                        let nb_row = db.execute(
+                        "UPDATE tx_out SET status=$1, txid=$2, sent=NULL WHERE 
id=$3 AND status=$4",
+                        &[
+                            &(WithdrawStatus::Sent as i16),
+                            &tx.hash.as_ref(),
+                            &row_id,
+                            &status,
+                        ],
+                    )?;
+                        if nb_row > 0 {
+                            match WithdrawStatus::try_from(status as 
u8).unwrap() {
+                                WithdrawStatus::Requested => {
+                                    warn!(
+                                        ">> (recovered) {} {} in {} to {}",
+                                        amount,
+                                        base32(&wtid),
+                                        hex::encode(tx.hash),
+                                        hex::encode(credit_addr)
+                                    );
+                                }
+                                WithdrawStatus::Sent => { /* Status is correct 
*/ }
                             }
                         }
-                        WithdrawStatus::Sent => { /* Status is correct */ }
                     }
                 } else {
                     // Else add to database
@@ -396,7 +413,7 @@ fn sync_chain_outgoing(tx: &Transaction, db: &mut Client, 
state: &WireState) ->
     Ok(())
 }
 
-/// Send a withdraw transaction on the blockchain, return false if no more 
requested transaction are found
+/// Send a withdraw transaction on the blockchain, return false if no more 
requested transactions are found
 fn withdraw(db: &mut Client, rpc: &mut Rpc, state: &WireState) -> 
LoopResult<bool> {
     // We rely on the advisory lock to ensure we are the only one sending 
transactions
     let row = db.query_opt(
@@ -412,7 +429,7 @@ fn withdraw(db: &mut Client, rpc: &mut Rpc, state: 
&WireState) -> LoopResult<boo
         let tx_id = rpc.withdraw(state.address, addr, amount, wtid, url)?;
         fail_point("(injected) fail withdraw", 0.3)?;
         db.execute(
-            "UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3",
+            "UPDATE tx_out SET status=$1, txid=$2, sent=now() WHERE id=$3",
             &[&(WithdrawStatus::Sent as i16), &tx_id.as_ref(), &id],
         )?;
         let amount = eth_to_taler(&amount);
@@ -427,7 +444,39 @@ fn withdraw(db: &mut Client, rpc: &mut Rpc, state: 
&WireState) -> LoopResult<boo
     Ok(row.is_some())
 }
 
-/// Bounce a transaction on the blockchain, return false if no more requested 
transaction are found
+/// Bump a stuck transaction, return false if no more stuck transactions are 
found
+fn bump(db: &mut Client, rpc: &mut Rpc, state: &WireState) -> LoopResult<bool> 
{
+    if let Some(delay) = state.config.bump_delay {
+        // We rely on the advisory lock to ensure we are the only one sending 
transactions
+        let row = db.query_opt(
+        "SELECT id, txid FROM tx_out WHERE status=$1 AND EXTRACT(EPOCH FROM 
(now() - sent)) > $2 ORDER BY _date LIMIT 1",
+        &[&(WithdrawStatus::Sent as i16), &(delay as f64)],
+        )?;
+        if let Some(row) = &row {
+            let id: i32 = row.get(0);
+            let txid = sql_hash(row, 1);
+            let tx = rpc.get_transaction(&txid)?.expect("Bump existing tx");
+            rpc.send_transaction(&TransactionRequest {
+                from: tx.from.unwrap(),
+                to: tx.to.unwrap(),
+                value: tx.value,
+                gas_price: None,
+                data: tx.input,
+                nonce: Some(tx.nonce),
+            })?;
+            let row = db.query_one(
+                "UPDATE tx_out SET sent=now() WHERE id=$1 RETURNING wtid",
+                &[&id],
+            )?;
+            info!(">> (bump) {} in {}", base32(row.get(0)), hex::encode(txid));
+        }
+        Ok(row.is_some())
+    } else {
+        Ok(false)
+    }
+}
+
+/// Bounce a transaction on the blockchain, return false if no more requested 
transactions are found
 fn bounce(db: &mut Client, rpc: &mut Rpc, fee: U256) -> LoopResult<bool> {
     // We rely on the advisory lock to ensure we are the only one sending 
transactions
     let row = db.query_opt(
diff --git a/eth-wire/src/rpc.rs b/eth-wire/src/rpc.rs
index 9ac6bea..545ad1b 100644
--- a/eth-wire/src/rpc.rs
+++ b/eth-wire/src/rpc.rs
@@ -235,12 +235,12 @@ impl Rpc {
         }
     }
 
-    pub fn fill_transaction(&mut self, params: &TransactionRequest) -> 
Result<Filled> {
-        self.call("eth_fillTransaction", &[params])
+    pub fn fill_transaction(&mut self, req: &TransactionRequest) -> 
Result<Filled> {
+        self.call("eth_fillTransaction", &[req])
     }
 
-    pub fn send_transaction(&mut self, params: &TransactionRequest) -> 
Result<H256> {
-        self.call("eth_sendTransaction", &[params])
+    pub fn send_transaction(&mut self, req: &TransactionRequest) -> 
Result<H256> {
+        self.call("eth_sendTransaction", &[req])
     }
 
     pub fn block(&mut self, hash: &H256) -> Result<Option<Block>> {
@@ -268,7 +268,7 @@ impl Rpc {
         }
     }
 
-    pub fn subscribe_new_head(&mut self) -> Result<RpcStream<BlockHead>> {
+    pub fn subscribe_new_head(&mut self) -> Result<RpcStream<Nothing>> {
         let mut rpc = Self::new(&self.path)?;
         let id: String = rpc.call("eth_subscribe", &["newHeads"])?;
         Ok(RpcStream::new(rpc, id))
@@ -299,6 +299,11 @@ impl Rpc {
     pub fn remove_peer(&mut self, url: &Url) -> Result<bool> {
         self.call("admin_removePeer", &[url])
     }
+
+    pub fn count_peer(&mut self) -> Result<usize> {
+        let peers: Vec<Nothing> = self.call("admin_peers", &EMPTY)?;
+        Ok(peers.len())
+    }
 }
 
 pub struct RpcStream<T: Debug + DeserializeOwned> {
@@ -344,26 +349,21 @@ pub enum NotifEnd<T> {
 
 #[derive(Debug, Clone, serde::Deserialize)]
 pub struct Block {
-    /// Hash of the block
     pub hash: Option<H256>,
     /// Block number (None if pending)
     pub number: Option<U64>,
-    /// Hash of the parent
     #[serde(rename = "parentHash")]
     pub parent_hash: H256,
-    /// Transactions
     pub transactions: Vec<Transaction>,
 }
 
 #[derive(Debug, serde::Deserialize)]
-pub struct BlockHead {}
+pub struct Nothing {}
 
 /// Description of a Transaction, pending or in the chain.
 #[derive(Debug, Clone, serde::Deserialize)]
 pub struct Transaction {
-    /// Hash
     pub hash: H256,
-    /// None
     pub nonce: U256,
     /// Sender address (None when coinbase)
     pub from: Option<Address>,
@@ -397,10 +397,8 @@ pub struct Filled {
 pub struct FilledGas {
     /// Supplied gas
     pub gas: U256,
-    /// Gas price
     #[serde(rename = "gasPrice")]
     pub gas_price: Option<U256>,
-    /// Max fee per gas
     #[serde(rename = "maxFeePerGas")]
     pub max_fee_per_gas: Option<U256>,
 }
diff --git a/instrumentation/src/eth.rs b/instrumentation/src/eth.rs
index e76d490..a26f8be 100644
--- a/instrumentation/src/eth.rs
+++ b/instrumentation/src/eth.rs
@@ -7,7 +7,7 @@ use common::{config::load_eth_config, rand_slice};
 use eth_wire::{
     metadata::OutMetadata,
     rpc::{hex::Hex, Rpc, TransactionRequest},
-    taler_util::{eth_to_taler, TRUNC, eth_payto_url},
+    taler_util::{eth_payto_url, eth_to_taler, TRUNC},
     SyncState, WireState,
 };
 use ethereum_types::U256;
@@ -40,7 +40,8 @@ pub fn eth_test(config: Option<&Path>, base_url: &str) {
             .config
             .core
             .data_dir
-            .unwrap_or_else(|| PathBuf::from("/tmp/geth.ipc")),
+            .unwrap_or_else(|| PathBuf::from("/tmp/"))
+            .join("geth.ipc"),
     )
     .unwrap();
 
@@ -113,11 +114,10 @@ pub fn eth_test(config: Option<&Path>, base_url: &str) {
     let bounce = {
         let mut notifier = rpc.subscribe_new_head().unwrap();
         'l: loop {
-            let (transactions, _, new_state) = rpc
-                .list_since_sync_state(&state.address, sync_state, 0)
-                .unwrap();
-            sync_state = new_state;
-            for (tx, _) in transactions {
+            let list = rpc.list_since_sync(&state.address, sync_state, 
0).unwrap();
+            sync_state = list.state;
+            for sync_tx in list.txs {
+                let tx = sync_tx.tx;
                 if tx.to.unwrap() == client_addr && tx.from.unwrap() == 
state.address {
                     let metadata = OutMetadata::decode(&tx.input).unwrap();
                     match metadata {
@@ -165,7 +165,13 @@ pub fn eth_test(config: Option<&Path>, base_url: &str) {
     check_incoming(base_url, &reserve_pub_key, &taler_test_amount);
 
     let wtid = rand_slice();
-    transfer(base_url, &wtid, &state.config.base_url, 
eth_payto_url(&client_addr), &taler_test_amount);
+    transfer(
+        base_url,
+        &wtid,
+        &state.config.base_url,
+        eth_payto_url(&client_addr),
+        &taler_test_amount,
+    );
     wait_for_pending(&mut rpc);
 
     println!("Check balances");
diff --git a/makefile b/makefile
index eabac64..91178a5 100644
--- a/makefile
+++ b/makefile
@@ -32,6 +32,7 @@ test_eth: install_test
        test/eth/reorg.sh
        test/eth/hell.sh
        test/eth/analysis.sh
+       test/eth/bumpfee.sh
        test/eth/maxfee.sh
 
 test: test_gateway test_eth test_btc
diff --git a/test/btc/conflict.sh b/test/btc/conflict.sh
index a930341..6c59b7f 100644
--- a/test/btc/conflict.sh
+++ b/test/btc/conflict.sh
@@ -48,7 +48,7 @@ taler-exchange-wire-gateway-client \
     -b $BANK_ENDPOINT \
     -C payto://bitcoin/$CLIENT \
     -a BTC:0.005 > /dev/null
-sleep 1
+sleep 5
 restart_btc
 mine_btc
 check_balance 9.96299209 0.03698010
diff --git a/test/common.sh b/test/common.sh
index 00b840a..71dcbcb 100644
--- a/test/common.sh
+++ b/test/common.sh
@@ -251,6 +251,8 @@ function init_eth() {
     \"constantinopleBlock\": 0,
     \"petersburgBlock\": 0,
     \"istanbulBlock\": 0,
+    \"berlinBlock\": 0,
+       \"londonBlock:\": 0,
     \"ethash\": {}
   },
   \"difficulty\": \"1\",
@@ -291,10 +293,9 @@ function eth_deco() {
 
 # Create a fork on the second node and reconnect the two node
 function eth_fork() {
-    sleep 5 # Sync before fork
+    sleep 2 # Sync before fork
     $WIRE_UTILS2 mine $RESERVE ${1:-}
     $WIRE_UTILS connect $WIRE_DIR2
-    sleep 10 # Can take soooooo long for nodes to sync
 }
 
 # Restart an initialized geth dev node
@@ -314,16 +315,7 @@ function restart_eth() {
 
 # Check client and wire balance
 function check_balance_eth() {
-    local CLIENT_BALANCE=`$WIRE_UTILS balance $CLIENT`
-    local WIRE_BALANCE=`$WIRE_UTILS balance $WIRE`
-    local CLIENT="${1:-*}"
-    if [ "$1" == "*" ]; then
-        local CLIENT="$CLIENT_BALANCE"
-    fi
-    if [ "$CLIENT_BALANCE" != "$CLIENT" ] || [ "$WIRE_BALANCE" != 
"${2:-$WIRE_BALANCE}" ]; then
-        echo "expected: client $CLIENT wire ${2:-$WIRE_BALANCE}    got: client 
$CLIENT_BALANCE wire $WIRE_BALANCE"
-        exit 1
-    fi
+    $WIRE_UTILS check-balance $CLIENT $1 $WIRE $2
 }
 
 
diff --git a/test/conf/taler_eth.conf b/test/conf/taler_eth_bump.conf
similarity index 85%
copy from test/conf/taler_eth.conf
copy to test/conf/taler_eth_bump.conf
index c292beb..127c001 100644
--- a/test/conf/taler_eth.conf
+++ b/test/conf/taler_eth_bump.conf
@@ -7,4 +7,5 @@ BASE_URL     = http://test.com
 [depolymerizer-ethereum]
 DB_URL       = 
postgres://localhost:5454/postgres?user=postgres&password=password
 PORT         = 8060
-CONFIRMATION = 3
\ No newline at end of file
+CONFIRMATION = 3
+BUMP_DELAY   = 5
\ No newline at end of file
diff --git a/test/eth/bumpfee.sh b/test/eth/bumpfee.sh
new file mode 100644
index 0000000..150f511
--- /dev/null
+++ b/test/eth/bumpfee.sh
@@ -0,0 +1,110 @@
+#!/bin/bash
+
+## Test eth-wire ability to handle stuck transaction correctly
+
+set -eu
+
+source "${BASH_SOURCE%/*}/../common.sh"
+CONFIG=taler_eth_bump.conf
+
+echo "----- Setup -----"
+echo "Load config file"
+load_config
+echo "Start ethereum node"
+init_eth --miner.gasprice 0
+echo "Start second ethereum node"
+init_eth2 --miner.gasprice 0
+echo "Start eth-wire"
+eth_wire
+echo "Start gateway"
+gateway
+echo ""
+
+SEQ="seq 10 20"
+
+echo -n "Making wire transfer to exchange: "
+$WIRE_UTILS deposit $CLIENT $WIRE 0.000 `$SEQ`
+sleep 1
+mine_eth # Trigger eth-wire
+check_balance_eth 999835000 165000
+echo "OK"
+
+echo  "----- Bump fee -----"
+
+echo -n "Making wire transfer from exchange: "
+for n in `$SEQ`; do
+    taler-exchange-wire-gateway-client \
+        -b $BANK_ENDPOINT \
+        -C payto://ethereum/$CLIENT \
+        -a ETH:0.0000$n > /dev/null
+done
+sleep 1
+echo "OK"
+
+echo -n "Bump miner fee:"
+restart_eth --miner.gasprice 1000
+sleep 5
+echo "OK"
+
+echo -n "Check bump: "
+sleep 1
+mine_eth # Mine transactions
+check_balance_eth 999851500 148499
+echo "OK"
+
+echo  "----- Bump fee reorg -----"
+
+echo "Loose second ethereum node"
+eth_deco
+
+echo -n "Making wire transfer from exchange: "
+for n in `$SEQ`; do
+    taler-exchange-wire-gateway-client \
+        -b $BANK_ENDPOINT \
+        -C payto://ethereum/$CLIENT \
+        -a ETH:0.0000$n > /dev/null
+done
+sleep 1
+echo "OK"
+
+echo -n "Perform fork and bump miner fee:"
+eth_fork 10
+restart_eth --miner.gasprice 2000
+sleep 5
+echo "OK"
+
+echo -n "Check bump: "
+sleep 1
+mine_eth # Mine transactions
+check_balance_eth 999868000 131999
+echo "OK"
+
+echo "----- Bump fee stress -----"
+
+echo -n "Replace btc-wire with stressed btc-wire"
+kill $WIRE_PID
+stress_eth_wire
+echo " OK"
+
+echo -n "Making wire transfer from exchange: "
+for n in `$SEQ`; do
+    taler-exchange-wire-gateway-client \
+        -b $BANK_ENDPOINT \
+        -C payto://ethereum/$CLIENT \
+        -a ETH:0.0000$n > /dev/null
+done
+sleep 1
+echo "OK"
+
+echo -n "Bump miner fee:"
+restart_eth --miner.gasprice 3000
+sleep 5
+echo "OK"
+
+echo -n "Check bump: "
+sleep 1
+mine_eth # Mine transactions
+check_balance_eth 999884500 115499
+echo "OK"
+
+echo "All tests passed!"
\ No newline at end of file
diff --git a/test/eth/reorg.sh b/test/eth/reorg.sh
index 2056782..7722cb9 100644
--- a/test/eth/reorg.sh
+++ b/test/eth/reorg.sh
@@ -93,6 +93,7 @@ echo -n "Perform fork and check eth-wire hard error:"
 gateway_up
 eth_fork 10
 check_balance_eth 999851500 148500
+sleep 5 # Wait for reconnect
 gateway_down
 echo " OK"
 

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]