gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[taler-depolymerization] branch master updated (284b838 -> 635e0e1)


From: gnunet
Subject: [taler-depolymerization] branch master updated (284b838 -> 635e0e1)
Date: Thu, 01 Feb 2024 18:18:59 +0100

This is an automated email from the git hooks/post-receive script.

antoine pushed a change to branch master
in repository depolymerization.

    from 284b838  Test support for geth 1.13.11 and update dependencies
     new bc800d5  Test support for bitcoind 26.0 and update dependencies
     new c97f15a  Revert node version bump, more testing is required
     new 635e0e1  Update taler wire api to the newest specification

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 Cargo.lock                   |    8 +-
 README.md                    |    2 +-
 common/src/api_wire.rs       |  199 ++++----
 instrumentation/Cargo.toml   |    2 +-
 instrumentation/src/main.rs  |   12 +-
 instrumentation/src/utils.rs |   50 +-
 script/prepare.sh            |    6 +-
 wire-gateway/Cargo.toml      |    2 +-
 wire-gateway/src/json.rs     |    2 +-
 wire-gateway/src/main.rs     | 1084 +++++++++++++++++++++---------------------
 10 files changed, 704 insertions(+), 663 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 95ecf9e..939d2e7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -614,9 +614,9 @@ dependencies = [
 
 [[package]]
 name = "deadpool-postgres"
-version = "0.11.0"
+version = "0.12.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "40b75ba49590d27f677d3bebaf76cd15889ca8b308bc7ba99bfa25f1d7269c13"
+checksum = "bda39fa1cfff190d8924d447ad04fd22772c250438ca5ce1dfb3c80621c05aaa"
 dependencies = [
  "deadpool",
  "tokio",
@@ -1357,9 +1357,9 @@ dependencies = [
 
 [[package]]
 name = "owo-colors"
-version = "3.5.0"
+version = "4.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c1b04fb49957986fdce4d6ee7a65027d55d4b6d2265e5848bbb507b58ccfdb6f"
+checksum = "caff54706df99d2a78a5a4e3455ff45448d81ef1bb63c22cd14052ca0e993a3f"
 
 [[package]]
 name = "parking_lot"
diff --git a/README.md b/README.md
index 823328f..8235867 100644
--- a/README.md
+++ b/README.md
@@ -42,7 +42,7 @@ Depolymerizer require:
 
 #### Ethereum
 
-[Geth](https://geth.ethereum.org/) version 1.13.11 is expected
+[Geth](https://geth.ethereum.org/) version 1.13.5 is expected
 
 ### Initialization
 
diff --git a/common/src/api_wire.rs b/common/src/api_wire.rs
index 8476069..7f7b477 100644
--- a/common/src/api_wire.rs
+++ b/common/src/api_wire.rs
@@ -1,98 +1,101 @@
-/*
-  This file is part of TALER
-  Copyright (C) 2022 Taler Systems SA
-
-  TALER is free software; you can redistribute it and/or modify it under the
-  terms of the GNU Affero General Public License as published by the Free Software
-  Foundation; either version 3, or (at your option) any later version.
-
-  TALER is distributed in the hope that it will be useful, but WITHOUT ANY
-  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
-  A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
-
-  You should have received a copy of the GNU Affero General Public License along with
-  TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
-*/
-use url::Url;
-
-use crate::api_common::{Amount, EddsaPublicKey, HashCode, SafeU64, ShortHashCode, Timestamp};
-
-/// <https://docs.taler.net/core/api-wire.html#tsref-type-TransferResponse>
-#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
-pub struct TransferResponse {
-    pub timestamp: Timestamp,
-    pub row_id: SafeU64,
-}
-
-/// <https://docs.taler.net/core/api-wire.html#tsref-type-TransferRequest>
-#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
-pub struct TransferRequest {
-    pub request_uid: HashCode,
-    pub amount: Amount,
-    pub exchange_base_url: Url,
-    pub wtid: ShortHashCode,
-    pub credit_account: Url,
-}
-
-/// <https://docs.taler.net/core/api-wire.html#tsref-type-OutgoingHistory>
-#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
-pub struct OutgoingHistory {
-    pub outgoing_transactions: Vec<OutgoingBankTransaction>,
-}
-
-/// <https://docs.taler.net/core/api-wire.html#tsref-type-OutgoingBankTransaction>
-#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
-pub struct OutgoingBankTransaction {
-    pub row_id: SafeU64,
-    pub date: Timestamp,
-    pub amount: Amount,
-    pub credit_account: Url,
-    pub debit_account: Url,
-    pub wtid: ShortHashCode,
-    pub exchange_base_url: Url,
-}
-
-#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
-pub struct IncomingHistory {
-    pub incoming_transactions: Vec<IncomingBankTransaction>,
-}
-#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
-#[serde(tag = "type")]
-pub enum IncomingBankTransaction {
-    #[serde(rename = "RESERVE")]
-    IncomingReserveTransaction {
-        row_id: SafeU64,
-        date: Timestamp,
-        amount: Amount,
-        credit_account: Url,
-        debit_account: Url,
-        reserve_pub: EddsaPublicKey,
-    },
-    #[serde(rename = "WAD")]
-    IncomingWadTransaction {
-        // TODO not yet supported
-    },
-}
-
-/// <https://docs.taler.net/core/api-wire.html#tsref-type-AddIncomingRequest>
-#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
-pub struct AddIncomingRequest {
-    pub amount: Amount,
-    pub reserve_pub: EddsaPublicKey,
-    pub debit_account: Url,
-}
-
-/// <https://docs.taler.net/core/api-wire.html#tsref-type-AddIncomingResponse>
-#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
-pub struct AddIncomingResponse {
-    pub row_id: SafeU64,
-    pub timestamp: Timestamp,
-}
-
-/// <https://docs.taler.net/core/api-wire.html#querying-the-transaction-history>
-#[derive(Debug, Clone, serde::Deserialize)]
-pub struct HistoryParams {
-    pub start: Option<u64>,
-    pub delta: i64,
-    pub long_pool_ms: Option<u64>,
-}
+/*
+  This file is part of TALER
+  Copyright (C) 2022 Taler Systems SA
+
+  TALER is free software; you can redistribute it and/or modify it under the
+  terms of the GNU Affero General Public License as published by the Free Software
+  Foundation; either version 3, or (at your option) any later version.
+
+  TALER is distributed in the hope that it will be useful, but WITHOUT ANY
+  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+  A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
+
+  You should have received a copy of the GNU Affero General Public License along with
+  TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
+*/
+
+/// Type for the Taler Wire Gateway HTTP API <https://docs.taler.net/core/api-bank-wire.html#taler-wire-gateway-http-api>
+use url::Url;
+
+use crate::api_common::{Amount, EddsaPublicKey, HashCode, SafeU64, ShortHashCode, Timestamp};
+
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+pub struct WireConfig {
+    pub name: &'static str,
+    pub version: &'static str,
+    pub currency: &'static str,
+}
+
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+pub struct TransferResponse {
+    pub timestamp: Timestamp,
+    pub row_id: SafeU64,
+}
+
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
+pub struct TransferRequest {
+    pub request_uid: HashCode,
+    pub amount: Amount,
+    pub exchange_base_url: Url,
+    pub wtid: ShortHashCode,
+    pub credit_account: Url,
+}
+
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+pub struct OutgoingHistory {
+    pub outgoing_transactions: Vec<OutgoingBankTransaction>,
+    pub debit_account: Url,
+}
+
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+pub struct OutgoingBankTransaction {
+    pub row_id: SafeU64,
+    pub date: Timestamp,
+    pub amount: Amount,
+    pub credit_account: Url,
+    pub wtid: ShortHashCode,
+    pub exchange_base_url: Url,
+}
+
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+pub struct IncomingHistory {
+    pub credit_account: Url,
+    pub incoming_transactions: Vec<IncomingBankTransaction>,
+}
+
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+#[serde(tag = "type")]
+pub enum IncomingBankTransaction {
+    #[serde(rename = "RESERVE")]
+    IncomingReserveTransaction {
+        row_id: SafeU64,
+        date: Timestamp,
+        amount: Amount,
+        debit_account: Url,
+        reserve_pub: EddsaPublicKey,
+    },
+    #[serde(rename = "WAD")]
+    IncomingWadTransaction {
+        // TODO not yet supported
+    },
+}
+
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+pub struct AddIncomingRequest {
+    pub amount: Amount,
+    pub reserve_pub: EddsaPublicKey,
+    pub debit_account: Url,
+}
+
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+pub struct AddIncomingResponse {
+    pub row_id: SafeU64,
+    pub timestamp: Timestamp,
+}
+
+#[derive(Debug, Clone, serde::Deserialize)]
+pub struct HistoryParams {
+    pub start: Option<u64>,
+    pub delta: i64,
+    pub long_pool_ms: Option<u64>,
+}
diff --git a/instrumentation/Cargo.toml b/instrumentation/Cargo.toml
index 8ec2fe0..b06f292 100644
--- a/instrumentation/Cargo.toml
+++ b/instrumentation/Cargo.toml
@@ -26,7 +26,7 @@ tempfile = "3.3.0"
 # RNG
 fastrand = "2.0.1"
 # terminal color
-owo-colors = "3.5.0"
+owo-colors = "4.0.0"
 # Better backtrace
 color-backtrace = "0.6.0"
 # Send signal to child processes
diff --git a/instrumentation/src/main.rs b/instrumentation/src/main.rs
index c31fb71..fd3e46f 100644
--- a/instrumentation/src/main.rs
+++ b/instrumentation/src/main.rs
@@ -138,7 +138,7 @@ pub fn main() {
                                 pb.finish();
                             }
                             let out: String = out.lock().unwrap().clone();
-                            (result, start.elapsed(), out)
+                            (result, start.elapsed(), out, pb.message())
                         });
                         (join, name)
                     })
@@ -151,7 +151,12 @@ pub fn main() {
 
             let len = results.len();
             m.clear().unwrap();
-            for ((result, time, out), name) in results {
+            for ((result, _, out, msg), name) in &results {
+                if  result.is_err() {
+                    println!("{} {}\n{}", name.magenta(), msg.red(), out.bright_black());
+                }
+            }
+            for ((result, time, _, msg), name) in results {
                 match result {
                     Ok(_) => {
                         println!(
@@ -165,10 +170,9 @@ pub fn main() {
                         println!(
                             "{} {} {}",
                             name.magenta(),
-                            "ERR".red(),
+                            msg.red(),
                             format_args!("{}s", time.as_secs()).bright_black()
                         );
-                        println!("{}", out.bright_black());
                     }
                 }
             }
diff --git a/instrumentation/src/utils.rs b/instrumentation/src/utils.rs
index 5509754..baf3686 100644
--- a/instrumentation/src/utils.rs
+++ b/instrumentation/src/utils.rs
@@ -44,16 +44,21 @@ pub fn print_now(disp: impl Display) {
 
 #[must_use]
 pub fn check_incoming(base_url: &str, txs: &[([u8; 32], Amount)]) -> bool {
-    let history: IncomingHistory = ureq::get(&format!("{}history/incoming", base_url))
+    let res = ureq::get(&format!("{}history/incoming", base_url))
         .query("delta", &format!("-{}", txs.len()))
         .call()
-        .unwrap()
-        .into_json()
         .unwrap();
+    if txs.is_empty() {
+        res.status() == 204
+    } else {
+        if res.status() != 200 {
+            return false;
+        }
+        let history: IncomingHistory = res.into_json().unwrap();
 
-    history.incoming_transactions.len() == txs.len()
-        && txs.iter().all(|(reserve_pub_key, taler_amount)| {
-            history.incoming_transactions.iter().any(|h| {
+        history.incoming_transactions.len() == txs.len()
+            && txs.iter().all(|(reserve_pub_key, taler_amount)| {
+                history.incoming_transactions.iter().any(|h| {
                 matches!(
                     h,
                     IncomingBankTransaction::IncomingReserveTransaction {
@@ -63,7 +68,8 @@ pub fn check_incoming(base_url: &str, txs: &[([u8; 32], Amount)]) -> bool {
                     } if reserve_pub == &Base32::from(*reserve_pub_key) && amount == taler_amount
                 )
             })
-        })
+            })
+    }
 }
 
 pub fn gateway_error(path: &str, error: u16) {
@@ -96,10 +102,7 @@ pub fn check_gateway_down(base_url: &str) -> bool {
 
 #[must_use]
 pub fn check_gateway_up(base_url: &str) -> bool {
-    ureq::get(&format!("{}history/incoming", base_url))
-        .query("delta", "-5")
-        .call()
-        .is_ok()
+    ureq::get(&format!("{}config", base_url)).call().is_ok()
 }
 
 pub fn transfer(base_url: &str, wtid: &[u8; 32], url: &Url, credit_account: Url, amount: &Amount) {
@@ -116,18 +119,27 @@ pub fn transfer(base_url: &str, wtid: &[u8; 32], url: &Url, credit_account: Url,
 
 #[must_use]
 pub fn check_outgoing(base_url: &str, url: &Url, txs: &[([u8; 32], Amount)]) -> bool {
-    let history: OutgoingHistory = ureq::get(&format!("{}history/outgoing", base_url))
+    let res = ureq::get(&format!("{}history/outgoing", base_url))
         .query("delta", &format!("-{}", txs.len()))
         .call()
-        .unwrap()
-        .into_json()
         .unwrap();
-    history.outgoing_transactions.len() == txs.len()
-        && txs.iter().all(|(wtid, amount)| {
-            history.outgoing_transactions.iter().any(|h| {
-                h.wtid == Base32::from(*wtid) && &h.exchange_base_url == url && &h.amount == amount
+    if txs.is_empty() {
+        res.status() == 204
+    } else {
+        if res.status() != 200 {
+            return false;
+        }
+        let history: OutgoingHistory = res.into_json().unwrap();
+
+        history.outgoing_transactions.len() == txs.len()
+            && txs.iter().all(|(wtid, amount)| {
+                history.outgoing_transactions.iter().any(|h| {
+                    h.wtid == Base32::from(*wtid)
+                        && &h.exchange_base_url == url
+                        && &h.amount == amount
+                })
             })
-        })
+    }
 }
 pub struct ChildGuard(pub Child);
 
diff --git a/script/prepare.sh b/script/prepare.sh
index 184d74a..531ec09 100755
--- a/script/prepare.sh
+++ b/script/prepare.sh
@@ -28,13 +28,13 @@ rm -rfv ~/bitcoin
 mkdir -pv ~/bitcoin
 mv -v bitcoin-25.1/* ~/bitcoin
 
-echo "Ⅲ - Install Go Ethereum (Geth) v1.13.11"
+echo "Ⅲ - Install Go Ethereum (Geth) v1.13.5"
 cd $DIR
-curl -L https://gethstore.blob.core.windows.net/builds/geth-linux-amd64-1.13.11-8f7eb9cc.tar.gz -o geth.tar.gz
+curl -L https://gethstore.blob.core.windows.net/builds/geth-linux-amd64-1.13.5-916d6a44.tar.gz -o geth.tar.gz
 tar xvzf geth.tar.gz
 rm -rfv ~/geth
 mkdir -pv ~/geth
-mv -v geth-linux-amd64-1.13.11-8f7eb9cc/* ~/geth
+mv -v geth-linux-amd64-1.13.5-916d6a44/* ~/geth
 
 echo "Ⅴ - PATH"
 
diff --git a/wire-gateway/Cargo.toml b/wire-gateway/Cargo.toml
index 4f44c02..f73af0f 100644
--- a/wire-gateway/Cargo.toml
+++ b/wire-gateway/Cargo.toml
@@ -26,7 +26,7 @@ thiserror = "1.0.38"
 miniz_oxide = "0.7.1"
 # Async postgres client
 tokio-postgres = { version = "0.7.7" }
-deadpool-postgres = "0.11.0"
+deadpool-postgres = "0.12.1"
 # Socket activation
 listenfd = "1.0.0"
 # Common lib
diff --git a/wire-gateway/src/json.rs b/wire-gateway/src/json.rs
index da0c3c3..96a55b8 100644
--- a/wire-gateway/src/json.rs
+++ b/wire-gateway/src/json.rs
@@ -73,7 +73,7 @@ pub enum EncodeBodyError {
     Json(#[from] serde_json::Error),
 }
 
-pub async fn encode_body<J: serde::Serialize>(
+pub fn encode_body<J: serde::Serialize>(
     parts: &Parts,
     status: StatusCode,
     json: &J,
diff --git a/wire-gateway/src/main.rs b/wire-gateway/src/main.rs
index 7024d9b..98c7589 100644
--- a/wire-gateway/src/main.rs
+++ b/wire-gateway/src/main.rs
@@ -1,531 +1,553 @@
-/*
-  This file is part of TALER
-  Copyright (C) 2022 Taler Systems SA
-
-  TALER is free software; you can redistribute it and/or modify it under the
-  terms of the GNU Affero General Public License as published by the Free Software
-  Foundation; either version 3, or (at your option) any later version.
-
-  TALER is distributed in the hope that it will be useful, but WITHOUT ANY
-  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
-  A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
-
-  You should have received a copy of the GNU Affero General Public License along with
-  TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
-*/
-use clap::Parser;
-use common::{
-    api_common::{ShortHashCode, Timestamp},
-    api_wire::{
-        HistoryParams, IncomingBankTransaction, IncomingHistory, OutgoingBankTransaction,
-        OutgoingHistory, TransferRequest, TransferResponse,
-    },
-    config::{AuthMethod, TalerConfig},
-    currency::Currency,
-    error_codes::ErrorCode,
-    log::{
-        fail,
-        log::{error, info, log, Level},
-    },
-    postgres::{self, fallible_iterator::FallibleIterator},
-    sql::{sql_amount, sql_array, sql_safe_u64, sql_url},
-    url::Url,
-};
-use deadpool_postgres::{Pool, Runtime};
-use error::{CatchResult, ServerError};
-use hyper::{
-    http::request::Parts,
-    service::{make_service_fn, service_fn},
-    Body, Method, Response, Server, StatusCode,
-};
-use json::{encode_body, parse_body};
-use listenfd::ListenFd;
-use std::{
-    convert::Infallible,
-    path::PathBuf,
-    str::FromStr,
-    sync::atomic::{AtomicBool, AtomicU32, Ordering},
-    time::{Duration, Instant},
-};
-use tokio::sync::Notify;
-use tokio_postgres::{config::Host, NoTls};
-
-mod error;
-mod json;
-
-struct ServerState {
-    pool: Pool,
-    db_config: postgres::Config,
-    payto: Url,
-    currency: Currency,
-    notify: Notify,
-    lifetime: Option<AtomicU32>,
-    status: AtomicBool,
-    auth: AuthMethod,
-}
-
-impl ServerState {
-    /// Decrease lifetime, triggering graceful shutdown when reach lifetime end
-    pub fn step(&self) {
-        if let Some(lifetime) = &self.lifetime {
-            let mut current = lifetime.load(Ordering::Relaxed);
-            loop {
-                if current == 0 {
-                    self.notify.notify_one();
-                }
-                match lifetime.compare_exchange_weak(
-                    current,
-                    current - 1,
-                    Ordering::SeqCst,
-                    Ordering::Relaxed,
-                ) {
-                    Ok(_) => break,
-                    Err(new) => current = new,
-                }
-            }
-        }
-    }
-
-    pub async fn shutdown_signal(&self) {
-        self.notify.notified().await;
-        info!("Reach end of lifetime");
-    }
-}
-
-/// Taler wire gateway server for depolymerizer
-#[derive(clap::Parser, Debug)]
-struct Args {
-    /// Override default configuration file path
-    #[clap(global = true, short, long)]
-    config: Option<PathBuf>,
-}
-
-#[tokio::main]
-async fn main() {
-    common::log::init();
-    let args = Args::parse();
-    let taler_config = TalerConfig::load(args.config.as_deref());
-
-    #[cfg(feature = "test")]
-    common::log::log::warn!("Running with test admin endpoint unsuitable for production");
-
-    // Parse postgres url
-    let db_config = taler_config.db_config();
-    // TODO find a way to clean this ugly mess
-    let mut cfg = deadpool_postgres::Config::new();
-    cfg.user = db_config.get_user().map(|it| it.to_string());
-    cfg.password = db_config
-        .get_password()
-        .map(|it| String::from_utf8(it.to_vec()).unwrap());
-    cfg.dbname = db_config.get_dbname().map(|it| it.to_string());
-    cfg.options = db_config.get_options().map(|it| it.to_string());
-    cfg.host = Some(
-        db_config
-            .get_hosts()
-            .iter()
-            .map(|it| match it {
-                Host::Tcp(it) => it.to_string(),
-                #[cfg(unix)]
-                Host::Unix(it) => it.to_str().unwrap().to_string(),
-            })
-            .collect(),
-    );
-    cfg.ports = Some(db_config.get_ports().to_vec());
-    cfg.application_name = db_config.get_application_name().map(|it| it.to_string());
-    cfg.connect_timeout = db_config.get_connect_timeout().cloned();
-
-    let pool = cfg.create_pool(Some(Runtime::Tokio1), NoTls).unwrap();
-    let payto = taler_config.payto();
-    let state = ServerState {
-        pool,
-        notify: Notify::new(),
-        lifetime: taler_config.http_lifetime().map(AtomicU32::new),
-        status: AtomicBool::new(true),
-        db_config,
-        payto,
-        currency: taler_config.currency,
-        auth: taler_config.auth_method(),
-    };
-    let state: &'static ServerState = Box::leak(Box::new(state));
-    std::thread::spawn(move || status_watcher(state));
-    let service = service_fn(move |req| async move {
-        state.step();
-        let start = Instant::now();
-        let (parts, body) = req.into_parts();
-        let (response, msg) = match router(&parts, body, state).await {
-            Ok(resp) => (resp, String::new()),
-            Err(err) => err.response(),
-        };
-        let status = response.status().as_u16();
-        let level = if status >= 500 {
-            Level::Error
-        } else if status >= 400 {
-            Level::Warn
-        } else {
-            Level::Info
-        };
-        log!(
-            level,
-            "{} {} {}ms {} - {}",
-            parts.method,
-            status,
-            start.elapsed().as_millis(),
-            parts.uri.path(),
-            msg
-        );
-        Ok::<Response<Body>, Infallible>(response)
-    });
-    let make_service = make_service_fn(move |_| async move { Ok::<_, Infallible>(service) });
-    let make_service_unix = make_service_fn(move |_| async move { Ok::<_, Infallible>(service) });
-
-    let mut listenfd = ListenFd::from_env();
-
-    if let Some(listener) = listenfd.take_tcp_listener(0).unwrap() {
-        info!(
-            "Server listening on activated socket {}",
-            listener.local_addr().unwrap()
-        );
-        let server = Server::from_tcp(listener)
-            .unwrap()
-            .serve(make_service)
-            .with_graceful_shutdown(state.shutdown_signal());
-        if let Err(e) = server.await {
-            error!("server: {}", e);
-        }
-    } else if let Some(path) = taler_config.unix_path() {
-        use hyperlocal::UnixServerExt;
-        info!("Server listening on unix domain socket {:?}", path);
-        if let Err(err) = std::fs::remove_file(&path) {
-            if err.kind() != std::io::ErrorKind::NotFound {
-                fail(format_args!("{}", err));
-            }
-        }
-        let server = Server::bind_unix(path)
-            .unwrap()
-            .serve(make_service_unix)
-            .with_graceful_shutdown(state.shutdown_signal());
-        if let Err(e) = server.await {
-            error!("server: {}", e);
-        }
-    } else {
-        let addr = ([0, 0, 0, 0], taler_config.port()).into();
-        info!("Server listening on http://{}", &addr);
-        let server = Server::bind(&addr)
-            .serve(make_service)
-            .with_graceful_shutdown(state.shutdown_signal());
-        if let Err(e) = server.await {
-            error!("server: {}", e);
-        }
-    };
-    info!("wire-gateway stopped");
-}
-
-/// Check if an url if a valid payto url for the configured currency
-fn check_payto(url: &Url, currency: Currency) -> bool {
-    match currency {
-        Currency::ETH(_) => check_pay_to_eth(url),
-        Currency::BTC(_) => check_pay_to_btc(url),
-    }
-}
-
-/// Check if an url is a valid bitcoin payto url
-fn check_pay_to_btc(url: &Url) -> bool {
-    return url.domain() == Some("bitcoin")
-        && url.scheme() == "payto"
-        && url.username() == ""
-        && url.password().is_none()
-        && url.query().is_none()
-        && url.fragment().is_none()
-        && bitcoin::Address::from_str(url.path().trim_start_matches('/')).is_ok();
-}
-
-/// Check if an url is a valid ethereum payto url
-fn check_pay_to_eth(url: &Url) -> bool {
-    return url.domain() == Some("ethereum")
-        && url.scheme() == "payto"
-        && url.username() == ""
-        && url.password().is_none()
-        && url.query().is_none()
-        && url.fragment().is_none()
-        && ethereum_types::H160::from_str(url.path().trim_start_matches('/')).is_ok();
-}
-
-/// Assert request method match expected
-fn assert_method(parts: &Parts, method: Method) -> Result<(), ServerError> {
-    if parts.method == method {
-        Ok(())
-    } else {
-        Err(ServerError::code(
-            StatusCode::METHOD_NOT_ALLOWED,
-            ErrorCode::GENERIC_METHOD_INVALID,
-        ))
-    }
-}
-
-/// Parse history params from request
-fn history_params(parts: &Parts) -> Result<HistoryParams, ServerError> {
-    let params: HistoryParams = serde_urlencoded::from_str(parts.uri.query().unwrap_or(""))
-        .catch_code(
-            StatusCode::BAD_REQUEST,
-            ErrorCode::GENERIC_PARAMETER_MALFORMED,
-        )?;
-    if params.delta == 0 {
-        return Err(ServerError::code(
-            StatusCode::BAD_REQUEST,
-            ErrorCode::GENERIC_PARAMETER_MALFORMED,
-        ));
-    }
-    Ok(params)
-}
-
-/// Generate sql query filter from history params
-fn sql_history_filter(params: &HistoryParams) -> String {
-    let asc = params.delta > 0;
-    let limit = params.delta.abs();
-    let order_sql = if asc { "ASC" } else { "DESC" };
-    let where_sql = if let Some(start) = params.start {
-        format!("WHERE id {} {}", if asc { '>' } else { '<' }, start)
-    } else {
-        String::new()
-    };
-    format!("{} ORDER BY id {} LIMIT {}", where_sql, order_sql, limit)
-}
-
-async fn router(
-    parts: &Parts,
-    body: Body,
-    state: &'static ServerState,
-) -> Result<Response<Body>, ServerError> {
-    // Check status error
-    if !state.status.load(Ordering::SeqCst) {
-        return Ok(Response::builder()
-            .status(StatusCode::BAD_GATEWAY)
-            .body(Body::empty())
-            .unwrap());
-    }
-
-    // Check auth method
-    match &state.auth {
-        AuthMethod::Basic(auth) => {
-            if !matches!(parts.headers
-                .get(hyper::header::AUTHORIZATION)
-                .and_then(|h| h.to_str().ok())
-                .and_then(|s| s.strip_prefix("Basic ")),
-                Some(token) if token == auth )
-            {
-                return Ok(Response::builder()
-                    .status(StatusCode::UNAUTHORIZED)
-                    .body(Body::empty())
-                    .unwrap());
-            }
-        }
-        AuthMethod::None => {}
-    }
-
-    let response = match parts.uri.path() {
-        "/transfer" => {
-            assert_method(parts, Method::POST)?;
-            let request: TransferRequest = parse_body(parts, body).await.catch_code(
-                StatusCode::BAD_REQUEST,
-                ErrorCode::GENERIC_PARAMETER_MALFORMED,
-            )?;
-            if !check_payto(&request.credit_account, state.currency) {
-                return Err(ServerError::code(
-                    StatusCode::BAD_REQUEST,
-                    ErrorCode::GENERIC_PAYTO_URI_MALFORMED,
-                ));
-            }
-            if Currency::from_str(&request.amount.currency) != Ok(state.currency) {
-                return Err(ServerError::code(
-                    StatusCode::BAD_REQUEST,
-                    ErrorCode::GENERIC_PARAMETER_MALFORMED,
-                ));
-            }
-            let mut db = state.pool.get().await.catch_code(
-                StatusCode::GATEWAY_TIMEOUT,
-                ErrorCode::GENERIC_DB_FETCH_FAILED,
-            )?;
-            // Handle idempotence, check previous transaction with the same request_uid
-            let row = db.query_opt("SELECT amount, exchange_url, wtid, credit_acc, id, _date FROM tx_out WHERE request_uid = $1", &[&request.request_uid.as_slice()])
-                .await?;
-            if let Some(row) = row {
-                let prev = TransferRequest {
-                    request_uid: request.request_uid.clone(),
-                    amount: sql_amount(&row, 0),
-                    exchange_base_url: sql_url(&row, 1),
-                    wtid: ShortHashCode::from(sql_array(&row, 2)),
-                    credit_account: sql_url(&row, 3),
-                };
-                if prev == request {
-                    // Idempotence
-                    return encode_body(
-                        parts,
-                        StatusCode::OK,
-                        &TransferResponse {
-                            timestamp: Timestamp::Time(row.get(5)),
-                            row_id: sql_safe_u64(&row, 4),
-                        },
-                    )
-                    .await
-                    .unexpected();
-                } else {
-                    return Err(ServerError::status(StatusCode::CONFLICT));
-                }
-            }
-
-            let timestamp = Timestamp::now();
-            let tx = db.transaction().await?;
-            let row = tx.query_one("INSERT INTO tx_out (amount, wtid, debit_acc, credit_acc, exchange_url, request_uid) VALUES ($1, $2, $3, $4, $5, $6) RETURNING id", &[
-                &request.amount.to_string(), &request.wtid.as_slice(), &state.payto.as_ref(),  &request.credit_account.as_ref(), &request.exchange_base_url.as_ref(), &request.request_uid.as_slice()
-            ]).await?;
-            tx.execute("NOTIFY new_tx", &[]).await?;
-            tx.commit().await?;
-            encode_body(
-                parts,
-                StatusCode::OK,
-                &TransferResponse {
-                    timestamp,
-                    row_id: sql_safe_u64(&row, 0),
-                },
-            )
-            .await
-            .unexpected()?
-        }
-        "/history/incoming" => {
-            assert_method(parts, Method::GET)?;
-            let params = history_params(parts)?;
-            let filter = sql_history_filter(&params);
-            let db = state.pool.get().await.catch_code(
-                StatusCode::GATEWAY_TIMEOUT,
-                ErrorCode::GENERIC_DB_FETCH_FAILED,
-            )?;
-            let transactions = db
-                .query(
-                    &format!("SELECT id, _date, amount, reserve_pub, debit_acc, credit_acc FROM tx_in {}", filter),
-                    &[],
-                )
-                .await.catch_code(
-                    StatusCode::BAD_GATEWAY,
-                    ErrorCode::GENERIC_DB_FETCH_FAILED,
-                )?
-                .into_iter()
-                .map(|row| IncomingBankTransaction::IncomingReserveTransaction {
-                    row_id: sql_safe_u64(&row, 0),
-                    date: Timestamp::Time(row.get(1)),
-                    amount: sql_amount(&row, 2),
-                    reserve_pub: ShortHashCode::from(sql_array(&row, 3)),
-                    debit_account: sql_url(&row, 4),
-                    credit_account: sql_url(&row, 5),
-                })
-                .collect();
-            encode_body(
-                parts,
-                StatusCode::OK,
-                &IncomingHistory {
-                    incoming_transactions: transactions,
-                },
-            )
-            .await
-            .unexpected()?
-        }
-        "/history/outgoing" => {
-            assert_method(parts, Method::GET)?;
-            let params = history_params(parts)?;
-            let filter = sql_history_filter(&params);
-
-            let db = state.pool.get().await.catch_code(
-                StatusCode::GATEWAY_TIMEOUT,
-                ErrorCode::GENERIC_DB_FETCH_FAILED,
-            )?;
-            let transactions = db
-                .query(
-                    &format!("SELECT id, _date, amount, wtid, debit_acc, 
credit_acc, exchange_url FROM tx_out {}",filter),
-                    &[],
-                )
-                .await.catch_code(
-                    StatusCode::BAD_GATEWAY,
-                    ErrorCode::GENERIC_DB_FETCH_FAILED,
-                )?
-                .into_iter()
-                .map(|row| OutgoingBankTransaction {
-                    row_id: sql_safe_u64(&row, 0),
-                    date: Timestamp::Time(row.get(1)),
-                    amount: sql_amount(&row, 2),
-                    wtid: ShortHashCode::from(sql_array(&row, 3)),
-                    debit_account: sql_url(&row, 4),
-                    credit_account: sql_url(&row, 5),
-                    exchange_base_url:sql_url(&row, 6),
-                })
-                .collect();
-            encode_body(
-                parts,
-                StatusCode::OK,
-                &OutgoingHistory {
-                    outgoing_transactions: transactions,
-                },
-            )
-            .await
-            .unexpected()?
-        }
-        #[cfg(feature = "test")]
-        "/admin/add-incoming" => {
-            // We do not check input as this is a test admin endpoint
-            assert_method(&parts, Method::POST).unwrap();
-            let request: common::api_wire::AddIncomingRequest =
-                parse_body(&parts, body).await.unwrap();
-            let timestamp = Timestamp::now();
-            let db = state.pool.get().await.catch_code(
-                StatusCode::GATEWAY_TIMEOUT,
-                ErrorCode::GENERIC_DB_FETCH_FAILED,
-            )?;
-            let row = db.query_one("INSERT INTO tx_in (_date, amount, 
reserve_pub, debit_acc, credit_acc) VALUES (now(), $1, $2, $3, $4) RETURNING 
id", &[
-                &request.amount.to_string(), &request.reserve_pub.as_slice(), 
&request.debit_account.as_ref(), 
&"payto://bitcoin/bcrt1qgkgxkjj27g3f7s87mcvjjsghay7gh34cx39prj"
-            ]).await.catch_code(
-                StatusCode::BAD_GATEWAY,
-                ErrorCode::GENERIC_DB_FETCH_FAILED,
-            )?;
-            encode_body(
-                parts,
-                StatusCode::OK,
-                &TransferResponse {
-                    timestamp,
-                    row_id: sql_safe_u64(&row, 0),
-                },
-            )
-            .await
-            .unexpected()?
-        }
-        _ => {
-            return Err(ServerError::code(
-                StatusCode::NOT_FOUND,
-                ErrorCode::GENERIC_ENDPOINT_UNKNOWN,
-            ))
-        }
-    };
-    Ok(response)
-}
-
-/// Listen to backend status change
-fn status_watcher(state: &'static ServerState) {
-    fn inner(state: &'static ServerState) -> Result<(), Box<dyn 
std::error::Error>> {
-        let mut db = state.db_config.connect(NoTls)?;
-        // Register as listener
-        db.batch_execute("LISTEN status")?;
-        loop {
-            // Sync state
-            let row = db.query_one("SELECT value FROM state WHERE name = 
'status'", &[])?;
-            let status: &[u8] = row.get(0);
-            assert!(status.len() == 1 && status[0] < 2);
-            state.status.store(status[0] == 1, Ordering::SeqCst);
-            // Wait for next notification
-            db.notifications().blocking_iter().next()?;
-        }
-    }
-
-    loop {
-        if let Err(err) = inner(state) {
-            error!("status-watcher: {}", err);
-            std::thread::sleep(Duration::from_secs(5));
-        }
-    }
-}
+/*
+  This file is part of TALER
+  Copyright (C) 2022 Taler Systems SA
+
+  TALER is free software; you can redistribute it and/or modify it under the
+  terms of the GNU Affero General Public License as published by the Free Software
+  Foundation; either version 3, or (at your option) any later version.
+
+  TALER is distributed in the hope that it will be useful, but WITHOUT ANY
+  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+  A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
+
+  You should have received a copy of the GNU Affero General Public License along with
+  TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
+*/
+use clap::Parser;
+use common::{
+    api_common::{ShortHashCode, Timestamp},
+    api_wire::{
+        HistoryParams, IncomingBankTransaction, IncomingHistory, 
OutgoingBankTransaction,
+        OutgoingHistory, TransferRequest, TransferResponse, WireConfig,
+    },
+    config::{AuthMethod, TalerConfig},
+    currency::Currency,
+    error_codes::ErrorCode,
+    log::{
+        fail,
+        log::{error, info, log, Level},
+    },
+    postgres::{self, fallible_iterator::FallibleIterator},
+    sql::{sql_amount, sql_array, sql_safe_u64, sql_url},
+    url::Url,
+};
+use deadpool_postgres::{Pool, Runtime};
+use error::{CatchResult, ServerError};
+use hyper::{
+    http::request::Parts,
+    service::{make_service_fn, service_fn},
+    Body, Method, Response, Server, StatusCode,
+};
+use json::{encode_body, parse_body};
+use listenfd::ListenFd;
+use std::{
+    convert::Infallible,
+    path::PathBuf,
+    str::FromStr,
+    sync::atomic::{AtomicBool, AtomicU32, Ordering},
+    time::{Duration, Instant},
+};
+use tokio::sync::Notify;
+use tokio_postgres::{config::Host, NoTls};
+
+mod error;
+mod json;
+
+struct ServerState {
+    pool: Pool,
+    db_config: postgres::Config,
+    payto: Url,
+    currency: Currency,
+    notify: Notify,
+    lifetime: Option<AtomicU32>,
+    status: AtomicBool,
+    auth: AuthMethod,
+}
+
+impl ServerState {
+    /// Decrease lifetime, triggering graceful shutdown when reach lifetime end
+    pub fn step(&self) {
+        if let Some(lifetime) = &self.lifetime {
+            let mut current = lifetime.load(Ordering::Relaxed);
+            loop {
+                if current == 0 {
+                    self.notify.notify_one();
+                }
+                match lifetime.compare_exchange_weak(
+                    current,
+                    current - 1,
+                    Ordering::SeqCst,
+                    Ordering::Relaxed,
+                ) {
+                    Ok(_) => break,
+                    Err(new) => current = new,
+                }
+            }
+        }
+    }
+
+    pub async fn shutdown_signal(&self) {
+        self.notify.notified().await;
+        info!("Reach end of lifetime");
+    }
+}
+
+/// Taler wire gateway server for depolymerizer
+#[derive(clap::Parser, Debug)]
+struct Args {
+    /// Override default configuration file path
+    #[clap(global = true, short, long)]
+    config: Option<PathBuf>,
+}
+
+#[tokio::main]
+async fn main() {
+    common::log::init();
+    let args = Args::parse();
+    let taler_config = TalerConfig::load(args.config.as_deref());
+
+    #[cfg(feature = "test")]
+    common::log::log::warn!("Running with test admin endpoint unsuitable for production");
+
+    // Parse postgres url
+    let db_config = taler_config.db_config();
+    // TODO find a way to clean this ugly mess
+    let mut cfg = deadpool_postgres::Config::new();
+    cfg.user = db_config.get_user().map(|it| it.to_string());
+    cfg.password = db_config
+        .get_password()
+        .map(|it| String::from_utf8(it.to_vec()).unwrap());
+    cfg.dbname = db_config.get_dbname().map(|it| it.to_string());
+    cfg.options = db_config.get_options().map(|it| it.to_string());
+    cfg.host = Some(
+        db_config
+            .get_hosts()
+            .iter()
+            .map(|it| match it {
+                Host::Tcp(it) => it.to_string(),
+                #[cfg(unix)]
+                Host::Unix(it) => it.to_str().unwrap().to_string(),
+            })
+            .collect(),
+    );
+    cfg.ports = Some(db_config.get_ports().to_vec());
+    cfg.application_name = db_config.get_application_name().map(|it| 
it.to_string());
+    cfg.connect_timeout = db_config.get_connect_timeout().cloned();
+
+    let pool = cfg.create_pool(Some(Runtime::Tokio1), NoTls).unwrap();
+    let payto = taler_config.payto();
+    let state = ServerState {
+        pool,
+        notify: Notify::new(),
+        lifetime: taler_config.http_lifetime().map(AtomicU32::new),
+        status: AtomicBool::new(true),
+        db_config,
+        payto,
+        currency: taler_config.currency,
+        auth: taler_config.auth_method(),
+    };
+    let state: &'static ServerState = Box::leak(Box::new(state));
+    std::thread::spawn(move || status_watcher(state));
+    let service = service_fn(move |req| async move {
+        state.step();
+        let start = Instant::now();
+        let (parts, body) = req.into_parts();
+        let (response, msg) = match router(&parts, body, state).await {
+            Ok(resp) => (resp, String::new()),
+            Err(err) => err.response(),
+        };
+        let status = response.status().as_u16();
+        let level = if status >= 500 {
+            Level::Error
+        } else if status >= 400 {
+            Level::Warn
+        } else {
+            Level::Info
+        };
+        log!(
+            level,
+            "{} {} {}ms {} - {}",
+            parts.method,
+            status,
+            start.elapsed().as_millis(),
+            parts.uri.path(),
+            msg
+        );
+        Ok::<Response<Body>, Infallible>(response)
+    });
+    let make_service = make_service_fn(move |_| async move { Ok::<_, 
Infallible>(service) });
+    let make_service_unix = make_service_fn(move |_| async move { Ok::<_, 
Infallible>(service) });
+
+    let mut listenfd = ListenFd::from_env();
+
+    if let Some(listener) = listenfd.take_tcp_listener(0).unwrap() {
+        info!(
+            "Server listening on activated socket {}",
+            listener.local_addr().unwrap()
+        );
+        let server = Server::from_tcp(listener)
+            .unwrap()
+            .serve(make_service)
+            .with_graceful_shutdown(state.shutdown_signal());
+        if let Err(e) = server.await {
+            error!("server: {}", e);
+        }
+    } else if let Some(path) = taler_config.unix_path() {
+        use hyperlocal::UnixServerExt;
+        info!("Server listening on unix domain socket {:?}", path);
+        if let Err(err) = std::fs::remove_file(&path) {
+            if err.kind() != std::io::ErrorKind::NotFound {
+                fail(format_args!("{}", err));
+            }
+        }
+        let server = Server::bind_unix(path)
+            .unwrap()
+            .serve(make_service_unix)
+            .with_graceful_shutdown(state.shutdown_signal());
+        if let Err(e) = server.await {
+            error!("server: {}", e);
+        }
+    } else {
+        let addr = ([0, 0, 0, 0], taler_config.port()).into();
+        info!("Server listening on http://{}", &addr);
+        let server = Server::bind(&addr)
+            .serve(make_service)
+            .with_graceful_shutdown(state.shutdown_signal());
+        if let Err(e) = server.await {
+            error!("server: {}", e);
+        }
+    };
+    info!("wire-gateway stopped");
+}
+
+/// Check if an url if a valid payto url for the configured currency
+fn check_payto(url: &Url, currency: Currency) -> bool {
+    match currency {
+        Currency::ETH(_) => check_pay_to_eth(url),
+        Currency::BTC(_) => check_pay_to_btc(url),
+    }
+}
+
+/// Check if an url is a valid bitcoin payto url
+fn check_pay_to_btc(url: &Url) -> bool {
+    return url.domain() == Some("bitcoin")
+        && url.scheme() == "payto"
+        && url.username() == ""
+        && url.password().is_none()
+        && url.query().is_none()
+        && url.fragment().is_none()
+        && 
bitcoin::Address::from_str(url.path().trim_start_matches('/')).is_ok();
+}
+
+/// Check if an url is a valid ethereum payto url
+fn check_pay_to_eth(url: &Url) -> bool {
+    return url.domain() == Some("ethereum")
+        && url.scheme() == "payto"
+        && url.username() == ""
+        && url.password().is_none()
+        && url.query().is_none()
+        && url.fragment().is_none()
+        && 
ethereum_types::H160::from_str(url.path().trim_start_matches('/')).is_ok();
+}
+
+/// Assert request method match expected
+fn assert_method(parts: &Parts, method: Method) -> Result<(), ServerError> {
+    if parts.method == method {
+        Ok(())
+    } else {
+        Err(ServerError::code(
+            StatusCode::METHOD_NOT_ALLOWED,
+            ErrorCode::GENERIC_METHOD_INVALID,
+        ))
+    }
+}
+
+/// Parse history params from request
+fn history_params(parts: &Parts) -> Result<HistoryParams, ServerError> {
+    let params: HistoryParams = 
serde_urlencoded::from_str(parts.uri.query().unwrap_or(""))
+        .catch_code(
+            StatusCode::BAD_REQUEST,
+            ErrorCode::GENERIC_PARAMETER_MALFORMED,
+        )?;
+    if params.delta == 0 {
+        return Err(ServerError::code(
+            StatusCode::BAD_REQUEST,
+            ErrorCode::GENERIC_PARAMETER_MALFORMED,
+        ));
+    }
+    Ok(params)
+}
+
+/// Generate sql query filter from history params
+fn sql_history_filter(params: &HistoryParams) -> String {
+    let asc = params.delta > 0;
+    let limit = params.delta.abs();
+    let order_sql = if asc { "ASC" } else { "DESC" };
+    let where_sql = if let Some(start) = params.start {
+        format!("WHERE id {} {}", if asc { '>' } else { '<' }, start)
+    } else {
+        String::new()
+    };
+    format!("{} ORDER BY id {} LIMIT {}", where_sql, order_sql, limit)
+}
+
+async fn router(
+    parts: &Parts,
+    body: Body,
+    state: &'static ServerState,
+) -> Result<Response<Body>, ServerError> {
+    // Check status error
+    if !state.status.load(Ordering::SeqCst) {
+        return Ok(Response::builder()
+            .status(StatusCode::BAD_GATEWAY)
+            .body(Body::empty())
+            .unwrap());
+    }
+
+    // Check auth method
+    match &state.auth {
+        AuthMethod::Basic(auth) => {
+            if !matches!(parts.headers
+                .get(hyper::header::AUTHORIZATION)
+                .and_then(|h| h.to_str().ok())
+                .and_then(|s| s.strip_prefix("Basic ")),
+                Some(token) if token == auth )
+            {
+                return Ok(Response::builder()
+                    .status(StatusCode::UNAUTHORIZED)
+                    .body(Body::empty())
+                    .unwrap());
+            }
+        }
+        AuthMethod::None => {}
+    }
+
+    let response = match parts.uri.path() {
+        "/config" => {
+            assert_method(parts, Method::GET)?;
+            encode_body(
+                parts,
+                StatusCode::OK,
+                &WireConfig {
+                    name: "taler-wire-gateway",
+                    version: "0.0.0",
+                    currency: state.currency.to_str(),
+                },
+            )
+            .unexpected()?
+        }
+        "/transfer" => {
+            assert_method(parts, Method::POST)?;
+            let request: TransferRequest = parse_body(parts, 
body).await.catch_code(
+                StatusCode::BAD_REQUEST,
+                ErrorCode::GENERIC_PARAMETER_MALFORMED,
+            )?;
+            if !check_payto(&request.credit_account, state.currency) {
+                return Err(ServerError::code(
+                    StatusCode::BAD_REQUEST,
+                    ErrorCode::GENERIC_PAYTO_URI_MALFORMED,
+                ));
+            }
+            if Currency::from_str(&request.amount.currency) != 
Ok(state.currency) {
+                return Err(ServerError::code(
+                    StatusCode::BAD_REQUEST,
+                    ErrorCode::GENERIC_PARAMETER_MALFORMED,
+                ));
+            }
+            let mut db = state.pool.get().await.catch_code(
+                StatusCode::GATEWAY_TIMEOUT,
+                ErrorCode::GENERIC_DB_FETCH_FAILED,
+            )?;
+            // Handle idempotence, check previous transaction with the same request_uid
+            let row = db.query_opt("SELECT amount, exchange_url, wtid, 
credit_acc, id, _date FROM tx_out WHERE request_uid = $1", 
&[&request.request_uid.as_slice()])
+                .await?;
+            if let Some(row) = row {
+                let prev = TransferRequest {
+                    request_uid: request.request_uid.clone(),
+                    amount: sql_amount(&row, 0),
+                    exchange_base_url: sql_url(&row, 1),
+                    wtid: ShortHashCode::from(sql_array(&row, 2)),
+                    credit_account: sql_url(&row, 3),
+                };
+                if prev == request {
+                    // Idempotence
+                    return encode_body(
+                        parts,
+                        StatusCode::OK,
+                        &TransferResponse {
+                            timestamp: Timestamp::Time(row.get(5)),
+                            row_id: sql_safe_u64(&row, 4),
+                        },
+                    )
+                    .unexpected();
+                } else {
+                    return Err(ServerError::status(StatusCode::CONFLICT));
+                }
+            }
+
+            let timestamp = Timestamp::now();
+            let tx = db.transaction().await?;
+            let row = tx.query_one("INSERT INTO tx_out (amount, wtid, 
debit_acc, credit_acc, exchange_url, request_uid) VALUES ($1, $2, $3, $4, $5, 
$6) RETURNING id", &[
+                &request.amount.to_string(), &request.wtid.as_slice(), 
&state.payto.as_ref(),  &request.credit_account.as_ref(), 
&request.exchange_base_url.as_ref(), &request.request_uid.as_slice()
+            ]).await?;
+            tx.execute("NOTIFY new_tx", &[]).await?;
+            tx.commit().await?;
+            encode_body(
+                parts,
+                StatusCode::OK,
+                &TransferResponse {
+                    timestamp,
+                    row_id: sql_safe_u64(&row, 0),
+                },
+            )
+            .unexpected()?
+        }
+        "/history/incoming" => {
+            assert_method(parts, Method::GET)?;
+            let params = history_params(parts)?;
+            let filter = sql_history_filter(&params);
+            let db = state.pool.get().await.catch_code(
+                StatusCode::GATEWAY_TIMEOUT,
+                ErrorCode::GENERIC_DB_FETCH_FAILED,
+            )?;
+            let transactions: Vec<_> = db
+                .query(
+                    &format!("SELECT id, _date, amount, reserve_pub, debit_acc 
FROM tx_in {}", filter),
+                    &[],
+                )
+                .await.catch_code(
+                    StatusCode::BAD_GATEWAY,
+                    ErrorCode::GENERIC_DB_FETCH_FAILED,
+                )?
+                .into_iter()
+                .map(| row| { 
+                 IncomingBankTransaction::IncomingReserveTransaction {
+                    row_id: sql_safe_u64(&row, 0),
+                    date: Timestamp::Time(row.get(1)),
+                    amount: sql_amount(&row, 2),
+                    reserve_pub: ShortHashCode::from(sql_array(&row, 3)),
+                    debit_account: sql_url(&row, 4),
+                }
+            }).collect();
+            if transactions.is_empty() {
+                Response::builder()
+                .status(StatusCode::NO_CONTENT)
+                .body(Body::empty())
+                .unwrap()
+            } else {
+                encode_body(
+                    parts,
+                    StatusCode::OK,
+                    &IncomingHistory {
+                        credit_account: state.payto.clone(),
+                        incoming_transactions: transactions,
+                    },
+                )
+                .unexpected()?
+            }
+        }
+        "/history/outgoing" => {
+            assert_method(parts, Method::GET)?;
+            let params = history_params(parts)?;
+            let filter = sql_history_filter(&params);
+
+            let db = state.pool.get().await.catch_code(
+                StatusCode::GATEWAY_TIMEOUT,
+                ErrorCode::GENERIC_DB_FETCH_FAILED,
+            )?;
+            let transactions: Vec<_> =db
+                .query(
+                    &format!("SELECT id, _date, amount, wtid, credit_acc, 
exchange_url FROM tx_out {}", filter),
+                    &[],
+                )
+                .await.catch_code(
+                    StatusCode::BAD_GATEWAY,
+                    ErrorCode::GENERIC_DB_FETCH_FAILED,
+                )?
+                .into_iter()
+                .map(|row| OutgoingBankTransaction {
+                    row_id: sql_safe_u64(&row, 0),
+                    date: Timestamp::Time(row.get(1)),
+                    amount: sql_amount(&row, 2),
+                    wtid: ShortHashCode::from(sql_array(&row, 3)),
+                    credit_account: sql_url(&row, 4),
+                    exchange_base_url:sql_url(&row, 5),
+                }).collect();
+                if transactions.is_empty() {
+                    Response::builder()
+                    .status(StatusCode::NO_CONTENT)
+                    .body(Body::empty())
+                    .unwrap()
+                } else {
+                    encode_body(
+                        parts,
+                        StatusCode::OK,
+                        &OutgoingHistory {
+                            debit_account: state.payto.clone(),
+                            outgoing_transactions: transactions,
+                        },
+                    )
+                    .unexpected()?
+                }
+        }
+        #[cfg(feature = "test")]
+        "/admin/add-incoming" => {
+            // We do not check input as this is a test admin endpoint
+            assert_method(&parts, Method::POST).unwrap();
+            let request: common::api_wire::AddIncomingRequest =
+                parse_body(&parts, body).await.unwrap();
+            let timestamp = Timestamp::now();
+            let db = state.pool.get().await.catch_code(
+                StatusCode::GATEWAY_TIMEOUT,
+                ErrorCode::GENERIC_DB_FETCH_FAILED,
+            )?;
+            let row = db.query_one("INSERT INTO tx_in (_date, amount, 
reserve_pub, debit_acc, credit_acc) VALUES (now(), $1, $2, $3, $4) RETURNING 
id", &[
+                &request.amount.to_string(), &request.reserve_pub.as_slice(), 
&request.debit_account.as_ref(), 
&"payto://bitcoin/bcrt1qgkgxkjj27g3f7s87mcvjjsghay7gh34cx39prj"
+            ]).await.catch_code(
+                StatusCode::BAD_GATEWAY,
+                ErrorCode::GENERIC_DB_FETCH_FAILED,
+            )?;
+            encode_body(
+                parts,
+                StatusCode::OK,
+                &TransferResponse {
+                    timestamp,
+                    row_id: sql_safe_u64(&row, 0),
+                },
+            )
+            .unexpected()?
+        }
+        _ => {
+            return Err(ServerError::code(
+                StatusCode::NOT_FOUND,
+                ErrorCode::GENERIC_ENDPOINT_UNKNOWN,
+            ))
+        }
+    };
+    Ok(response)
+}
+
+/// Listen to backend status change
+fn status_watcher(state: &'static ServerState) {
+    fn inner(state: &'static ServerState) -> Result<(), Box<dyn 
std::error::Error>> {
+        let mut db = state.db_config.connect(NoTls)?;
+        // Register as listener
+        db.batch_execute("LISTEN status")?;
+        loop {
+            // Sync state
+            let row = db.query_one("SELECT value FROM state WHERE name = 
'status'", &[])?;
+            let status: &[u8] = row.get(0);
+            assert!(status.len() == 1 && status[0] < 2);
+            state.status.store(status[0] == 1, Ordering::SeqCst);
+            // Wait for next notification
+            db.notifications().blocking_iter().next()?;
+        }
+    }
+
+    loop {
+        if let Err(err) = inner(state) {
+            error!("status-watcher: {}", err);
+            std::thread::sleep(Duration::from_secs(5));
+        }
+    }
+}

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]