gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] [taler-exchange] 01/03: fix #5010 for taler-exchange-aggreg


From: gnunet
Subject: [GNUnet-SVN] [taler-exchange] 01/03: fix #5010 for taler-exchange-aggregator
Date: Sat, 24 Jun 2017 23:27:39 +0200

This is an automated email from the git hooks/post-receive script.

grothoff pushed a commit to branch master
in repository exchange.

commit 2d662e3f8e62e750cf2dcf2030cc69e8ae176960
Author: Christian Grothoff <address@hidden>
AuthorDate: Sat Jun 24 12:15:11 2017 +0200

    fix #5010 for taler-exchange-aggregator
---
 src/exchange/taler-exchange-aggregator.c    | 411 ++++++++++----------
 src/exchangedb/plugin_exchangedb_postgres.c | 575 +++++++++++-----------------
 src/exchangedb/test_exchangedb.c            |  36 +-
 src/include/taler_exchangedb_plugin.h       |  42 +-
 4 files changed, 466 insertions(+), 598 deletions(-)

diff --git a/src/exchange/taler-exchange-aggregator.c 
b/src/exchange/taler-exchange-aggregator.c
index 8dd46f7..7bd437b 100644
--- a/src/exchange/taler-exchange-aggregator.c
+++ b/src/exchange/taler-exchange-aggregator.c
@@ -346,18 +346,19 @@ advance_fees (struct WirePlugin *wp,
  * @param wp wire transfer fee data structure to update
  * @param now timestamp to update fees to
  * @param session DB session to use
- * @return #GNUNET_OK on success, #GNUNET_SYSERR if we
- *         lack current fee information (and need to exit)
+ * @return transaction status
  */
-static int
+static enum GNUNET_DB_QueryStatus
 update_fees (struct WirePlugin *wp,
              struct GNUNET_TIME_Absolute now,
              struct TALER_EXCHANGEDB_Session *session)
 {
+  enum GNUNET_DB_QueryStatus qs;
+  
   advance_fees (wp,
                 now);
   if (NULL != wp->af)
-    return GNUNET_OK;
+    return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
   /* Let's try to load it from disk... */
   wp->af = TALER_EXCHANGEDB_fees_read (cfg,
                                        wp->type);
@@ -367,26 +368,26 @@ update_fees (struct WirePlugin *wp,
        NULL != p;
        p = p->next)
   {
-    if (GNUNET_SYSERR ==
-        db_plugin->insert_wire_fee (db_plugin->cls,
-                                    session,
-                                    wp->type,
-                                    p->start_date,
-                                    p->end_date,
-                                    &p->wire_fee,
-                                    &p->master_sig))
+    qs = db_plugin->insert_wire_fee (db_plugin->cls,
+                                    session,
+                                    wp->type,
+                                    p->start_date,
+                                    p->end_date,
+                                    &p->wire_fee,
+                                    &p->master_sig);
+    if (qs < 0)
     {
       TALER_EXCHANGEDB_fees_free (wp->af);
       wp->af = NULL;
-      return GNUNET_SYSERR;
+      return qs;
     }
   }
   if (NULL != wp->af)
-    return GNUNET_OK;
+    return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
   GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
               "Failed to find current wire transfer fees for `%s'\n",
               wp->type);
-  return GNUNET_SYSERR;
+  return GNUNET_DB_STATUS_SUCCESS_NO_RESULTS;
 }
 
 
@@ -425,6 +426,26 @@ find_plugin (const char *type)
   return wp;
 }
 
+
+/**
+ * Free data stored in #au.
+ */
+static void
+cleanup_au (void)
+{
+  if (NULL == au)
+    return;
+  GNUNET_free_non_null (au->additional_rows);
+  if (NULL != au->wire)
+  {
+    json_decref (au->wire);
+    au->wire = NULL;
+  }
+  GNUNET_free (au);
+  au = NULL;
+}
+
+
 /**
  * We're being aborted with CTRL-C (or SIGTERM). Shut down.
  *
@@ -463,11 +484,7 @@ shutdown_task (void *cls)
     }
     db_plugin->rollback (db_plugin->cls,
                          au->session);
-    GNUNET_free_non_null (au->additional_rows);
-    if (NULL != au->wire)
-      json_decref (au->wire);
-    au = NULL;
-    GNUNET_free (au);
+    cleanup_au ();
   }
   if (NULL != ctc)
   {
@@ -564,9 +581,9 @@ exchange_serve_process_config ()
  * @param wire_deadline by which the merchant adviced that he would like the
  *        wire transfer to be executed
  * @param wire wire details for the merchant
- * @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR to stop
+ * @return transaction status code,  #GNUNET_DB_STATUS_SUCCESS_ONE_RESULT to 
continue to iterate
  */
-static int
+static enum GNUNET_DB_QueryStatus
 deposit_cb (void *cls,
             uint64_t row_id,
             const struct TALER_MerchantPublicKeyP *merchant_pub,
@@ -588,7 +605,7 @@ deposit_cb (void *cls,
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                 "Fatally malformed record at row %llu\n",
                 (unsigned long long) row_id);
-    return GNUNET_SYSERR;
+    return GNUNET_DB_STATUS_HARD_ERROR;
   }
   au->row_id = row_id;
   GNUNET_assert (NULL == au->wire);
@@ -604,38 +621,41 @@ deposit_cb (void *cls,
 
   au->wp = find_plugin (extract_type (au->wire));
   if (NULL == au->wp)
-    return GNUNET_SYSERR;
+    return GNUNET_DB_STATUS_HARD_ERROR;
 
   /* make sure we have current fees */
   au->execution_time = GNUNET_TIME_absolute_get ();
   (void) GNUNET_TIME_round_abs (&au->execution_time);
-  if (GNUNET_OK !=
-      update_fees (au->wp,
-                   au->execution_time,
-                   au->session))
-    return GNUNET_SYSERR;
+  qs = update_fees (au->wp,
+                   au->execution_time,
+                   au->session);
+  if (qs <= 0)
+  {
+    if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
+      qs = GNUNET_DB_STATUS_HARD_ERROR;
+    GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
+    return qs;
+  }
   au->wire_fee = au->wp->af->wire_fee;
 
-  if (GNUNET_OK !=
-      db_plugin->insert_aggregation_tracking (db_plugin->cls,
-                                              au->session,
-                                              &au->wtid,
-                                              row_id))
+  qs = db_plugin->insert_aggregation_tracking (db_plugin->cls,
+                                              au->session,
+                                              &au->wtid,
+                                              row_id);
+  if (qs <= 0)
   {
-    GNUNET_break (0);
-    return GNUNET_SYSERR;
+    GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
+    return qs;
   }
   qs = db_plugin->mark_deposit_done (db_plugin->cls,
                                     au->session,
                                     row_id);
   if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
   {
-    /* FIXME #5010 */
-    GNUNET_break (0);
-    au->failed = GNUNET_YES;
-    return GNUNET_SYSERR;
+    GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
+    return qs;
   }
-  return GNUNET_OK;
+  return qs;
 }
 
 
@@ -653,9 +673,9 @@ deposit_cb (void *cls,
  * @param wire_deadline by which the merchant adviced that he would like the
  *        wire transfer to be executed
  * @param wire wire details for the merchant
- * @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR to stop
+ * @return transaction status code
  */
-static int
+static enum GNUNET_DB_QueryStatus
 aggregate_cb (void *cls,
               uint64_t row_id,
               const struct TALER_MerchantPublicKeyP *merchant_pub,
@@ -682,9 +702,12 @@ aggregate_cb (void *cls,
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                 "Fatally malformed record at %llu\n",
                 (unsigned long long) row_id);
-    return GNUNET_SYSERR;
+    return GNUNET_DB_STATUS_HARD_ERROR;
   }
   /* add to total */
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Adding transaction amount %s to aggregation\n",
+             TALER_amount2s (&delta));
   if (GNUNET_OK !=
       TALER_amount_add (&au->total_amount,
                         &au->total_amount,
@@ -694,14 +717,14 @@ aggregate_cb (void *cls,
                 "Overflow or currency incompatibility during aggregation at 
%llu\n",
                 (unsigned long long) row_id);
     /* Skip this one, but keep going! */
-    return GNUNET_OK;
+    return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
   }
   if (au->rows_offset >= aggregation_limit)
   {
     /* Bug: we asked for at most #aggregation_limit results! */
     GNUNET_break (0);
     /* Skip this one, but keep going. */
-    return GNUNET_OK;
+    return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
   }
   if (NULL == au->additional_rows)
     au->additional_rows = GNUNET_new_array (aggregation_limit,
@@ -709,26 +732,27 @@ aggregate_cb (void *cls,
   /* "append" to our list of rows */
   au->additional_rows[au->rows_offset++] = row_id;
   /* insert into aggregation tracking table */
-  if (GNUNET_OK !=
-      db_plugin->insert_aggregation_tracking (db_plugin->cls,
-                                              au->session,
-                                              &au->wtid,
-                                              row_id))
+  qs = db_plugin->insert_aggregation_tracking (db_plugin->cls,
+                                              au->session,
+                                              &au->wtid,
+                                              row_id);
+  if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
   {
-    GNUNET_break (0);
-    return GNUNET_SYSERR;
+    GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
+    return qs;
   }
   qs = db_plugin->mark_deposit_done (db_plugin->cls,
                                     au->session,
                                     row_id);
   if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
   {
-    /* FIXME: #5010 */
-    GNUNET_break (0);
-    au->failed = GNUNET_YES;
-    return GNUNET_SYSERR;
+    GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
+    return qs;
   }
-  return GNUNET_OK;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Added row %llu to aggregation\n",
+             (unsigned long long) row_id);
+  return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
 }
 
 
@@ -948,15 +972,18 @@ expired_reserve_cb (void *cls,
   }
 
   /* lookup `closing_fee` */
-  if (GNUNET_OK !=
-      update_fees (wp,
-                  now,
-                  session))
+  qs = update_fees (wp,
+                   now,
+                   session);
+  if (qs <= 0)
   {
-    GNUNET_break (0);
+    if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
+      qs = GNUNET_DB_STATUS_HARD_ERROR;
+    GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
     global_ret = GNUNET_SYSERR;
-    GNUNET_SCHEDULER_shutdown ();
-    return GNUNET_DB_STATUS_HARD_ERROR;
+    if (GNUNET_DB_STATUS_HARD_ERROR == qs)
+      GNUNET_SCHEDULER_shutdown ();
+    return qs;
   }
   closing_fee = &wp->af->closing_fee;
 
@@ -1144,7 +1171,6 @@ run_aggregation (void *cls)
   static int swap;
   struct TALER_EXCHANGEDB_Session *session;
   enum GNUNET_DB_QueryStatus qs;
-  int ret;
   const struct GNUNET_SCHEDULER_TaskContext *tc;
 
   task = NULL;
@@ -1179,19 +1205,16 @@ run_aggregation (void *cls)
   }
   au = GNUNET_new (struct AggregationUnit);
   au->session = session;
-  ret = db_plugin->get_ready_deposit (db_plugin->cls,
-                                      session,
-                                      &deposit_cb,
-                                      au);
-  if (0 >= ret)
+  qs = db_plugin->get_ready_deposit (db_plugin->cls,
+                                    session,
+                                    &deposit_cb,
+                                    au);
+  if (0 >= qs)
   {
-    if (NULL != au->wire)
-      json_decref (au->wire);
-    GNUNET_free (au);
-    au = NULL;
+    cleanup_au ();
     db_plugin->rollback (db_plugin->cls,
                          session);
-    if (GNUNET_SYSERR == ret)
+    if (GNUNET_DB_STATUS_HARD_ERROR == qs)
     {
       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                   "Failed to execute deposit iteration!\n");
@@ -1199,6 +1222,14 @@ run_aggregation (void *cls)
       GNUNET_SCHEDULER_shutdown ();
       return;
     }
+    if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
+    {
+      /* should re-try immediately */
+      swap--; /* do not count failed attempts */
+      task = GNUNET_SCHEDULER_add_now (&run_aggregation,
+                                      NULL);
+      return;
+    }
     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                 "No more ready deposits, going to sleep\n");
     if ( (GNUNET_YES == test_mode) &&
@@ -1209,12 +1240,13 @@ run_aggregation (void *cls)
     }
     else
     {
-      /* nothing to do, sleep for a minute and try again */
       if ( (GNUNET_NO == reserves_idle) ||
-           (GNUNET_YES == test_mode) )
+          (GNUNET_YES == test_mode) )
+       /* Possibly more to on reserves, go for it immediately */
        task = GNUNET_SCHEDULER_add_now (&run_reserve_closures,
                                         NULL);
       else
+       /* nothing to do, sleep for a minute and try again */
        task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
                                             &run_aggregation,
                                             NULL);
@@ -1226,29 +1258,37 @@ run_aggregation (void *cls)
   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
               "Found ready deposit for %s, aggregating\n",
               TALER_B2S (&au->merchant_pub));
-  ret = db_plugin->iterate_matching_deposits (db_plugin->cls,
-                                              session,
-                                              &au->h_wire,
-                                              &au->merchant_pub,
-                                              &aggregate_cb,
-                                              au,
-                                              aggregation_limit);
-  if ( (GNUNET_SYSERR == ret) ||
+  qs = db_plugin->iterate_matching_deposits (db_plugin->cls,
+                                            session,
+                                            &au->h_wire,
+                                            &au->merchant_pub,
+                                            &aggregate_cb,
+                                            au,
+                                            aggregation_limit);
+  if ( (GNUNET_DB_STATUS_HARD_ERROR == qs) ||
        (GNUNET_YES == au->failed) )
   {
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                 "Failed to execute deposit iteration!\n");
-    GNUNET_free_non_null (au->additional_rows);
-    json_decref (au->wire);
-    GNUNET_free (au);
-    au = NULL;
+    cleanup_au ();
     db_plugin->rollback (db_plugin->cls,
                          session);
     global_ret = GNUNET_SYSERR;
     GNUNET_SCHEDULER_shutdown ();
     return;
   }
-
+  if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
+  {
+    /* serializiability issue, try again */
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Serialization issue, trying again later!\n");
+    db_plugin->rollback (db_plugin->cls,
+                         session);
+    task = GNUNET_SCHEDULER_add_now (&run_aggregation,
+                                    NULL);
+    return;
+  }
+  
   /* Subtract wire transfer fee and round to the unit supported by the
      wire transfer method; Check if after rounding down, we still have
      an amount to transfer, and if not mark as 'tiny'. */
@@ -1263,13 +1303,16 @@ run_aggregation (void *cls)
          (0 == au->final_amount.fraction) ) )
   {
     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-                "Aggregate value too low for transfer\n");
+                "Aggregate value too low for transfer (%d/%s)\n",
+               qs,
+               TALER_amount2s (&au->final_amount));
     /* Rollback ongoing transaction, as we will not use the respective
        WTID and thus need to remove the tracking data */
     db_plugin->rollback (db_plugin->cls,
                          session);
-    /* Start another transaction to mark all* of the selected deposits
-       *as minor! */
+
+    /* There were results, just the value was too low.  Start another
+       transaction to mark all* of the selected deposits as minor! */
     if (GNUNET_OK !=
         db_plugin->start (db_plugin->cls,
                           session))
@@ -1277,16 +1320,11 @@ run_aggregation (void *cls)
       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                   "Failed to start database transaction!\n");
       global_ret = GNUNET_SYSERR;
+      cleanup_au ();      
       GNUNET_SCHEDULER_shutdown ();
-      GNUNET_free_non_null (au->additional_rows);
-      if (NULL != au->wire)
-        json_decref (au->wire);
-      GNUNET_free (au);
-      au = NULL;
       return;
     }
     /* Mark transactions by row_id as minor */
-    ret = GNUNET_OK;
     qs = db_plugin->mark_deposit_tiny (db_plugin->cls,
                                       session,
                                       au->row_id);
@@ -1303,13 +1341,11 @@ run_aggregation (void *cls)
     }
     if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
     {
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Serialization issue, trying again later!\n");
       db_plugin->rollback (db_plugin->cls,
                           session);
-      GNUNET_free_non_null (au->additional_rows);
-      if (NULL != au->wire)
-       json_decref (au->wire);
-      GNUNET_free (au);
-      au = NULL;
+      cleanup_au ();
       /* start again */
       task = GNUNET_SCHEDULER_add_now (&run_aggregation,
                                       NULL);
@@ -1319,21 +1355,13 @@ run_aggregation (void *cls)
     {
       db_plugin->rollback (db_plugin->cls,
                           session);
-      GNUNET_free_non_null (au->additional_rows);
-      if (NULL != au->wire)
-       json_decref (au->wire);
-      GNUNET_free (au);
-      au = NULL;
+      cleanup_au ();
       GNUNET_SCHEDULER_shutdown ();
       return;
     }
     /* commit */
     (void) commit_or_warn (session);
-    GNUNET_free_non_null (au->additional_rows);
-    if (NULL != au->wire)
-      json_decref (au->wire);
-    GNUNET_free (au);
-    au = NULL;
+    cleanup_au ();
     /* start again */
     task = GNUNET_SCHEDULER_add_now (&run_aggregation,
                                      NULL);
@@ -1361,11 +1389,7 @@ run_aggregation (void *cls)
     GNUNET_break (0); /* why? how to best recover? */
     db_plugin->rollback (db_plugin->cls,
                          session);
-    GNUNET_free_non_null (au->additional_rows);
-    if (NULL != au->wire)
-      json_decref (au->wire);
-    GNUNET_free (au);
-    au = NULL;
+    cleanup_au ();
     /* start again */
     task = GNUNET_SCHEDULER_add_now (&run_aggregation,
                                      NULL);
@@ -1388,8 +1412,10 @@ prepare_cb (void *cls,
             size_t buf_size)
 {
   struct TALER_EXCHANGEDB_Session *session = au->session;
+  enum GNUNET_DB_QueryStatus qs;
 
   GNUNET_free_non_null (au->additional_rows);
+  au->additional_rows = NULL;
   if (NULL == buf)
   {
     GNUNET_break (0); /* why? how to best recover? */
@@ -1398,74 +1424,53 @@ prepare_cb (void *cls,
     /* start again */
     task = GNUNET_SCHEDULER_add_now (&run_aggregation,
                                      NULL);
-    if (NULL != au->wire)
-    {
-      json_decref (au->wire);
-      au->wire = NULL;
-    }
-    GNUNET_free (au);
-    au = NULL;
+    cleanup_au ();
     return;
   }
 
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Storing %u bytes of wire prepare data\n",
+             (unsigned int) buf_size);
   /* Commit our intention to execute the wire transfer! */
-  if (GNUNET_OK !=
-      db_plugin->wire_prepare_data_insert (db_plugin->cls,
-                                           session,
-                                           au->wp->type,
-                                           buf,
-                                           buf_size))
+  qs = db_plugin->wire_prepare_data_insert (db_plugin->cls,
+                                           session,
+                                           au->wp->type,
+                                           buf,
+                                           buf_size);
+  /* Commit the WTID data to 'wire_out' to finally satisfy aggregation
+     table constraints */
+  if (qs >= 0)
+    qs = db_plugin->store_wire_transfer_out (db_plugin->cls,
+                                            session,
+                                            au->execution_time,
+                                            &au->wtid,
+                                            au->wire,
+                                            &au->final_amount);
+  cleanup_au ();
+  if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
   {
-    GNUNET_break (0); /* why? how to best recover? */
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Serialization issue for prepared wire data; trying again 
later!\n");
     db_plugin->rollback (db_plugin->cls,
                          session);
     /* start again */
     task = GNUNET_SCHEDULER_add_now (&run_aggregation,
                                      NULL);
-    if (NULL != au->wire)
-    {
-      json_decref (au->wire);
-      au->wire = NULL;
-    }
-    GNUNET_free (au);
-    au = NULL;
     return;
   }
-
-  /* Commit the WTID data to 'wire_out' to finally satisfy aggregation
-     table constraints */
-  if (GNUNET_OK !=
-      db_plugin->store_wire_transfer_out (db_plugin->cls,
-                                          session,
-                                          au->execution_time,
-                                          &au->wtid,
-                                          au->wire,
-                                          &au->final_amount))
+  if (GNUNET_DB_STATUS_HARD_ERROR == qs)
   {
-    GNUNET_break (0); /* why? how to best recover? */
+    GNUNET_break (0);
     db_plugin->rollback (db_plugin->cls,
                          session);
-    /* start again */
-    task = GNUNET_SCHEDULER_add_now (&run_aggregation,
-                                     NULL);
-    if (NULL != au->wire)
-    {
-      json_decref (au->wire);
-      au->wire = NULL;
-    }
-    GNUNET_free (au);
-    au = NULL;
+    /* die hard */
+    global_ret = GNUNET_SYSERR;
+    GNUNET_SCHEDULER_shutdown ();
     return;
   }
+
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Stored wire transfer out instructions\n");
-  if (NULL != au->wire)
-  {
-    json_decref (au->wire);
-    au->wire = NULL;
-  }
-  GNUNET_free (au);
-  au = NULL;
 
   /* Now we can finally commit the overall transaction, as we are
      again consistent if all of this passes. */
@@ -1473,6 +1478,8 @@ prepare_cb (void *cls,
   {
   case GNUNET_DB_STATUS_SOFT_ERROR:
     /* try again */
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Commit issue for prepared wire data; trying again later!\n");
     task = GNUNET_SCHEDULER_add_now (&run_aggregation,
                                      NULL);
     return;
@@ -1512,6 +1519,7 @@ wire_confirm_cb (void *cls,
                  const char *emsg)
 {
   struct TALER_EXCHANGEDB_Session *session = wpd->session;
+  enum GNUNET_DB_QueryStatus qs;
 
   wpd->eh = NULL;
   if (GNUNET_SYSERR == success)
@@ -1527,16 +1535,25 @@ wire_confirm_cb (void *cls,
     wpd = NULL;
     return;
   }
-  if (GNUNET_OK !=
-      db_plugin->wire_prepare_data_mark_finished (db_plugin->cls,
-                                                  session,
-                                                  wpd->row_id))
+  qs = db_plugin->wire_prepare_data_mark_finished (db_plugin->cls,
+                                                  session,
+                                                  wpd->row_id);
+  if (0 >= qs)
   {
-    GNUNET_break (0); /* why!? */
+    GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
     db_plugin->rollback (db_plugin->cls,
                          session);
-    global_ret = GNUNET_SYSERR;
-    GNUNET_SCHEDULER_shutdown ();
+    if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
+    {
+      /* try again */
+      task = GNUNET_SCHEDULER_add_now (&run_aggregation,
+                                      NULL);
+    }
+    else
+    {
+      global_ret = GNUNET_SYSERR;
+      GNUNET_SCHEDULER_shutdown ();
+    }
     GNUNET_free (wpd);
     wpd = NULL;
     return;
@@ -1621,7 +1638,7 @@ wire_prepare_cb (void *cls,
 static void
 run_transfers (void *cls)
 {
-  int ret;
+  enum GNUNET_DB_QueryStatus qs;
   struct TALER_EXCHANGEDB_Session *session;
   const struct GNUNET_SCHEDULER_TaskContext *tc;
 
@@ -1651,35 +1668,39 @@ run_transfers (void *cls)
   }
   wpd = GNUNET_new (struct WirePrepareData);
   wpd->session = session;
-  ret = db_plugin->wire_prepare_data_get (db_plugin->cls,
-                                          session,
-                                          &wire_prepare_cb,
-                                          NULL);
-  if (GNUNET_SYSERR == ret)
+  qs = db_plugin->wire_prepare_data_get (db_plugin->cls,
+                                        session,
+                                        &wire_prepare_cb,
+                                        NULL);
+  if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs)
+    return;  /* continues in #wire_prepare_cb() */
+  db_plugin->rollback (db_plugin->cls,
+                      session);
+  GNUNET_free (wpd);
+  wpd = NULL;
+  switch (qs)
   {
-    GNUNET_break (0); /* why? how to best recover? */
-    db_plugin->rollback (db_plugin->cls,
-                         session);
+  case GNUNET_DB_STATUS_HARD_ERROR:
+    GNUNET_break (0);
     global_ret = GNUNET_SYSERR;
     GNUNET_SCHEDULER_shutdown ();
-    GNUNET_free (wpd);
-    wpd = NULL;
     return;
-  }
-  if (GNUNET_NO == ret)
-  {
+  case GNUNET_DB_STATUS_SOFT_ERROR:
+    /* try again */
+    task = GNUNET_SCHEDULER_add_now (&run_transfers,
+                                    NULL);      
+    return;
+  case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     /* no more prepared wire transfers, go back to aggregation! */
     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                 "No more pending wire transfers, starting aggregation\n");
-    db_plugin->rollback (db_plugin->cls,
-                         session);
     task = GNUNET_SCHEDULER_add_now (&run_aggregation,
                                      NULL);
-    GNUNET_free (wpd);
-    wpd = NULL;
     return;
+  case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+    /* should be impossible */
+    GNUNET_assert (0);
   }
-  /* otherwise, continues in #wire_prepare_cb() */
 }
 
 
diff --git a/src/exchangedb/plugin_exchangedb_postgres.c 
b/src/exchangedb/plugin_exchangedb_postgres.c
index 9ec998a..4c94c2d 100644
--- a/src/exchangedb/plugin_exchangedb_postgres.c
+++ b/src/exchangedb/plugin_exchangedb_postgres.c
@@ -91,24 +91,6 @@ struct TALER_EXCHANGEDB_Session
    */
   PGconn *conn;
 
-  /**
-   * Transaction state.  Set to #GNUNET_OK by #postgres_start().
-   * Set to #GNUNET_NO if any part of the transaction failed in a
-   * transient way (i.e. #PG_DIAG_SQLSTATE_DEADLOCK or
-   * #PG_DIAG_SQLSTATE_SERIALIZATION_FAILURE).  Set to
-   * #GNUNET_SYSERR if any part of the transaction failed in a
-   * hard way or if we are not within a transaction scope.
-   *
-   * If #GNUNET_NO, #postgres_commit() will always just do a
-   * rollback and return #GNUNET_NO as well (to retry).
-   *
-   * If #GNUNET_SYSERR, #postgres_commit() will always just do a
-   * rollback and return #GNUNET_SYSERR as well.
-   *
-   * If #GNUNET_OK, #postgres_commit() will try to commit and
-   * return the result from the commit operation.
-   */
-  int state;
 };
 
 
@@ -1554,7 +1536,6 @@ postgres_get_session (void *cls)
     return NULL;
   }
   session = GNUNET_new (struct TALER_EXCHANGEDB_Session);
-  session->state = GNUNET_SYSERR;
   session->conn = db_conn;
   if (0 != pthread_setspecific (pc->db_conn_threadlocal,
                                 session))
@@ -1592,11 +1573,9 @@ postgres_start (void *cls,
                      PQerrorMessage (session->conn));
     GNUNET_break (0);
     PQclear (result);
-    session->state = GNUNET_SYSERR;
     return GNUNET_SYSERR;
   }
   PQclear (result);
-  session->state = GNUNET_OK;
   return GNUNET_OK;
 }
 
@@ -1619,51 +1598,6 @@ postgres_rollback (void *cls,
   GNUNET_break (PGRES_COMMAND_OK ==
                 PQresultStatus (result));
   PQclear (result);
-  session->state = GNUNET_SYSERR;
-}
-
-
-/**
- * Check the @a result's error code to see what happened.
- * Also logs errors.
- *
- * @param session session used
- * @param result result to check
- * @return #GNUNET_OK if the request/transaction succeeded
- *         #GNUNET_NO if it failed but could succeed if retried
- *         #GNUNET_SYSERR on hard errors
- */
-static int
-evaluate_pq_result (struct TALER_EXCHANGEDB_Session *session,
-                    PGresult *result)
-{
-  if (PGRES_COMMAND_OK !=
-      PQresultStatus (result))
-  {
-    const char *sqlstate;
-
-    sqlstate = PQresultErrorField (result,
-                                   PG_DIAG_SQLSTATE);
-    if (NULL == sqlstate)
-    {
-      /* very unexpected... */
-      GNUNET_break (0);
-      return GNUNET_SYSERR;
-    }
-    if ( (0 == strcmp (sqlstate,
-                       PQ_DIAG_SQLSTATE_DEADLOCK)) ||
-         (0 == strcmp (sqlstate,
-                       PQ_DIAG_SQLSTATE_SERIALIZATION_FAILURE)) )
-    {
-      /* These two can be retried and have a fair chance of working
-         the next time */
-      QUERY_ERR (result, session->conn);
-      return GNUNET_NO;
-    }
-    BREAK_DB_ERR(result, session->conn);
-    return GNUNET_SYSERR;
-  }
-  return GNUNET_OK;
 }
 
 
@@ -1689,87 +1623,6 @@ postgres_commit (void *cls,
 
 
 /**
- * Update the @a session state based on the latest @a result from
- * the database.  Checks the status code of @a result and possibly
- * sets the state to failed (#GNUNET_SYSERR) or transiently failed
- * (#GNUNET_NO).
- *
- * @param session the session in which the transaction is running
- * @param statement name of the statement we were executing (for logging)
- * @param result the result we got from Postgres
- * @return current session state, i.e.
- *         #GNUNET_OK on success
- *         #GNUNET_NO if the transaction had a transient failure
- *         #GNUNET_SYSERR if the transaction had a hard failure
- */
-static int
-update_session_from_result (struct TALER_EXCHANGEDB_Session *session,
-                            const char *statement,
-                            PGresult *result)
-{
-  int ret;
-
-  if (GNUNET_OK != session->state)
-  {
-    GNUNET_break (0);
-    return GNUNET_SYSERR; /* we already failed, why do we keep going? */
-  }
-  ret = evaluate_pq_result (session,
-                            result);
-  if (GNUNET_OK == ret)
-    return ret;
-  GNUNET_log ((GNUNET_NO == ret)
-              ? GNUNET_ERROR_TYPE_INFO
-              : GNUNET_ERROR_TYPE_ERROR,
-              "Statement `%s' failed: %s/%s/%s/%s/%s",
-              statement,
-              PQresultErrorField (result, PG_DIAG_MESSAGE_PRIMARY),
-              PQresultErrorField (result, PG_DIAG_MESSAGE_DETAIL),
-              PQresultErrorMessage (result),
-              PQresStatus (PQresultStatus (result)),
-              PQerrorMessage (session->conn));
-  session->state = ret;
-  return ret;
-}
-
-
-/**
- * Execute a named prepared @a statement that is NOT a SELECT statement
- * in @a session using the given @a params.  Returns the resulting session
- * state.
- *
- * @param session session to execute the statement in
- * @param statement name of the statement
- * @param params parameters to give to the statement 
(#GNUNET_PQ_query_param_end-terminated)
- * @return #GNUNET_OK on success
- *         #GNUNET_NO if the transaction had a transient failure
- *         #GNUNET_SYSERR if the transaction had a hard failure
- */
-static int
-execute_prepared_non_select (struct TALER_EXCHANGEDB_Session *session,
-                             const char *statement,
-                             const struct GNUNET_PQ_QueryParam *params)
-{
-  PGresult *result;
-  int ret;
-
-  if (GNUNET_OK != session->state)
-  {
-    GNUNET_break (0);
-    return GNUNET_SYSERR; /* we already failed, why keep going? */
-  }
-  result = GNUNET_PQ_exec_prepared (session->conn,
-                                    statement,
-                                    params);
-  ret = update_session_from_result (session,
-                                    statement,
-                                    result);
-  PQclear (result);
-  return ret;
-}
-
-
-/**
  * Insert a denomination key's public information into the database for
  * reference by auditors and other consistency checks.
  *
@@ -2787,10 +2640,9 @@ postgres_mark_deposit_done (void *cls,
  * @param session connection to the database
  * @param deposit_cb function to call for ONE such deposit
  * @param deposit_cb_cls closure for @a deposit_cb
- * @return number of rows processed, 0 if none exist,
- *         #GNUNET_SYSERR on error
+ * @return transaction status code
  */
-static int
+static enum GNUNET_DB_QueryStatus
 postgres_get_ready_deposit (void *cls,
                             struct TALER_EXCHANGEDB_Session *session,
                             TALER_EXCHANGEDB_DepositIterator deposit_cb,
@@ -2801,77 +2653,161 @@ postgres_get_ready_deposit (void *cls,
     GNUNET_PQ_query_param_absolute_time (&now),
     GNUNET_PQ_query_param_end
   };
-  PGresult *result;
-  unsigned int n;
-  int ret;
+  struct TALER_Amount amount_with_fee;
+  struct TALER_Amount deposit_fee;
+  struct GNUNET_TIME_Absolute wire_deadline;
+  struct GNUNET_HashCode h_contract_terms;
+  struct TALER_MerchantPublicKeyP merchant_pub;
+  struct TALER_CoinSpendPublicKeyP coin_pub;
+  uint64_t serial_id;
+  json_t *wire;
+  struct GNUNET_PQ_ResultSpec rs[] = {
+    GNUNET_PQ_result_spec_uint64 ("deposit_serial_id",
+                                 &serial_id),
+    TALER_PQ_result_spec_amount ("amount_with_fee",
+                                &amount_with_fee),
+    TALER_PQ_result_spec_amount ("fee_deposit",
+                                &deposit_fee),
+    GNUNET_PQ_result_spec_absolute_time ("wire_deadline",
+                                        &wire_deadline),
+    GNUNET_PQ_result_spec_auto_from_type ("h_contract_terms",
+                                         &h_contract_terms),
+    GNUNET_PQ_result_spec_auto_from_type ("merchant_pub",
+                                         &merchant_pub),
+    GNUNET_PQ_result_spec_auto_from_type ("coin_pub",
+                                         &coin_pub),
+    TALER_PQ_result_spec_json ("wire",
+                              &wire),
+    GNUNET_PQ_result_spec_end
+  };
+  enum GNUNET_DB_QueryStatus qs;
 
-  result = GNUNET_PQ_exec_prepared (session->conn,
-                                   "deposits_get_ready",
-                                   params);
-  if (PGRES_TUPLES_OK !=
-      PQresultStatus (result))
-  {
-    BREAK_DB_ERR (result, session->conn);
-    PQclear (result);
-    return GNUNET_SYSERR;
-  }
-  if (0 == (n = PQntuples (result)))
-  {
-    PQclear (result);
-    return 0;
-  }
-  GNUNET_break (1 == n);
+  qs = GNUNET_PQ_eval_prepared_singleton_select (session->conn,
+                                                "deposits_get_ready",
+                                                params,
+                                                rs);
+  if (qs <= 0)
+    return qs;
+  qs = deposit_cb (deposit_cb_cls,
+                  serial_id,
+                  &merchant_pub,
+                  &coin_pub,
+                  &amount_with_fee,
+                  &deposit_fee,
+                  &h_contract_terms,
+                  wire_deadline,
+                  wire);
+  GNUNET_PQ_cleanup_result (rs);
+  return qs;
+}
+
+
+/**
+ * Closure for #match_deposit_cb().
+ */
+struct MatchingDepositContext
+{
+  /**
+   * Function to call for each result
+   */
+  TALER_EXCHANGEDB_DepositIterator deposit_cb;
+
+  /**
+   * Closure for @e deposit_cb.
+   */
+  void *deposit_cb_cls;
+
+  /**
+   * Public key of the merchant against which we are matching.
+   */
+  const struct TALER_MerchantPublicKeyP *merchant_pub;
+  
+  /**
+   * Maximum number of results to return.
+   */
+  uint32_t limit;
+
+  /**
+   * Loop counter, actual number of results returned.
+   */
+  unsigned int i;
+
+  /**
+   * Set to #GNUNET_SYSERR on hard errors.
+   */
+  int status;
+};
+
+
+/**
+ * Helper function for #postgres_iterate_matching_deposits().
+ * To be called with the results of a SELECT statement
+ * that has returned @a num_results results.
+ *
+ * @param cls closure of type `struct MatchingDepositContext *`
+ * @param result the postgres result
+ * @param num_result the number of results in @a result
+ */
+static void
+match_deposit_cb (void *cls,
+                 PGresult *result,
+                 unsigned int num_results)
+{
+  struct MatchingDepositContext *mdc = cls;
+  
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Found %u/%u matching deposits\n",
+             num_results,
+             mdc->limit);
+  num_results = GNUNET_MIN (num_results,
+                           mdc->limit);
+  for (mdc->i=0;mdc->i<num_results;mdc->i++)
   {
     struct TALER_Amount amount_with_fee;
     struct TALER_Amount deposit_fee;
     struct GNUNET_TIME_Absolute wire_deadline;
     struct GNUNET_HashCode h_contract_terms;
-    struct TALER_MerchantPublicKeyP merchant_pub;
     struct TALER_CoinSpendPublicKeyP coin_pub;
     uint64_t serial_id;
-    json_t *wire;
+    enum GNUNET_DB_QueryStatus qs;
     struct GNUNET_PQ_ResultSpec rs[] = {
       GNUNET_PQ_result_spec_uint64 ("deposit_serial_id",
-                                   &serial_id),
+                                    &serial_id),
       TALER_PQ_result_spec_amount ("amount_with_fee",
                                    &amount_with_fee),
       TALER_PQ_result_spec_amount ("fee_deposit",
                                    &deposit_fee),
       GNUNET_PQ_result_spec_absolute_time ("wire_deadline",
-                                          &wire_deadline),
+                                           &wire_deadline),
       GNUNET_PQ_result_spec_auto_from_type ("h_contract_terms",
-                                           &h_contract_terms),
-      GNUNET_PQ_result_spec_auto_from_type ("merchant_pub",
-                                           &merchant_pub),
+                                            &h_contract_terms),
       GNUNET_PQ_result_spec_auto_from_type ("coin_pub",
-                                           &coin_pub),
-      TALER_PQ_result_spec_json ("wire",
-                                 &wire),
+                                            &coin_pub),
       GNUNET_PQ_result_spec_end
     };
-
+    
     if (GNUNET_OK !=
         GNUNET_PQ_extract_result (result,
                                   rs,
-                                  0))
+                                  mdc->i))
     {
       GNUNET_break (0);
-      PQclear (result);
-      return GNUNET_SYSERR;
+      mdc->status = GNUNET_SYSERR;
+      return;
     }
-    ret = deposit_cb (deposit_cb_cls,
-                      serial_id,
-                      &merchant_pub,
-                      &coin_pub,
-                      &amount_with_fee,
-                      &deposit_fee,
-                      &h_contract_terms,
-                      wire_deadline,
-                      wire);
+    qs = mdc->deposit_cb (mdc->deposit_cb_cls,
+                         serial_id,
+                         mdc->merchant_pub,
+                         &coin_pub,
+                         &amount_with_fee,
+                         &deposit_fee,
+                         &h_contract_terms,
+                         wire_deadline,
+                         NULL);
     GNUNET_PQ_cleanup_result (rs);
-    PQclear (result);
+    if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
+      break;
   }
-  return (GNUNET_OK == ret) ? 1 : 0;
 }
 
 
@@ -2889,7 +2825,7 @@ postgres_get_ready_deposit (void *cls,
  * @return transaction status code, if positive:
  *         number of rows processed, 0 if none exist
  */
-static int // FIXME: enum GNUNET_DB_QueryStatus
+static enum GNUNET_DB_QueryStatus
 postgres_iterate_matching_deposits (void *cls,
                                     struct TALER_EXCHANGEDB_Session *session,
                                     const struct GNUNET_HashCode *h_wire,
@@ -2903,75 +2839,27 @@ postgres_iterate_matching_deposits (void *cls,
     GNUNET_PQ_query_param_auto_from_type (h_wire),
     GNUNET_PQ_query_param_end
   };
-  PGresult *result;
-  unsigned int i;
-  unsigned int n;
+  struct MatchingDepositContext mdc;
+  enum GNUNET_DB_QueryStatus qs;
 
-  result = GNUNET_PQ_exec_prepared (session->conn,
-                                    "deposits_iterate_matching",
-                                    params);
-  if (PGRES_TUPLES_OK !=
-      PQresultStatus (result))
-  {
-    BREAK_DB_ERR (result, session->conn);
-    PQclear (result);
-    return GNUNET_SYSERR;
-  }
-  if (0 == (n = PQntuples (result)))
-  {
-    PQclear (result);
-    return 0;
-  }
-  if (n > limit)
-    n = limit;
-  for (i=0;i<n;i++)
+  mdc.deposit_cb = deposit_cb;
+  mdc.deposit_cb_cls = deposit_cb_cls;
+  mdc.merchant_pub = merchant_pub;
+  mdc.limit = limit;
+  mdc.status = GNUNET_OK;
+  qs = GNUNET_PQ_eval_prepared_multi_select (session->conn,
+                                            "deposits_iterate_matching",
+                                            params,
+                                            &match_deposit_cb,
+                                            &mdc);
+  if (GNUNET_OK != mdc.status)
   {
-    struct TALER_Amount amount_with_fee;
-    struct TALER_Amount deposit_fee;
-    struct GNUNET_TIME_Absolute wire_deadline;
-    struct GNUNET_HashCode h_contract_terms;
-    struct TALER_CoinSpendPublicKeyP coin_pub;
-    uint64_t serial_id;
-    int ret;
-    struct GNUNET_PQ_ResultSpec rs[] = {
-      GNUNET_PQ_result_spec_uint64 ("deposit_serial_id",
-                                    &serial_id),
-      TALER_PQ_result_spec_amount ("amount_with_fee",
-                                   &amount_with_fee),
-      TALER_PQ_result_spec_amount ("fee_deposit",
-                                   &deposit_fee),
-      GNUNET_PQ_result_spec_absolute_time ("wire_deadline",
-                                           &wire_deadline),
-      GNUNET_PQ_result_spec_auto_from_type ("h_contract_terms",
-                                            &h_contract_terms),
-      GNUNET_PQ_result_spec_auto_from_type ("coin_pub",
-                                            &coin_pub),
-      GNUNET_PQ_result_spec_end
-    };
-    if (GNUNET_OK !=
-        GNUNET_PQ_extract_result (result,
-                                  rs,
-                                  i))
-    {
-      GNUNET_break (0);
-      PQclear (result);
-      return GNUNET_SYSERR;
-    }
-    ret = deposit_cb (deposit_cb_cls,
-                      serial_id,
-                      merchant_pub,
-                      &coin_pub,
-                      &amount_with_fee,
-                      &deposit_fee,
-                      &h_contract_terms,
-                      wire_deadline,
-                      NULL);
-    GNUNET_PQ_cleanup_result (rs);
-    if (GNUNET_OK != ret)
-      break;
+    GNUNET_break (0);
+    return GNUNET_DB_STATUS_HARD_ERROR;
   }
-  PQclear (result);
-  return i;
+  if (qs >= 0)
+    return mdc.i;
+  return qs;
 }
 
 
@@ -4493,11 +4381,9 @@ postgres_wire_lookup_deposit_wtid (void *cls,
  * @param session database connection
  * @param wtid the raw wire transfer identifier we used
  * @param deposit_serial_id row in the deposits table for which this is 
aggregation data
- * @return #GNUNET_OK on success,
- *         #GNUNET_NO on transient errors
- *         #GNUNET_SYSERR on DB errors
+ * @return transaction status code
  */
-static int
+static enum GNUNET_DB_QueryStatus
 postgres_insert_aggregation_tracking (void *cls,
                                       struct TALER_EXCHANGEDB_Session *session,
                                       const struct 
TALER_WireTransferIdentifierRawP *wtid,
@@ -4510,9 +4396,9 @@ postgres_insert_aggregation_tracking (void *cls,
     GNUNET_PQ_query_param_end
   };
 
-  return execute_prepared_non_select (session,
-                                      "insert_aggregation_tracking",
-                                      params);
+  return GNUNET_PQ_eval_prepared_non_select (session->conn,
+                                            "insert_aggregation_tracking",
+                                            params);
 }
 
 
@@ -4569,11 +4455,9 @@ postgres_get_wire_fee (void *cls,
  * @param end_date when does the fee end being valid
  * @param wire_fee how high is the wire transfer fee
  * @param master_sig signature over the above by the exchange master key
- * @return #GNUNET_OK on success or if the record exists,
- *         #GNUNET_NO on transient errors
- *         #GNUNET_SYSERR on failure
+ * @return transaction status code
  */
-static int
+static enum GNUNET_DB_QueryStatus
 postgres_insert_wire_fee (void *cls,
                           struct TALER_EXCHANGEDB_Session *session,
                           const char *type,
@@ -4594,43 +4478,46 @@ postgres_insert_wire_fee (void *cls,
   struct TALER_MasterSignatureP sig;
   struct GNUNET_TIME_Absolute sd;
   struct GNUNET_TIME_Absolute ed;
+  enum GNUNET_DB_QueryStatus qs;
 
-  if (GNUNET_OK ==
-      postgres_get_wire_fee (cls,
-                             session,
-                             type,
-                             start_date,
-                             &sd,
-                             &ed,
-                             &wf,
-                             &sig))
+  qs = postgres_get_wire_fee (cls,
+                             session,
+                             type,
+                             start_date,
+                             &sd,
+                             &ed,
+                             &wf,
+                             &sig);
+  if (qs < 0)
+    return qs;
+  if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs)
   {
     if (0 != memcmp (&sig,
                      master_sig,
                      sizeof (sig)))
     {
       GNUNET_break (0);
-      return GNUNET_SYSERR;
+      return GNUNET_DB_STATUS_HARD_ERROR;
     }
     if (0 != TALER_amount_cmp (wire_fee,
                                &wf))
     {
       GNUNET_break (0);
-      return GNUNET_SYSERR;
+      return GNUNET_DB_STATUS_HARD_ERROR;
     }
     if ( (sd.abs_value_us != start_date.abs_value_us) ||
          (ed.abs_value_us != end_date.abs_value_us) )
     {
       GNUNET_break (0);
-      return GNUNET_SYSERR;
+      return GNUNET_DB_STATUS_HARD_ERROR;
     }
     /* equal record already exists */
-    return GNUNET_OK;
+    return GNUNET_DB_STATUS_SUCCESS_NO_RESULTS;
   }
 
-  return execute_prepared_non_select (session,
-                                      "insert_wire_fee",
-                                      params);
+  return GNUNET_PQ_eval_prepared_non_select (session->conn,
+                                            "insert_wire_fee",
+                                            params);
 }
 
 
@@ -4862,9 +4749,9 @@ postgres_wire_prepare_data_insert (void *cls,
  * @param cls closure
  * @param session database connection
  * @param rowid which entry to mark as finished
- * @return #GNUNET_OK on success, #GNUNET_SYSERR on DB errors
+ * @return transaction status code
  */
-static int
+static enum GNUNET_DB_QueryStatus
 postgres_wire_prepare_data_mark_finished (void *cls,
                                           struct TALER_EXCHANGEDB_Session 
*session,
                                           uint64_t rowid)
@@ -4874,9 +4761,9 @@ postgres_wire_prepare_data_mark_finished (void *cls,
     GNUNET_PQ_query_param_end
   };
 
-  return execute_prepared_non_select (session,
-                                      "wire_prepare_data_mark_done",
-                                      params);
+  return GNUNET_PQ_eval_prepared_non_select (session->conn,
+                                            "wire_prepare_data_mark_done",
+                                            params);
 }
 
 
@@ -4888,76 +4775,46 @@ postgres_wire_prepare_data_mark_finished (void *cls,
  * @param session database connection
  * @param cb function to call for ONE unfinished item
  * @param cb_cls closure for @a cb
- * @return #GNUNET_OK on success,
- *         #GNUNET_NO if there are no entries,
- *         #GNUNET_SYSERR on DB errors
+ * @return transaction status code
  */
-static int
+static enum GNUNET_DB_QueryStatus
 postgres_wire_prepare_data_get (void *cls,
                                 struct TALER_EXCHANGEDB_Session *session,
                                 TALER_EXCHANGEDB_WirePreparationIterator cb,
                                 void *cb_cls)
 {
-  PGresult *result;
+  enum GNUNET_DB_QueryStatus qs;
   struct GNUNET_PQ_QueryParam params[] = {
     GNUNET_PQ_query_param_end
   };
+  uint64_t prewire_uuid;
+  char *type;
+  void *buf = NULL;
+  size_t buf_size;
+  struct GNUNET_PQ_ResultSpec rs[] = {
+    GNUNET_PQ_result_spec_uint64 ("prewire_uuid",
+                                 &prewire_uuid),
+    GNUNET_PQ_result_spec_string ("type",
+                                 &type),
+    GNUNET_PQ_result_spec_variable_size ("buf",
+                                        &buf,
+                                        &buf_size),
+    GNUNET_PQ_result_spec_end
+  };
 
-  result = GNUNET_PQ_exec_prepared (session->conn,
-                                   "wire_prepare_data_get",
-                                   params);
-  if (PGRES_TUPLES_OK != PQresultStatus (result))
-  {
-    QUERY_ERR (result, session->conn);
-    PQclear (result);
-    return GNUNET_SYSERR;
-  }
-  if (0 == PQntuples (result))
-  {
-    PQclear (result);
-    return GNUNET_NO;
-  }
-  if (1 != PQntuples (result))
-  {
-    GNUNET_break (0);
-    PQclear (result);
-    return GNUNET_SYSERR;
-  }
-
-  {
-    uint64_t prewire_uuid;
-    char *type;
-    void *buf = NULL;
-    size_t buf_size;
-    struct GNUNET_PQ_ResultSpec rs[] = {
-      GNUNET_PQ_result_spec_uint64 ("prewire_uuid",
-                                    &prewire_uuid),
-      GNUNET_PQ_result_spec_string ("type",
-                                    &type),
-      GNUNET_PQ_result_spec_variable_size ("buf",
-                                           &buf,
-                                           &buf_size),
-      GNUNET_PQ_result_spec_end
-    };
-
-    if (GNUNET_OK !=
-        GNUNET_PQ_extract_result (result,
-                                 rs,
-                                 0))
-    {
-      GNUNET_break (0);
-      PQclear (result);
-      return GNUNET_SYSERR;
-    }
-    cb (cb_cls,
-        prewire_uuid,
-        type,
-        buf,
-        buf_size);
-    GNUNET_PQ_cleanup_result (rs);
-  }
-  PQclear (result);
-  return GNUNET_OK;
+  qs = GNUNET_PQ_eval_prepared_singleton_select (session->conn,
+                                                "wire_prepare_data_get",
+                                                params,
+                                                rs);
+  if (0 >= qs)
+    return qs;
+  cb (cb_cls,
+      prewire_uuid,
+      type,
+      buf,
+      buf_size);
+  GNUNET_PQ_cleanup_result (rs);
+  return qs;
 }
 
 
@@ -5003,7 +4860,6 @@ postgres_start_deferred_wire_out (void *cls,
     return GNUNET_SYSERR;
   }
   PQclear (result);
-  session->state = GNUNET_OK;
   return GNUNET_OK;
 }
 
@@ -5017,10 +4873,9 @@ postgres_start_deferred_wire_out (void *cls,
  * @param wtid subject of the wire transfer
  * @param wire_account details about the receiver account of the wire transfer
  * @param amount amount that was transmitted
- * @return #GNUNET_OK on success
- *         #GNUNET_SYSERR on DB errors
+ * @return transaction status code
  */
-static int
+static enum GNUNET_DB_QueryStatus
 postgres_store_wire_transfer_out (void *cls,
                                   struct TALER_EXCHANGEDB_Session *session,
                                   struct GNUNET_TIME_Absolute date,
@@ -5036,9 +4891,9 @@ postgres_store_wire_transfer_out (void *cls,
     GNUNET_PQ_query_param_end
   };
 
-  return execute_prepared_non_select (session,
-                                      "insert_wire_out",
-                                      params);
+  return GNUNET_PQ_eval_prepared_non_select (session->conn,
+                                            "insert_wire_out",
+                                            params);
 }
 
 
diff --git a/src/exchangedb/test_exchangedb.c b/src/exchangedb/test_exchangedb.c
index 4ca7c39..773d643 100644
--- a/src/exchangedb/test_exchangedb.c
+++ b/src/exchangedb/test_exchangedb.c
@@ -914,10 +914,9 @@ static uint64_t deposit_rowid;
  * @param wire_deadline by which the merchant adviced that he would like the
  *        wire transfer to be executed
  * @param wire wire details for the merchant, NULL from 
iterate_matching_deposits()
- * @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR if deposit does
- *         not match our expectations
+ * @return transaction status code, #GNUNET_DB_STATUS_SUCCESS_ONE_RESULT to 
continue to iterate
  */
-static int
+static enum GNUNET_DB_QueryStatus
 deposit_cb (void *cls,
             uint64_t rowid,
             const struct TALER_MerchantPublicKeyP *merchant_pub,
@@ -953,10 +952,10 @@ deposit_cb (void *cls,
                        sizeof (struct GNUNET_HashCode))) ) )
   {
     GNUNET_break (0);
-    return GNUNET_SYSERR;
+    return GNUNET_DB_STATUS_HARD_ERROR;
   }
 
-  return GNUNET_OK;
+  return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
 }
 
 
@@ -1164,7 +1163,7 @@ test_wire_fees (struct TALER_EXCHANGEDB_Session *session)
   GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_WEAK,
                               &master_sig,
                               sizeof (master_sig));
-  if (GNUNET_OK !=
+  if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
       plugin->insert_wire_fee (plugin->cls,
                                session,
                                "wire-method",
@@ -1176,7 +1175,7 @@ test_wire_fees (struct TALER_EXCHANGEDB_Session *session)
     GNUNET_break (0);
     return GNUNET_SYSERR;
   }
-  if (GNUNET_OK !=
+  if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
       plugin->insert_wire_fee (plugin->cls,
                                session,
                                "wire-method",
@@ -1800,7 +1799,7 @@ run (void *cls)
                                                   NULL));
   FAILIF (1 != auditor_row_cnt);
   result = 9;
-  FAILIF (1 !=
+  FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
           plugin->iterate_matching_deposits (plugin->cls,
                                              session,
                                              &deposit.h_wire,
@@ -1808,7 +1807,7 @@ run (void *cls)
                                              &deposit_cb, &deposit,
                                              2));
 
-  FAILIF (1 !=
+  FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
           plugin->get_ready_deposit (plugin->cls,
                                      session,
                                      &deposit_cb,
@@ -1821,7 +1820,7 @@ run (void *cls)
                          session));
   FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
           plugin->mark_deposit_tiny (plugin->cls,
-                                     session,
+                                    session,
                                      deposit_rowid));
   FAILIF (0 !=
           plugin->get_ready_deposit (plugin->cls,
@@ -1838,18 +1837,18 @@ run (void *cls)
   FAILIF (GNUNET_OK !=
           plugin->start (plugin->cls,
                          session));
-  FAILIF (GNUNET_NO !=
+  FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
           plugin->test_deposit_done (plugin->cls,
                                      session,
                                      &deposit));
-  FAILIF (GNUNET_OK !=
+  FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
           plugin->mark_deposit_done (plugin->cls,
                                      session,
                                      deposit_rowid));
   FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
           plugin->commit (plugin->cls,
                           session));
-  FAILIF (GNUNET_YES !=
+  FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
           plugin->test_deposit_done (plugin->cls,
                                      session,
                                      &deposit));
@@ -1857,17 +1856,18 @@ run (void *cls)
   result = 10;
   deposit2 = deposit;
   RND_BLK (&deposit2.merchant_pub); /* should fail if merchant is different */
-  FAILIF (0 !=
+  FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
           plugin->have_deposit (plugin->cls,
                                 session,
                                 &deposit2));
   deposit2.merchant_pub = deposit.merchant_pub;
   RND_BLK (&deposit2.coin.coin_pub); /* should fail if coin is different */
-  FAILIF (0 !=
+  FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
           plugin->have_deposit (plugin->cls,
                                 session,
                                 &deposit2));
-  FAILIF (GNUNET_OK != test_melting (session));
+  FAILIF (GNUNET_OK !=
+         test_melting (session));
 
 
   /* test insert_refund! */
@@ -1886,7 +1886,7 @@ run (void *cls)
 
   /* test payback / revocation */
   RND_BLK (&master_sig);
-  FAILIF (GNUNET_OK !=
+  FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
           plugin->insert_denomination_revocation (plugin->cls,
                                                   session,
                                                   &dkp_pub_hash,
@@ -1897,7 +1897,7 @@ run (void *cls)
   FAILIF (GNUNET_OK !=
           plugin->start (plugin->cls,
                          session));
-  FAILIF (GNUNET_NO !=
+  FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
           plugin->insert_denomination_revocation (plugin->cls,
                                                   session,
                                                   &dkp_pub_hash,
diff --git a/src/include/taler_exchangedb_plugin.h 
b/src/include/taler_exchangedb_plugin.h
index 9f80fda..d3b48e4 100644
--- a/src/include/taler_exchangedb_plugin.h
+++ b/src/include/taler_exchangedb_plugin.h
@@ -674,9 +674,9 @@ struct TALER_EXCHANGEDB_Session;
  * @param wire_deadline by which the merchant adviced that he would like the
  *        wire transfer to be executed
  * @param receiver_wire_account wire details for the merchant, NULL from 
iterate_matching_deposits()
- * @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR to stop
+ * @return transaction status code, #GNUNET_DB_STATUS_SUCCESS_ONE_RESULT to 
continue to iterate
  */
-typedef int
+typedef enum GNUNET_DB_QueryStatus
 (*TALER_EXCHANGEDB_DepositIterator)(void *cls,
                                     uint64_t rowid,
                                     const struct TALER_MerchantPublicKeyP 
*merchant_pub,
@@ -1383,10 +1383,9 @@ struct TALER_EXCHANGEDB_Plugin
    * @param session connection to the database
    * @param deposit_cb function to call for ONE such deposit
    * @param deposit_cb_cls closure for @a deposit_cb
-   * @return number of rows processed, 0 if none exist,
-   *         #GNUNET_SYSERR on error
+   * @return transaction status code
    */
-  int
+  enum GNUNET_DB_QueryStatus
   (*get_ready_deposit) (void *cls,
                         struct TALER_EXCHANGEDB_Session *session,
                         TALER_EXCHANGEDB_DepositIterator deposit_cb,
@@ -1418,9 +1417,9 @@ struct TALER_EXCHANGEDB_Plugin
    *        be #TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT, larger values
    *        are not supported, smaller values would be inefficient.
    * @return number of rows processed, 0 if none exist,
-   *         #GNUNET_SYSERR on error
+   *         transaction status code on error
    */
-  int
+  enum GNUNET_DB_QueryStatus
   (*iterate_matching_deposits) (void *cls,
                                 struct TALER_EXCHANGEDB_Session *session,
                                 const struct GNUNET_HashCode *h_wire,
@@ -1753,11 +1752,9 @@ struct TALER_EXCHANGEDB_Plugin
    * @param session database connection
    * @param wtid the raw wire transfer identifier we used
    * @param deposit_serial_id row in the deposits table for which this is 
aggregation data
-   * @return #GNUNET_OK on success
-   *         #GNUNET_NO on transient errors
-   *         #GNUNET_SYSERR on DB errors
+   * @return transaction status code
    */
-  int
+  enum GNUNET_DB_QueryStatus
   (*insert_aggregation_tracking)(void *cls,
                                  struct TALER_EXCHANGEDB_Session *session,
                                  const struct TALER_WireTransferIdentifierRawP 
*wtid,
@@ -1774,11 +1771,9 @@ struct TALER_EXCHANGEDB_Plugin
    * @param end_date when does the fee end being valid
    * @param wire_fee how high is the wire transfer fee
    * @param master_sig signature over the above by the exchange master key
-   * @return #GNUNET_OK on success or if the record exists,
-   *         #GNUNET_NO on transient errors,
-   *         #GNUNET_SYSERR on failure
+   * @return transaction status code
    */
-  int
+  enum GNUNET_DB_QueryStatus
   (*insert_wire_fee)(void *cls,
                      struct TALER_EXCHANGEDB_Session *session,
                      const char *wire_method,
@@ -1787,7 +1782,7 @@ struct TALER_EXCHANGEDB_Plugin
                      const struct TALER_Amount *wire_fee,
                      const struct TALER_MasterSignatureP *master_sig);
 
-
+  
   /**
    * Obtain wire fee from database.
    *
@@ -1879,9 +1874,9 @@ struct TALER_EXCHANGEDB_Plugin
    * @param cls closure
    * @param session database connection
    * @param rowid which entry to mark as finished
-   * @return #GNUNET_OK on success, #GNUNET_SYSERR on DB errors
+   * @return transaction status code
    */
-  int
+  enum GNUNET_DB_QueryStatus
   (*wire_prepare_data_mark_finished)(void *cls,
                                      struct TALER_EXCHANGEDB_Session *session,
                                      uint64_t rowid);
@@ -1895,11 +1890,9 @@ struct TALER_EXCHANGEDB_Plugin
    * @param session database connection
    * @param cb function to call for ONE unfinished item
    * @param cb_cls closure for @a cb
-   * @return #GNUNET_OK on success,
-   *         #GNUNET_NO if there are no entries,
-   *         #GNUNET_SYSERR on DB errors
+   * @return transaction status code
    */
-  int
+  enum GNUNET_DB_QueryStatus
   (*wire_prepare_data_get)(void *cls,
                            struct TALER_EXCHANGEDB_Session *session,
                            TALER_EXCHANGEDB_WirePreparationIterator cb,
@@ -1930,10 +1923,9 @@ struct TALER_EXCHANGEDB_Plugin
    * @param wtid subject of the wire transfer
    * @param wire_account details about the receiver account of the wire 
transfer
    * @param amount amount that was transmitted
-   * @return #GNUNET_OK on success
-   *         #GNUNET_SYSERR on DB errors
+   * @return transaction status code
    */
-  int
+  enum GNUNET_DB_QueryStatus
   (*store_wire_transfer_out)(void *cls,
                              struct TALER_EXCHANGEDB_Session *session,
                              struct GNUNET_TIME_Absolute date,

-- 
To stop receiving notification emails like this one, please contact
address@hidden



reply via email to

[Prev in Thread] Current Thread [Next in Thread]