gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] [taler-exchange] branch master updated: modify wire auditor


From: gnunet
Subject: [GNUnet-SVN] [taler-exchange] branch master updated: modify wire auditor to deal with asynchrony of WIRE plugin API
Date: Sat, 30 Sep 2017 21:28:27 +0200

This is an automated email from the git hooks/post-receive script.

grothoff pushed a commit to branch master
in repository exchange.

The following commit(s) were added to refs/heads/master by this push:
     new 85a2d3d  modify wire auditor to deal with asynchrony of WIRE plugin API
85a2d3d is described below

commit 85a2d3dc0e6938ca751bd72cf556173645e09e78
Author: Christian Grothoff <address@hidden>
AuthorDate: Sat Sep 30 21:28:17 2017 +0200

    modify wire auditor to deal with asynchrony of WIRE plugin API
---
 src/auditor/taler-wire-auditor.c | 363 ++++++++++++++++++++-------------------
 1 file changed, 191 insertions(+), 172 deletions(-)

diff --git a/src/auditor/taler-wire-auditor.c b/src/auditor/taler-wire-auditor.c
index 19a48bc..c1323ea 100644
--- a/src/auditor/taler-wire-auditor.c
+++ b/src/auditor/taler-wire-auditor.c
@@ -88,10 +88,67 @@ static struct TALER_MasterPublicKeyP master_pub;
 static struct TALER_WIRE_Plugin *wp;
 
 /**
+ * Active wire request for the transaction history.
+ */
+static struct TALER_WIRE_HistoryHandle *hh;
+
+/**
+ * Query status for the incremental processing status in the auditordb.
+ */
+static enum GNUNET_DB_QueryStatus qsx;
+
+/**
  * Last reserve_in / reserve_out serial IDs seen.
  */
 static struct TALER_AUDITORDB_WireProgressPoint pp;
 
+/**
+ * Where we are in the inbound (CREDIT) transaction history.
+ */
+static void *in_wire_off;
+
+/**
+ * Where we are in the outbound (DEBIT) transaction history.
+ */
+static void *out_wire_off;
+
+/**
+ * Number of bytes in #in_wire_off and #out_wire_off.
+ */
+static size_t wire_off_size;
+
+
+/* *****************************   Shutdown   **************************** */
+
+/**
+ * Task run on shutdown.
+ */
+static void
+do_shutdown ()
+{
+  if (NULL != hh)
+  {
+    wp->get_history_cancel (wp->cls,
+                            hh);
+    hh = NULL;
+  }
+  if (NULL != wp)
+  {
+    TALER_WIRE_plugin_unload (wp);
+    wp = NULL;
+  }
+  if (NULL != adb)
+  {
+    TALER_AUDITORDB_plugin_unload (adb);
+    adb = NULL;
+  }
+  if (NULL != edb)
+  {
+    TALER_EXCHANGEDB_plugin_unload (edb);
+    edb = NULL;
+  }
+}
+
 
 /* ***************************** Report logic **************************** */
 
@@ -140,107 +197,18 @@ report_row_minor_inconsistency (const char *table,
 #endif
 
 
-/* ***************************** Analyze reserves_in ************************ */
-/* This logic checks the reserves_in table */
-
-/**
- * Analyze reserves for being well-formed.
- *
- * @param cls NULL
- * @return transaction status code
- */
-static enum GNUNET_DB_QueryStatus
-analyze_reserves_in (void *cls)
-{
-  /* FIXME: #4958 */
-  return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
-}
-
-
-/* ***************************** Analyze reserves_out ************************ */
-/* This logic checks the reserves_out table */
-
-/**
- * Analyze reserves for being well-formed.
- *
- * @param cls NULL
- * @return transaction status code
- */
-static enum GNUNET_DB_QueryStatus
-analyze_reserves_out (void *cls)
-{
-#if 0
-  // FIXME: start_off != rowid!
-  hh = wp->get_history (wp->cls,
-                        TALER_BANK_DIRECTION_CREDIT,
-                        &start_off,
-                        sizeof (start_off),
-                        INT64_MAX,
-                        &history_cb,
-                        NULL);
-#endif
-  /* FIXME: #4958 */
-  return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
-}
-
-
 /* *************************** General transaction logic ****************** */
 
 /**
- * Type of an analysis function.  Each analysis function runs in
- * its own transaction scope and must thus be internally consistent.
- *
- * @param cls closure
- * @return transaction status code
- */
-typedef enum GNUNET_DB_QueryStatus
-(*Analysis)(void *cls);
-
-
-/**
- * Perform the given @a analysis incrementally, checkpointing our
- * progress in the auditor DB.
+ * Commit the transaction, checkpointing our progress in the auditor
+ * DB.
  *
- * @param analysis analysis to run
- * @param analysis_cls closure for @a analysis
+ * @param qs transaction status so far
  * @return transaction status code
  */
 static enum GNUNET_DB_QueryStatus
-incremental_processing (Analysis analysis,
-                        void *analysis_cls)
+commit (enum GNUNET_DB_QueryStatus qs)
 {
-  enum GNUNET_DB_QueryStatus qs;
-  enum GNUNET_DB_QueryStatus qsx;
-  void *in_wire_off;
-  void *out_wire_off;
-  size_t wire_off_size;
-
-  qsx = adb->get_wire_auditor_progress (adb->cls,
-                                        asession,
-                                        &master_pub,
-                                        &pp,
-                                        &in_wire_off,
-                                        &out_wire_off,
-                                        &wire_off_size);
-  if (0 > qsx)
-  {
-    GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qsx);
-    return qsx;
-  }
-  if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qsx)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE,
-                _("First analysis using this auditor, starting audit from scratch\n"));
-  }
-  else
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-                _("Resuming audit at %llu/%llu\n"),
-                (unsigned long long) pp.last_reserve_in_serial_id,
-                (unsigned long long) pp.last_reserve_out_serial_id);
-  }
-  qs = analysis (analysis_cls);
-  // FIXME: wire plugin does NOT support synchronous activity!
   if (0 > qs)
   {
     if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
@@ -248,7 +216,11 @@ incremental_processing (Analysis analysis,
                  "Serialization issue, not recording progress\n");
     else
       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                 "Hard database error, not recording progress\n");
+                 "Hard error, not recording progress\n");
+    adb->rollback (adb->cls,
+                   asession);
+    edb->rollback (edb->cls,
+                   esession);
     return qs;
   }
   if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qsx)
@@ -279,43 +251,7 @@ incremental_processing (Analysis analysis,
               _("Concluded audit step at %llu/%llu\n"),
               (unsigned long long) pp.last_reserve_in_serial_id,
               (unsigned long long) pp.last_reserve_out_serial_id);
-  return qs;
-}
-
-
-/**
- * Perform the given @a analysis within a transaction scope.
- * Commit on success.
- *
- * @param analysis analysis to run
- * @param analysis_cls closure for @a analysis
- * @return #GNUNET_OK if @a analysis succeessfully committed,
- *         #GNUNET_NO if we had an error on commit (retry may help)
- *         #GNUNET_SYSERR on hard errors
- */
-static int
-transact (Analysis analysis,
-          void *analysis_cls)
-{
-  int ret;
-  enum GNUNET_DB_QueryStatus qs;
 
-  ret = adb->start (adb->cls,
-                    asession);
-  if (GNUNET_OK != ret)
-  {
-    GNUNET_break (0);
-    return GNUNET_SYSERR;
-  }
-  ret = edb->start (edb->cls,
-                    esession);
-  if (GNUNET_OK != ret)
-  {
-    GNUNET_break (0);
-    return GNUNET_SYSERR;
-  }
-  qs = incremental_processing (analysis,
-                                analysis_cls);
   if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs)
   {
     qs = edb->commit (edb->cls,
@@ -353,46 +289,45 @@ transact (Analysis analysis,
 }
 
 
+/* ***************************** Analyze reserves_in ************************ */
+
+
 /**
- * Initialize DB sessions and run the analysis.
+ * Callbacks of this type are used to serve the result of asking
+ * the bank for the transaction history.
+ *
+ * @param cls closure
+ * @param dir direction of the transfer
+ * @param row_off identification of the position at which we are querying
+ * @param row_off_size number of bytes in @a row_off
+ * @param details details about the wire transfer
+ * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration
  */
-static void
-setup_sessions_and_run ()
+static int
+history_credit_cb (void *cls,
+                   enum TALER_BANK_Direction dir,
+                   const void *row_off,
+                   size_t row_off_size,
+                   const struct TALER_WIRE_TransferDetails *details)
 {
-  esession = edb->get_session (edb->cls);
-  if (NULL == esession)
+  if (NULL == details)
   {
-    fprintf (stderr,
-             "Failed to initialize exchange session.\n");
-    global_ret = 1;
-    return;
-  }
-  asession = adb->get_session (adb->cls);
-  if (NULL == asession)
-  {
-    fprintf (stderr,
-             "Failed to initialize auditor session.\n");
-    global_ret = 1;
-    return;
-  }
-  wp = TALER_WIRE_plugin_load (cfg,
-                               wire_plugin);
-  if (NULL == wp)
-  {
-    fprintf (stderr,
-             "Failed to load wire plugin `%s'\n",
-             wire_plugin);
-    global_ret = 1;
-    return;
+    /* end of operation */
+    hh = NULL;
+    /* TODO: also check DEBITs! */
+    commit (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT);
+    GNUNET_SCHEDULER_shutdown ();
+    return GNUNET_SYSERR;
   }
-  // FIXME: wire plugin does NOT support synchronous activity!
-  transact (&analyze_reserves_in,
-            NULL);
-  transact (&analyze_reserves_out,
-            NULL);
+  /* TODO: implement actual checks! */
+  return GNUNET_OK;
 }
 
 
+
+/* ***************************** Setup logic    ************************ */
+
+
 /**
  * Main function that will be run.
  *
@@ -407,6 +342,8 @@ run (void *cls,
      const char *cfgfile,
      const struct GNUNET_CONFIGURATION_Handle *c)
 {
+  int ret;
+
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Launching auditor\n");
   cfg = c;
@@ -458,18 +395,100 @@ run (void *cls,
     GNUNET_break (GNUNET_OK ==
                   adb->create_tables (adb->cls));
   }
-
+  GNUNET_SCHEDULER_add_shutdown (&do_shutdown,
+                                 NULL);
+  esession = edb->get_session (edb->cls);
+  if (NULL == esession)
+  {
+    fprintf (stderr,
+             "Failed to initialize exchange session.\n");
+    global_ret = 1;
+    GNUNET_SCHEDULER_shutdown ();
+    return;
+  }
+  asession = adb->get_session (adb->cls);
+  if (NULL == asession)
+  {
+    fprintf (stderr,
+             "Failed to initialize auditor session.\n");
+    global_ret = 1;
+    GNUNET_SCHEDULER_shutdown ();
+    return;
+  }
+  wp = TALER_WIRE_plugin_load (cfg,
+                               wire_plugin);
+  if (NULL == wp)
+  {
+    fprintf (stderr,
+             "Failed to load wire plugin `%s'\n",
+             wire_plugin);
+    global_ret = 1;
+    GNUNET_SCHEDULER_shutdown ();
+    return;
+  }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Starting audit\n");
-  setup_sessions_and_run ();
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Audit complete\n");
-  if (NULL != wp)
-    TALER_WIRE_plugin_unload (wp);
-  if (NULL != adb)
-    TALER_AUDITORDB_plugin_unload (adb);
-  if (NULL != edb)
-    TALER_EXCHANGEDB_plugin_unload (edb);
+  ret = adb->start (adb->cls,
+                    asession);
+  if (GNUNET_OK != ret)
+  {
+    GNUNET_break (0);
+    global_ret = 1;
+    GNUNET_SCHEDULER_shutdown ();
+    return;
+  }
+  ret = edb->start (edb->cls,
+                    esession);
+  if (GNUNET_OK != ret)
+  {
+    GNUNET_break (0);
+    global_ret = 1;
+    GNUNET_SCHEDULER_shutdown ();
+    return;
+  }
+  qsx = adb->get_wire_auditor_progress (adb->cls,
+                                        asession,
+                                        &master_pub,
+                                        &pp,
+                                        &in_wire_off,
+                                        &out_wire_off,
+                                        &wire_off_size);
+  if (0 > qsx)
+  {
+    GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qsx);
+    global_ret = 1;
+    GNUNET_SCHEDULER_shutdown ();
+    return;
+  }
+  if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qsx)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE,
+                _("First analysis using this auditor, starting audit from scratch\n"));
+  }
+  else
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                _("Resuming audit at %llu/%llu\n"),
+                (unsigned long long) pp.last_reserve_in_serial_id,
+                (unsigned long long) pp.last_reserve_out_serial_id);
+  }
+
+  hh = wp->get_history (wp->cls,
+                        TALER_BANK_DIRECTION_CREDIT,
+                        in_wire_off,
+                        wire_off_size,
+                        INT64_MAX,
+                        &history_credit_cb,
+                        NULL);
+  if (NULL == hh)
+  {
+    fprintf (stderr,
+             "Failed to obtain bank transaction history\n");
+    commit (GNUNET_DB_STATUS_HARD_ERROR);
+    global_ret = 1;
+    GNUNET_SCHEDULER_shutdown ();
+    return;
+  }
 }
 
 

-- 
To stop receiving notification emails like this one, please contact
address@hidden



reply via email to

[Prev in Thread] Current Thread [Next in Thread]