gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[taler-depolymerization] branch master updated (5db4ac3 -> f1aaee3)


From: gnunet
Subject: [taler-depolymerization] branch master updated (5db4ac3 -> f1aaee3)
Date: Mon, 07 Feb 2022 20:29:19 +0100

This is an automated email from the git hooks/post-receive script.

antoine pushed a change to branch master
in repository depolymerization.

    from 5db4ac3  Update dependencies and use taler-config to parse configuration
     new 8359314  Test bump reorged transactions
     new f1aaee3  Add eth-wire reorg handling and fix btc-wire bounce reorg handling

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 Cargo.lock                         |   4 +-
 btc-wire/src/loops/worker.rs       |  59 ++++++-----
 btc-wire/src/rpc.rs                |   2 +-
 common/src/config.rs               |   2 +-
 eth-wire/src/bin/eth-wire-cli.rs   |  18 ++--
 eth-wire/src/bin/eth-wire-utils.rs |  43 ++++++--
 eth-wire/src/lib.rs                |  40 ++++----
 eth-wire/src/loops/worker.rs       | 195 +++++++++++++++++++++++++++++++------
 eth-wire/src/rpc.rs                |  40 ++++++--
 makefile                           |   1 +
 test/btc/bumpfee.sh                |  54 +++++++---
 test/btc/reorg.sh                  |   1 +
 test/common.sh                     |  39 ++++++--
 test/conf/taler_btc_bump.conf      |   3 +-
 test/eth/reorg.sh                  | 108 ++++++++++++++++++++
 test/eth/wire.sh                   |   3 +-
 16 files changed, 484 insertions(+), 128 deletions(-)
 create mode 100644 test/eth/reorg.sh

diff --git a/Cargo.lock b/Cargo.lock
index 30c1a6d..20e1471 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1796,9 +1796,9 @@ dependencies = [
 
 [[package]]
 name = "unicode-segmentation"
-version = "1.8.0"
+version = "1.9.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8895849a949e7845e06bd6dc1aa51731a103c42707010a5b591c0038fb73385b"
+checksum = "7e8820f5d777f6224dc4be3632222971ac30164d4a258d595640799554ebfd99"
 
 [[package]]
 name = "unicode-width"
diff --git a/btc-wire/src/loops/worker.rs b/btc-wire/src/loops/worker.rs
index a84e373..e8a355c 100644
--- a/btc-wire/src/loops/worker.rs
+++ b/btc-wire/src/loops/worker.rs
@@ -14,7 +14,7 @@
   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
 */
 use std::{
-    collections::{HashMap, HashSet},
+    collections::HashMap,
     fmt::Write,
     sync::atomic::Ordering,
     time::{Duration, SystemTime},
@@ -50,8 +50,8 @@ use super::{LoopError, LoopResult};
 pub fn worker(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql, state: &WireState) {
     let mut lifetime = state.config.wire_lifetime;
     let mut status = true;
-
     let mut skip_notification = false;
+
     loop {
         // Check lifetime
         if let Some(nb) = lifetime.as_mut() {
@@ -212,7 +212,11 @@ fn sync_chain(
     let min_confirmations = state.confirmation.load(Ordering::SeqCst);
 
     // Get a set of transactions ids to parse
-    let (txs, removed, lastblock): (HashMap<Txid, (Category, i32)>, HashSet<Txid>, BlockHash) = {
+    let (txs, removed, lastblock): (
+        HashMap<Txid, (Category, i32)>,
+        HashMap<Txid, (Category, i32)>,
+        BlockHash,
+    ) = {
         // Get all transactions made since this block
         let list = rpc.list_since_block(Some(&last_hash), min_confirmations, true)?;
         // Only keep ids and category
@@ -224,15 +228,15 @@ fn sync_chain(
         let removed = list
             .removed
             .into_iter()
-            .filter_map(|tx| (tx.category == Category::Receive).then(|| tx.txid))
+            .map(|tx| (tx.txid, (tx.category, tx.confirmations)))
             .collect();
         (txs, removed, list.lastblock)
     };
 
     // Check if a confirmed incoming transaction have been removed by a blockchain reorganisation
-
     let new_status = sync_chain_removed(&txs, &removed, rpc, db, min_confirmations as i32)?;
 
+    // Sync status with database
     if *status != new_status {
         let mut tx = db.transaction()?;
         tx.execute(
@@ -242,11 +246,14 @@ fn sync_chain(
         tx.execute("NOTIFY status", &[])?;
         tx.commit()?;
         *status = new_status;
+        if new_status {
+            info!("Recovered lost transactions");
+        }
     }
-
     if !new_status {
         return Ok(false);
     }
+
     for (id, (category, confirmations)) in txs {
         match category {
             Category::Send => sync_chain_outgoing(&id, confirmations, rpc, db, state)?,
@@ -265,41 +272,44 @@ fn sync_chain(
         &[&lastblock.as_ref()],
     )?;
 
-    
-    dbg!(last_hash, lastblock);
     Ok(true)
 }
 
 /// Sync database with removed transactions, return false if bitcoin backing is compromised
 fn sync_chain_removed(
     txs: &HashMap<Txid, (Category, i32)>,
-    removed: &HashSet<Txid>,
+    removed: &HashMap<Txid, (Category, i32)>,
     rpc: &mut Rpc,
     db: &mut Client,
     min_confirmations: i32,
 ) -> LoopResult<bool> {
-    // Removed transactions are correctness issue in only two cases:
+    // Removed transactions are correctness issues in only two cases:
     // - An incoming valid transaction considered confirmed in the database
     // - An incoming invalid transactions already bounced
     // Those two cases can compromise bitcoin backing
     // Removed outgoing transactions will be retried automatically by the node
 
-    let mut blocking_receive = Vec::new();
+    let mut blocking_deposit = Vec::new();
     let mut blocking_bounce = Vec::new();
-    for id in removed {
+
+    // Only keep incoming transaction that are not reconfirmed
+    // TODO study risk of accepting only mined transactions for faster recovery
+    for (id, _) in removed.iter().filter(|(id, (cat, _))| {
+        *cat == Category::Receive
+            && txs
+                .get(*id)
+                .map(|(_, confirmations)| *confirmations < min_confirmations)
+                .unwrap_or(true)
+    }) {
         match rpc.get_tx_segwit_key(id) {
             Ok((full, key)) => {
-                // Valid tx are only problematic if not confirmed in the txs list and stored stored in the database
-                if txs
-                    .get(id)
-                    .map(|(_, confirmations)| *confirmations < min_confirmations)
-                    .unwrap_or(true)
-                    && db
-                        .query_opt("SELECT 1 FROM tx_in WHERE reserve_pub=$1", &[&key.as_ref()])?
-                        .is_some()
+                // Deposit are only problematic if not reconfirmed and stored in the database
+                if db
+                    .query_opt("SELECT 1 FROM tx_in WHERE reserve_pub=$1", &[&key.as_ref()])?
+                    .is_some()
                 {
                     let debit_addr = sender_address(rpc, &full)?;
-                    blocking_receive.push((key, id, debit_addr));
+                    blocking_deposit.push((key, id, debit_addr));
                 }
             }
             Err(err) => match err {
@@ -320,12 +330,12 @@ fn sync_chain_removed(
         }
     }
 
-    if !blocking_bounce.is_empty() || !blocking_receive.is_empty() {
+    if !blocking_bounce.is_empty() || !blocking_deposit.is_empty() {
         let mut buf = "The following transaction have been removed from the blockchain, bitcoin backing is compromised until the transaction reappear:".to_string();
-        for (key, id, addr) in blocking_receive {
+        for (key, id, addr) in blocking_deposit {
             write!(
                 &mut buf,
-                "\n\treceived {} in {} from {}",
+                "\n\tdeposit {} in {} from {}",
                 base32(&key),
                 id,
                 addr
@@ -557,7 +567,6 @@ fn sync_chain_incoming_confirmed(
     rpc: &mut Rpc,
     db: &mut Client,
 ) -> Result<(), LoopError> {
-    dbg!(id);
     match rpc.get_tx_segwit_key(id) {
         Ok((full, reserve_pub)) => {
             // Store transactions in database
diff --git a/btc-wire/src/rpc.rs b/btc-wire/src/rpc.rs
index aa25f1b..8003304 100644
--- a/btc-wire/src/rpc.rs
+++ b/btc-wire/src/rpc.rs
@@ -70,7 +70,7 @@ pub enum Error {
     Bitcoin(String),
     #[error("JSON: {0}")]
     Json(#[from] serde_json::Error),
-    #[error("No result or error")]
+    #[error("Null rpc, no result or error")]
     Null,
 }
 
diff --git a/common/src/config.rs b/common/src/config.rs
index 00bcf62..d74865b 100644
--- a/common/src/config.rs
+++ b/common/src/config.rs
@@ -131,7 +131,7 @@ pub fn load_btc_config(path: Option<&str>) -> BtcConfig {
     return config;
 }
 
-pub type EthConfig = WireConfig<1000000, 24>;
+pub type EthConfig = WireConfig<10_000_000_000_000, 24>;
 
 pub fn load_eth_config(path: Option<&str>) -> EthConfig {
     let config = WireConfig::load_taler_config(path);
diff --git a/eth-wire/src/bin/eth-wire-cli.rs b/eth-wire/src/bin/eth-wire-cli.rs
index e638157..ce8aa93 100644
--- a/eth-wire/src/bin/eth-wire-cli.rs
+++ b/eth-wire/src/bin/eth-wire-cli.rs
@@ -14,12 +14,13 @@
   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
 */
 
-use eth_wire::{rpc::Rpc, BlockState};
-use ethereum_types::H160;
 use common::{
     config::{Config, CoreConfig},
-    postgres::{Client, NoTls}, log::init,
+    log::init,
+    postgres::{Client, NoTls},
 };
+use eth_wire::{rpc::Rpc, SyncState};
+use ethereum_types::H160;
 
 fn main() {
     init();
@@ -51,18 +52,19 @@ fn main() {
 
             // Skip previous blocks
             let block = rpc.latest_block().expect("Failed to get current block");
-            let state = BlockState {
-                hash: block.hash.unwrap(),
-                number: block.number.unwrap(),
+            let state = SyncState {
+                tip_hash: block.hash.unwrap(),
+                tip_height: block.number.unwrap(),
+                conf_height: block.number.unwrap(),
             };
             let nb_row = db
               .execute(
-                  "INSERT INTO state (name, value) VALUES ('last_block', $1) ON CONFLICT (name) DO NOTHING",
+                  "INSERT INTO state (name, value) VALUES ('sync', $1) ON CONFLICT (name) DO NOTHING",
                   &[&state.to_bytes().as_ref()],
               )
               .expect("Failed to update database state");
             if nb_row > 0 {
-                println!("Skipped {} previous block", state.number);
+                println!("Skipped {} previous block", state.conf_height);
             }
 
             let prev_addr = db
diff --git a/eth-wire/src/bin/eth-wire-utils.rs b/eth-wire/src/bin/eth-wire-utils.rs
index cb15d45..26f5ddd 100644
--- a/eth-wire/src/bin/eth-wire-utils.rs
+++ b/eth-wire/src/bin/eth-wire-utils.rs
@@ -15,7 +15,7 @@
 */
 use std::{path::PathBuf, str::FromStr};
 
-use common::{api_common::Amount, rand_slice, log::init};
+use common::{api_common::Amount, log::init, rand_slice};
 use eth_wire::{
     rpc::{hex::Hex, Rpc, TransactionRequest},
     taler_util::taler_to_eth,
@@ -40,6 +40,8 @@ enum Cmd {
     Mine(MineCmd),
     ClearDB(ClearCmd),
     Balance(BalanceCmd),
+    Connect(ConnectCmd),
+    Disconnect(DisconnectCmd),
 }
 
 #[derive(argh::FromArgs)]
@@ -102,7 +104,7 @@ struct MineCmd {
 struct ClearCmd {
     #[argh(positional)]
     /// taler config
-    config: String,
+    config: PathBuf,
 }
 
 #[derive(argh::FromArgs)]
@@ -114,9 +116,28 @@ struct BalanceCmd {
     addr: String,
 }
 
+#[derive(argh::FromArgs)]
+#[argh(subcommand, name = "connect")]
+/// Add a peer
+struct ConnectCmd {
+    #[argh(positional)]
+    /// peer datadir
+    datadir: PathBuf,
+}
+
+#[derive(argh::FromArgs)]
+#[argh(subcommand, name = "disconnect")]
+/// Remove a peer
+struct DisconnectCmd {
+    #[argh(positional)]
+    /// peer datadir
+    datadir: PathBuf,
+}
+
 fn main() {
     init();
     let args: Args = argh::from_env();
+    let mut rpc = Rpc::new(args.datadir.unwrap().join("geth.ipc")).unwrap();
     match args.cmd {
         Cmd::Deposit(DepositCmd {
             from,
@@ -124,7 +145,6 @@ fn main() {
             amounts,
             fmt,
         }) => {
-            let mut rpc = Rpc::new(args.datadir.unwrap().join("geth.ipc")).unwrap();
             let from = H160::from_str(&from).unwrap();
             let to = H160::from_str(&to).unwrap();
             rpc.unlock_account(&from, "password").ok();
@@ -140,7 +160,6 @@ fn main() {
             fmt,
             amounts,
         }) => {
-            let mut rpc = Rpc::new(args.datadir.unwrap().join("geth.ipc")).unwrap();
             let from = H160::from_str(&from).unwrap();
             let to = H160::from_str(&to).unwrap();
             rpc.unlock_account(&from, "password").ok();
@@ -159,7 +178,6 @@ fn main() {
             }
         }
         Cmd::Mine(MineCmd { to, mut amount }) => {
-            let mut rpc = Rpc::new(args.datadir.unwrap().join("geth.ipc")).unwrap();
             let to = H160::from_str(&to).unwrap();
             rpc.unlock_account(&to, "password").ok();
             let mut notifier = rpc.subscribe_new_head().unwrap();
@@ -175,11 +193,24 @@ fn main() {
             rpc.miner_stop().unwrap();
         }
         Cmd::Balance(BalanceCmd { addr }) => {
-            let mut rpc = Rpc::new(args.datadir.unwrap().join("geth.ipc")).unwrap();
             let addr = H160::from_str(&addr).unwrap();
             let balance = rpc.balance(&addr).unwrap();
             println!("{}", (balance / 10_000_000_000u64).as_u64());
         }
         Cmd::ClearDB(_) => todo!(),
+        Cmd::Connect(ConnectCmd { datadir }) => {
+            let mut peer = Rpc::new(datadir.join("geth.ipc")).unwrap();
+            let mut enode = peer.node_info().unwrap().enode;
+            // Replace ip with localhost because it is broken
+            enode.set_host(Some("127.0.0.1")).unwrap();
+            assert!(rpc.add_peer(&enode).unwrap());
+        }
+        Cmd::Disconnect(DisconnectCmd { datadir }) => {
+            let mut peer = Rpc::new(datadir.join("geth.ipc")).unwrap();
+            let mut enode = peer.node_info().unwrap().enode;
+            // Replace ip with localhost because it is broken
+            enode.set_host(Some("127.0.0.1")).unwrap();
+            assert!(rpc.remove_peer(&enode).unwrap());
+        }
     }
 }
diff --git a/eth-wire/src/lib.rs b/eth-wire/src/lib.rs
index 9561576..c689a63 100644
--- a/eth-wire/src/lib.rs
+++ b/eth-wire/src/lib.rs
@@ -14,10 +14,10 @@
   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
 */
 
+use common::url::Url;
 use ethereum_types::{Address, H256, U256, U64};
 use metadata::{InMetadata, OutMetadata};
 use rpc::hex::Hex;
-use common::url::Url;
 
 pub mod metadata;
 pub mod rpc;
@@ -80,43 +80,47 @@ impl rpc::Rpc {
 }
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
-pub struct BlockState {
-    pub hash: H256,
-    pub number: U64,
+pub struct SyncState {
+    pub tip_hash: H256,
+    pub tip_height: U64,
+    pub conf_height: U64,
 }
 
-impl BlockState {
-    pub fn to_bytes(&self) -> [u8; 40] {
-        let mut bytes = [0; 40];
-        bytes[..32].copy_from_slice(self.hash.as_bytes());
-        self.number.to_little_endian(&mut bytes[32..]);
+impl SyncState {
+    pub fn to_bytes(&self) -> [u8; 48] {
+        let mut bytes = [0; 48];
+        bytes[..32].copy_from_slice(self.tip_hash.as_bytes());
+        self.tip_height.to_little_endian(&mut bytes[32..40]);
+        self.conf_height.to_little_endian(&mut bytes[40..]);
         bytes
     }
 
-    pub fn from_bytes(bytes: &[u8; 40]) -> Self {
+    pub fn from_bytes(bytes: &[u8; 48]) -> Self {
         Self {
-            hash: H256::from_slice(&bytes[..32]),
-            number: U64::from_little_endian(&bytes[32..]),
+            tip_hash: H256::from_slice(&bytes[..32]),
+            tip_height: U64::from_little_endian(&bytes[32..40]),
+            conf_height: U64::from_little_endian(&bytes[40..]),
         }
     }
 }
 
 #[cfg(test)]
 mod test {
-    use ethereum_types::{H256, U64};
     use common::{rand::random, rand_slice};
+    use ethereum_types::{H256, U64};
 
-    use crate::BlockState;
+    use crate::SyncState;
 
     #[test]
     fn to_from_bytes_block_state() {
         for _ in 0..4 {
-            let state = BlockState {
-                hash: H256::from_slice(&rand_slice::<32>()),
-                number: U64::from(random::<u64>()),
+            let state = SyncState {
+                tip_hash: H256::from_slice(&rand_slice::<32>()),
+                tip_height: U64::from(random::<u64>()),
+                conf_height: U64::from(random::<u64>()),
             };
             let encoded = state.to_bytes();
-            let decoded = BlockState::from_bytes(&encoded);
+            let decoded = SyncState::from_bytes(&encoded);
             assert_eq!(state, decoded);
         }
     }
diff --git a/eth-wire/src/loops/worker.rs b/eth-wire/src/loops/worker.rs
index b1962a5..b812c10 100644
--- a/eth-wire/src/loops/worker.rs
+++ b/eth-wire/src/loops/worker.rs
@@ -13,7 +13,7 @@
   You should have received a copy of the GNU Affero General Public License along with
   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
 */
-use std::{sync::atomic::Ordering, time::SystemTime};
+use std::{fmt::Write, sync::atomic::Ordering, time::SystemTime};
 
 use common::{
     api_common::base32,
@@ -26,9 +26,9 @@ use eth_wire::{
     metadata::InMetadata,
     rpc::{self, Rpc, Transaction},
     taler_util::{eth_payto_url, eth_to_taler},
-    BlockState,
+    SyncState,
 };
-use ethereum_types::{Address, H256, U256};
+use ethereum_types::{Address, H256, U256, U64};
 
 use crate::{
     sql::{sql_addr, sql_eth_amount, sql_hash},
@@ -37,7 +37,9 @@ use crate::{
 
 pub fn worker(mut rpc: Rpc, mut db: Client, state: &WireState) {
     let mut lifetime = state.config.wire_lifetime;
+    let mut status = true;
     let mut skip_notification = false;
+
     loop {
         // Check lifetime
         if let Some(nb) = lifetime.as_mut() {
@@ -64,7 +66,7 @@ pub fn worker(mut rpc: Rpc, mut db: Client, state: &WireState) {
                 while iter.next()?.is_some() {}
             }
 
-            sync_chain(&mut rpc, &mut db, state)?;
+            sync_chain(&mut rpc, &mut db, state, &mut status)?;
 
             while withdraw(&mut db, &mut rpc, state)? {}
 
@@ -85,9 +87,9 @@ pub fn worker(mut rpc: Rpc, mut db: Client, state: &WireState) {
 fn list_since_block_state(
     rpc: &mut Rpc,
     address: &Address,
-    state: BlockState,
+    state: SyncState,
     min_confirmation: u16,
-) -> LoopResult<Option<(Vec<(Transaction, u16)>, BlockState)>> {
+) -> LoopResult<(Vec<(Transaction, u16)>, Vec<(Transaction, u16)>, SyncState)> {
     let match_tx = |txs: Vec<Transaction>, conf: u16| -> Vec<(Transaction, u16)> {
         txs.into_iter()
             .filter_map(|tx| {
@@ -97,45 +99,91 @@ fn list_since_block_state(
     };
 
     let mut txs = Vec::new();
+    let mut removed = Vec::new();
+
     // Add pending transaction
     txs.extend(match_tx(rpc.pending_transactions()?, 0));
 
-    let mut next_state = state;
+    let latest = rpc.latest_block()?;
+
     let mut confirmation = 1;
-    let mut current = rpc.latest_block()?;
-
-    // Move backward until we reach the starting block
-    while current.number.expect("Mined block") != state.number {
-        if confirmation == min_confirmation {
-            next_state = BlockState {
-                hash: current.hash.unwrap(),
-                number: current.number.unwrap(),
-            };
-        }
-        txs.extend(match_tx(current.transactions, confirmation));
-        if let Some(block) = rpc.block(&current.parent_hash)? {
-            current = block;
-        } else {
-            return Ok(None);
-        }
+    let mut chain_cursor = latest.clone();
+
+    // Move until tip height
+    while chain_cursor.number.unwrap() != state.tip_height {
+        txs.extend(match_tx(chain_cursor.transactions, confirmation));
+        chain_cursor = rpc.block(&chain_cursor.parent_hash)?.unwrap();
         confirmation += 1;
     }
 
-    if current.hash.unwrap() != state.hash {
-        return Ok(None);
+    // Check if fork
+    if chain_cursor.hash.unwrap() != state.tip_hash {
+        let mut fork_cursor = rpc.block(&state.tip_hash)?.unwrap();
+        // Move until found common parent
+        while fork_cursor.hash != chain_cursor.hash {
+            txs.extend(match_tx(chain_cursor.transactions, confirmation));
+            removed.extend(match_tx(fork_cursor.transactions, confirmation));
+            chain_cursor = rpc.block(&chain_cursor.parent_hash)?.unwrap();
+            fork_cursor = rpc.block(&fork_cursor.parent_hash)?.unwrap();
+            confirmation += 1;
+        }
     }
 
-    Ok(Some((txs, next_state)))
+    // Move until last conf
+    while chain_cursor.number.unwrap() > state.conf_height {
+        txs.extend(match_tx(chain_cursor.transactions, confirmation));
+        chain_cursor = rpc.block(&chain_cursor.parent_hash)?.unwrap();
+        confirmation += 1;
+    }
+
+    Ok((
+        txs,
+        removed,
+        SyncState {
+            tip_hash: latest.hash.unwrap(),
+            tip_height: latest.number.unwrap(),
+            conf_height: latest
+                .number
+                .unwrap()
+                .saturating_sub(U64::from(min_confirmation)),
+        },
+    ))
 }
 
-fn sync_chain(rpc: &mut Rpc, db: &mut Client, state: &WireState) -> LoopResult<bool> {
-    let row = db.query_one("SELECT value FROM state WHERE name='last_block'", &[])?;
+fn sync_chain(
+    rpc: &mut Rpc,
+    db: &mut Client,
+    state: &WireState,
+    status: &mut bool,
+) -> LoopResult<bool> {
+    let row = db.query_one("SELECT value FROM state WHERE name='sync'", &[])?;
     let slice: &[u8] = row.get(0);
-    let block = BlockState::from_bytes(slice.try_into().unwrap());
+    let block = SyncState::from_bytes(slice.try_into().unwrap());
     let min_confirmations = state.confirmation.load(Ordering::SeqCst);
 
-    let (txs, next_state) =
-        list_since_block_state(rpc, &state.address, block, min_confirmations)?.unwrap();
+    let (txs, removed, next_state) =
+        list_since_block_state(rpc, &state.address, block, min_confirmations)?;
+
+    // Check if a confirmed incoming transaction have been removed by a blockchain reorganisation
+    let new_status = sync_chain_removed(&txs, &removed, db, &state.address, min_confirmations)?;
+
+    // Sync status with database
+    if *status != new_status {
+        let mut tx = db.transaction()?;
+        tx.execute(
+            "UPDATE state SET value=$1 WHERE name='status'",
+            &[&[new_status as u8].as_ref()],
+        )?;
+        tx.execute("NOTIFY status", &[])?;
+        tx.commit()?;
+        *status = new_status;
+        if new_status {
+            info!("Recovered lost transactions");
+        }
+    }
+    if !new_status {
+        return Ok(false);
+    }
 
     for (tx, confirmation) in txs {
         if tx.to == Some(state.address) && confirmation >= min_confirmations {
@@ -171,12 +219,95 @@ fn sync_chain(rpc: &mut Rpc, db: &mut Client, state: &WireState) -> LoopResult<b
     }
 
     db.execute(
-        "UPDATE state SET value=$1 WHERE name='last_block'",
+        "UPDATE state SET value=$1 WHERE name='sync'",
         &[&next_state.to_bytes().as_ref()],
     )?;
     Ok(true)
 }
 
+/// Sync database with removed transactions, return false if bitcoin backing is compromised
+fn sync_chain_removed(
+    txs: &[(Transaction, u16)],
+    removed: &[(Transaction, u16)],
+    db: &mut Client,
+    addr: &Address,
+    min_confirmation: u16,
+) -> LoopResult<bool> {
+    // Removed transactions are correctness issues in only two cases:
+    // - An incoming valid transaction considered confirmed in the database
+    // - An incoming invalid transactions already bounced
+    // Those two cases can compromise ethereum backing
+    // Removed outgoing transactions will be retried automatically by the node
+    
+    let mut blocking_deposit = Vec::new();
+    let mut blocking_bounce = Vec::new();
+
+    // Only keep incoming transaction that are not reconfirmed
+    // TODO study risk of accepting only mined transactions for faster recovery
+    for (tx, _) in removed.iter().filter(|(tx, _)| {
+        tx.to == Some(*addr)
+            && txs
+                .iter()
+                .all(|(t, conf)| t.hash != tx.hash || *conf < min_confirmation)
+    }) {
+        match InMetadata::decode(&tx.input) {
+            Ok(metadata) => match metadata {
+                InMetadata::Deposit { reserve_pub } => {
+                    // Deposit are only problematic if not reconfirmed and stored in the database
+                    if db
+                        .query_opt(
+                            "SELECT 1 FROM tx_in WHERE reserve_pub=$1",
+                            &[&reserve_pub.as_ref()],
+                        )?
+                        .is_some()
+                    {
+                        blocking_deposit.push((reserve_pub, tx.hash, tx.from.unwrap()));
+                    }
+                }
+            },
+            Err(_) => {
+                // Invalid tx are only problematic if if not reconfirmed and already bounced
+                if let Some(row) = db.query_opt(
+                    "SELECT txid FROM bounce WHERE bounced=$1 AND txid IS NOT NULL",
+                    &[&tx.hash.as_ref()],
+                )? {
+                    blocking_bounce.push((sql_hash(&row, 0), tx.hash));
+                } else {
+                    // Remove transaction from bounce table
+                    db.execute("DELETE FROM bounce WHERE bounced=$1", &[&tx.hash.as_ref()])?;
+                }
+            }
+        }
+    }
+
+    if !blocking_bounce.is_empty() || !blocking_deposit.is_empty() {
+        let mut buf = "The following transaction have been removed from the blockchain, ethereum backing is compromised until the transaction reappear:".to_string();
+        for (key, id, addr) in blocking_deposit {
+            write!(
+                &mut buf,
+                "\n\tdeposit {} in {} from {}",
+                base32(&key),
+                hex::encode(id),
+                hex::encode(addr)
+            )
+            .unwrap();
+        }
+        for (id, bounced) in blocking_bounce {
+            write!(
+                &mut buf,
+                "\n\tbounce {} in {}",
+                hex::encode(id),
+                hex::encode(bounced)
+            )
+            .unwrap();
+        }
+        error!("{}", buf);
+        return Ok(false);
+    } else {
+        return Ok(true);
+    }
+}
+
 /// Send a withdraw transaction on the blockchain, return false if no more requested transaction are found
 fn withdraw(db: &mut Client, rpc: &mut Rpc, state: &WireState) -> LoopResult<bool> {
     // We rely on the advisory lock to ensure we are the only one sending transactions
diff --git a/eth-wire/src/rpc.rs b/eth-wire/src/rpc.rs
index ca9f034..e626cd6 100644
--- a/eth-wire/src/rpc.rs
+++ b/eth-wire/src/rpc.rs
@@ -19,6 +19,7 @@
 //! We only parse the thing we actually use, this reduce memory usage and
 //! make our code more compatible with future deprecation
 
+use common::url::Url;
 use ethereum_types::{Address, H256, U256, U64};
 use serde::de::DeserializeOwned;
 use serde_json::error::Category;
@@ -60,7 +61,7 @@ pub enum Error {
     RPC { code: i64, msg: String },
     #[error("JSON: {0}")]
     Json(#[from] serde_json::Error),
-    #[error("No result or error")]
+    #[error("Null rpc, no result or error")]
     Null,
 }
 
@@ -186,19 +187,21 @@ impl Rpc {
     }
 
     pub fn get_transaction(&mut self, hash: &H256) -> Result<Option<Transaction>> {
-        self.call("eth_getTransactionByHash", &[hash])
+        match self.call("eth_getTransactionByHash", &[hash]) {
+            Err(Error::Null) => Ok(None),
+            r => r,
+        }
     }
 
     pub fn send_transaction(&mut self, params: &TransactionRequest) -> Result<H256> {
         self.call("eth_sendTransaction", &[params])
     }
 
-    pub fn block_at(&mut self, nb: &U64) -> Result<Option<Block>> {
-        self.call("eth_getBlockByNumber", &(nb, &true))
-    }
-
     pub fn block(&mut self, hash: &H256) -> Result<Option<Block>> {
-        self.call("eth_getBlockByHash", &(hash, &true))
+        match self.call("eth_getBlockByHash", &(hash, &true)) {
+            Err(Error::Null) => Ok(None),
+            r => r,
+        }
     }
 
     pub fn pending_transactions(&mut self) -> Result<Vec<Transaction>> {
@@ -232,6 +235,20 @@ impl Rpc {
     pub fn balance(&mut self, addr: &Address) -> Result<U256> {
         self.call("eth_getBalance", &(addr, "latest"))
     }
+
+    /* ----- Peer management ----- */
+
+    pub fn node_info(&mut self) -> Result<NodeInfo> {
+        self.call("admin_nodeInfo", &EMPTY)
+    }
+
+    pub fn add_peer(&mut self, url: &Url) -> Result<bool> {
+        self.call("admin_addPeer", &[url])
+    }
+
+    pub fn remove_peer(&mut self, url: &Url) -> Result<bool> {
+        self.call("admin_removePeer", &[url])
+    }
 }
 
 pub struct RpcStream<T: Debug + DeserializeOwned> {
@@ -275,7 +292,7 @@ pub enum NotifEnd<T> {
     End(bool),
 }
 
-#[derive(Debug, serde::Deserialize)]
+#[derive(Debug, Clone, serde::Deserialize)]
 pub struct Block {
     /// Hash of the block
     pub hash: Option<H256>,
@@ -292,7 +309,7 @@ pub struct Block {
 pub struct BlockHead {}
 
 /// Description of a Transaction, pending or in the chain.
-#[derive(Debug, serde::Deserialize)]
+#[derive(Debug, Clone, serde::Deserialize)]
 pub struct Transaction {
     /// Hash
     pub hash: H256,
@@ -326,6 +343,11 @@ pub struct TransactionRequest {
     pub data: Hex,
 }
 
+#[derive(Debug, serde::Deserialize)]
+pub struct NodeInfo {
+    pub enode: Url,
+}
+
 pub mod hex {
     use std::{
         fmt,
diff --git a/makefile b/makefile
index cee034b..cea4d3a 100644
--- a/makefile
+++ b/makefile
@@ -22,5 +22,6 @@ test_btc: install
 test_eth: install
        test/eth/wire.sh
        test/eth/lifetime.sh
+       test/eth/reorg.sh
 
 test: install test_gateway test_eth test_btc
\ No newline at end of file
diff --git a/test/btc/bumpfee.sh b/test/btc/bumpfee.sh
index e1b7c51..58d7818 100644
--- a/test/btc/bumpfee.sh
+++ b/test/btc/bumpfee.sh
@@ -25,7 +25,6 @@ echo ""
 
 SEQ="seq 10 30"
 
-
 echo -n "Making wire transfer to exchange:"
 for n in `$SEQ`; do
     btc-wire-utils -d $WIRE_DIR transfer 0.$n > /dev/null
@@ -46,17 +45,45 @@ sleep 1
 check_balance 5.79983389 4.19599801
 echo " OK"
 
-echo -n "Abandon pending transaction:"
+echo -n "Bump relay fee:"
 restart_btc -minrelaytxfee=0.0001
 echo " OK"
 
 echo -n "Check bump:"
-sleep 6
+sleep 5
 mine_btc
+sleep 1
 check_balance 5.80383389 4.19598010
 echo " OK"
 
-echo "----- Bump stress -----"
+echo "----- Bump fee reorg -----"
+
+echo "Loose second bitcoin node"
+btc2_deco
+
+echo -n "Making wire transfer from exchange:"
+taler-exchange-wire-gateway-client \
+    -b $BANK_ENDPOINT \
+    -C payto://bitcoin/$CLIENT \
+    -a BTC:0.004 > /dev/null
+sleep 1
+check_balance 5.80383389 4.19196020
+echo " OK"
+
+echo -n "Perform fork and bump relay fee:"
+btc2_fork 6
+restart_btc -minrelaytxfee=0.0002
+mine_btc
+echo " OK"
+
+echo -n "Check bump:"
+sleep 5
+next_btc
+sleep 1
+check_balance 5.80783389 4.19194030
+echo " OK"
+
+echo "----- Bump fee stress -----"
 
 echo -n "Replace btc_wire with stressed btc_wire"
 kill $WIRE_PID
@@ -71,23 +98,18 @@ for n in `$SEQ`; do
         -a BTC:0.00$n > /dev/null
 done
 sleep 5
-check_balance 5.80383389 4.15356220
 echo " OK"
 
-echo -n "Abandon pending transaction:"
-restart_btc -minrelaytxfee=0.0002
+echo -n "Bump relay fee:"
+restart_btc -minrelaytxfee=0.0003
 echo " OK"
 
 echo -n "Check bump:"
-sleep 2
-mine_btc
-sleep 2
-mine_btc
-sleep 2
-mine_btc
-sleep 2
-mine_btc
-check_balance 5.84583389 4.15314430
+for n in `seq 0 4`; do
+    sleep 2
+    mine_btc
+done
+check_balance 5.84983389 4.14868660
 echo " OK"
 
 echo "All tests passed!"
\ No newline at end of file
diff --git a/test/btc/reorg.sh b/test/btc/reorg.sh
index c047290..e058213 100644
--- a/test/btc/reorg.sh
+++ b/test/btc/reorg.sh
@@ -111,6 +111,7 @@ echo " OK"
 echo -n "Recover orphaned transactions:"
 next_btc 6 # More block needed to confirm
 check_balance "*" 0.00011000
+gateway_up
 echo " OK"
 
 echo "All tests passed!"
\ No newline at end of file
diff --git a/test/common.sh b/test/common.sh
index ccf2546..931d724 100644
--- a/test/common.sh
+++ b/test/common.sh
@@ -47,9 +47,11 @@ function load_config() {
     if [ "$CURRENCY" == "BTC" ]; then
         WIRE_CLI="btc-wire-cli"
         WIRE_UTILS="btc-wire-utils -d $WIRE_DIR"
+        WIRE_UTILS2="btc-wire-utils -d $WIRE_DIR2"
     else
         WIRE_CLI="eth-wire-cli"
         WIRE_UTILS="eth-wire-utils -d $WIRE_DIR"
+        WIRE_UTILS2="eth-wire-utils -d $WIRE_DIR2"
     fi
 }
 
@@ -92,7 +94,7 @@ function reset_db() {
 # Start a bitcoind regtest node, generate money, wallet and addresses
 function init_btc() {
     cp ${BASH_SOURCE%/*}/conf/${BTC_CONFIG:-bitcoin.conf} $WIRE_DIR/bitcoin.conf
-    bitcoind -datadir=$WIRE_DIR $* &>> log/btc.log &
+    bitcoind -datadir=$WIRE_DIR $* &>> log/node.log &
     BTC_PID="$!"
     # Wait for RPC server to be online
     $BTC_CLI -rpcwait getnetworkinfo > /dev/null
@@ -115,7 +117,7 @@ function init_btc() {
 # Start a second bitcoind regtest node connected to the first one
 function init_btc2() {
     cp ${BASH_SOURCE%/*}/conf/bitcoin2.conf $WIRE_DIR2/bitcoin.conf
-    bitcoind -datadir=$WIRE_DIR2 $* &>> log/btc2.log &
+    bitcoind -datadir=$WIRE_DIR2 $* &>> log/node2.log &
     $BTC_CLI2 -rpcwait getnetworkinfo > /dev/null    
     $BTC_CLI addnode 127.0.0.1:8346 onetry
 }
@@ -135,7 +137,7 @@ function btc2_fork() {
 # Restart a bitcoind regest server in a previously created temporary directory and load wallets
 function resume_btc() {
     # Restart node
-    bitcoind -datadir=$WIRE_DIR $* &>> log/btc.log &
+    bitcoind -datadir=$WIRE_DIR $* &>> log/node.log &
     BTC_PID="$!" 
     # Load wallets
     for wallet in wire client reserve; do
@@ -203,7 +205,7 @@ function btc_wire() {
 function stress_btc_wire() {
    cargo build --bin btc-wire --release --features fail &> log/cargo.log
    target/release/btc-wire $CONF &> log/wire.log & 
-   target/release/btc-wire $CONF &> log/wire.log & 
+   target/release/btc-wire $CONF &> log/wire1.log & 
 }
 
 # ----- Ethereum node ----- #
@@ -240,15 +242,40 @@ function init_eth() {
   }
 }" > $DIR/genesis.json
     # Initialize blockchain
-    $ETH_CLI init $DIR/genesis.json &> log/eth.log
+    $ETH_CLI init $DIR/genesis.json &> log/node.log
     # Start node
-    $ETH_CLI --miner.gasprice 0 $* &> log/eth.log &
+    $ETH_CLI --miner.recommit 0s --miner.gasprice 0 $* &> log/node.log &
     sleep 1
     # Create wire address
     WIRE=`eth-wire-cli initwallet $CONF | grep -oP '(?<=is ).*'`
     echo -e "PAYTO = payto://ethereum/$WIRE" >> $CONF
 }
 
+# Start a second geth dev node connected to the first one
+function init_eth2() {
+    # Initialize blockchain
+    $ETH_CLI2 init $DIR/genesis.json &> log/node2.log
+    # Start node
+    $ETH_CLI2 --port 30305 --miner.recommit 0s --miner.gasprice 0 $* &> log/node2.log &
+    sleep 1
+    # Create etherbase account for mining
+    $ETH_CLI2 account new --password <(echo "password") &> /dev/null
+    # Connect nodes
+    $WIRE_UTILS connect $WIRE_DIR2
+}
+
+# Disconnect the two nodes
+function eth2_deco() {
+    $WIRE_UTILS disconnect $WIRE_DIR2
+}
+
+# Create a fork on the second node and reconnect the two nodes
+function eth2_fork() {
+    $WIRE_UTILS2 mine $RESERVE ${1:-}
+    $WIRE_UTILS connect $WIRE_DIR2
+    sleep 5
+}
+
 # Check client and wire balance
 function check_balance_eth() {
     local CLIENT_BALANCE=`$WIRE_UTILS balance $CLIENT`
diff --git a/test/conf/taler_btc_bump.conf b/test/conf/taler_btc_bump.conf
index 859c692..f8a99c5 100644
--- a/test/conf/taler_btc_bump.conf
+++ b/test/conf/taler_btc_bump.conf
@@ -9,5 +9,4 @@ DB_URL        = 
postgres://localhost:5454/postgres?user=postgres&password=passwo
 PORT          = 8060
 PAYTO         = payto://bitcoin/bcrt1qgkgxkjj27g3f7s87mcvjjsghay7gh34cx39prj
 CONFIRMATION  = 3
-BOUNCE_FEE    = 1000
-BUMP_DELAY    = 10
\ No newline at end of file
+BUMP_DELAY    = 5
\ No newline at end of file
diff --git a/test/eth/reorg.sh b/test/eth/reorg.sh
new file mode 100644
index 0000000..890033d
--- /dev/null
+++ b/test/eth/reorg.sh
@@ -0,0 +1,108 @@
+#!/bin/bash
+
+## Test eth-wire correctness when a blockchain reorganisation occurs
+
+set -eu
+
+source "${BASH_SOURCE%/*}/../common.sh"
+SCHEMA=eth.sql
+CONFIG=taler_eth.conf
+
+echo  "----- Setup -----"
+echo "Load config file"
+load_config
+echo "Start database"
+setup_db
+echo "Start ethereum node"
+init_eth
+echo "Start second ethereum node"
+init_eth2
+echo "Start eth-wire"
+eth_wire
+echo "Start gateway"
+gateway
+echo ""
+
+SEQ="seq 10 20"
+
+echo  "----- Handle reorg incoming transactions -----"
+
+echo "Loose second ethereum node"
+eth2_deco
+
+echo -n "Making wire transfer to exchange:"
+eth-wire-utils -d $WIRE_DIR deposit $CLIENT $WIRE 0.000 `$SEQ`
+next_eth # Trigger eth-wire
+check_delta "incoming?delta=-100" "$SEQ" "0.000"
+check_balance_eth 999835000 165000
+echo " OK"
+
+echo -n "Perform fork and check eth-wire hard error:"
+gateway_up
+eth2_fork 10
+check_balance_eth 1000000000 0
+gateway_down
+echo " OK"
+
+echo -n "Recover orphaned transactions:"
+next_eth 6 # More block needed to confirm
+check_balance_eth 999835000 165000
+gateway_up
+echo " OK"
+
+echo "----- Handle reorg outgoing transactions -----"
+
+echo "Loose second ethereum node"
+eth2_deco
+
+echo -n "Making wire transfer from exchange:"
+for n in `$SEQ`; do
+    taler-exchange-wire-gateway-client \
+        -b $BANK_ENDPOINT \
+        -C payto://ethereum/$CLIENT \
+        -a ETH:0.0000$n > /dev/null
+done
+sleep 1
+mine_eth # Mine transactions
+check_delta "outgoing?delta=-100" "$SEQ"
+check_balance_eth 999851500 148500
+echo " OK"
+
+echo -n "Perform fork and check eth-wire still up:"
+gateway_up
+eth2_fork 10
+check_balance_eth 999835000 165000
+gateway_up
+echo " OK"
+
+echo -n "Recover orphaned transactions:"
+next_eth 6 # More block needed to confirm
+check_balance_eth 999851500 148500
+echo " OK"
+
+echo "----- Handle reorg bounce -----"
+
+echo "Loose second ethereum node"
+eth2_deco
+
+echo -n "Bounce:"
+eth-wire-utils -d $WIRE_DIR send $CLIENT $WIRE 0.000 `$SEQ`
+sleep 1
+next_eth 6
+check_balance_eth 999840500 159500
+echo " OK"
+
+echo -n "Perform fork and check eth-wire hard error:"
+gateway_up
+eth2_fork 10
+check_balance_eth 999851500 148500
+gateway_down
+echo " OK"
+
+echo -n "Recover orphaned transactions:"
+next_eth 6 # More block needed to confirm
+check_balance_eth 999840500 159500
+gateway_up
+echo " OK"
+
+echo "All tests passed!"
\ No newline at end of file
diff --git a/test/eth/wire.sh b/test/eth/wire.sh
index 4dedf46..ec01a86 100644
--- a/test/eth/wire.sh
+++ b/test/eth/wire.sh
@@ -22,7 +22,6 @@ gateway
 echo ""
 
 SEQ="seq 10 99"
-RUST_BACKTRACE=1
 
 echo  "----- Receive -----"
 
@@ -60,7 +59,7 @@ echo -n "Bounce:"
 eth-wire-utils -d $WIRE_DIR send $CLIENT $WIRE 0.000 `seq 10 40`
 sleep 1
 next_eth
-check_balance_eth 995585499 4414500
+check_balance_eth 995554500 4445500
 echo " OK"
 
 echo "All tests passed!"
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]