gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[taler-exchange] branch master updated: preparations for sharded wirewatch


From: gnunet
Subject: [taler-exchange] branch master updated: preparations for sharded wirewatch
Date: Sun, 20 Jun 2021 16:41:06 +0200

This is an automated email from the git hooks/post-receive script.

grothoff pushed a commit to branch master
in repository exchange.

The following commit(s) were added to refs/heads/master by this push:
     new 108bf57d preparations for sharded wirewatch
108bf57d is described below

commit 108bf57d048a135cb71f9453540c9d6579ae2028
Author: Christian Grothoff <christian@grothoff.org>
AuthorDate: Sun Jun 20 16:41:04 2021 +0200

    preparations for sharded wirewatch
---
 src/exchange/taler-exchange-wirewatch.c     |  22 ++-
 src/exchangedb/exchange-0002.sql            |  34 +++-
 src/exchangedb/plugin_exchangedb_postgres.c | 297 +++++++++++++++++++++++++++-
 src/include/taler_exchangedb_plugin.h       |  41 +++-
 4 files changed, 382 insertions(+), 12 deletions(-)

diff --git a/src/exchange/taler-exchange-wirewatch.c b/src/exchange/taler-exchange-wirewatch.c
index 760dbe10..28fa81e7 100644
--- a/src/exchange/taler-exchange-wirewatch.c
+++ b/src/exchange/taler-exchange-wirewatch.c
@@ -528,8 +528,20 @@ find_transfers (void *cls)
     GNUNET_SCHEDULER_shutdown ();
     return;
   }
+  wa_pos->delay = true;
+  wa_pos->current_batch_size = 0; /* reset counter */
+  wa_pos->session = session;
+  if (wa_pos->shard_end == wa_pos->last_row_off)
+  {
+    /* advance to next shard */
+    // FIXME: if other processes are running in parallel,
+    // update 'last_row_off' to next free shard!
+    wa_pos->shard_end = wa_pos->last_row_off + shard_size;
+  }
   if (! wa_pos->reset_mode)
   {
+    // FIXME: need good way to fetch
+    // shard data here!
     qs = db_plugin->get_latest_reserve_in_reference (db_plugin->cls,
                                                      session,
                                                      wa_pos->section_name,
@@ -553,16 +565,8 @@ find_transfers (void *cls)
                                        NULL);
       return;
     }
-    wa_pos->reset_mode = GNUNET_NO;
-  }
-  wa_pos->delay = true;
-  wa_pos->current_batch_size = 0; /* reset counter */
-  wa_pos->session = session;
-  if (wa_pos->shard_end == wa_pos->last_row_off)
-  {
-    /* advance to next shard */
-    wa_pos->shard_end += shard_size;
   }
+  wa_pos->reset_mode = true;
   limit = GNUNET_MIN (wa_pos->batch_size,
                       wa_pos->shard_end - wa_pos->last_row_off);
   GNUNET_assert (NULL == wa_pos->hh);
diff --git a/src/exchangedb/exchange-0002.sql b/src/exchangedb/exchange-0002.sql
index b03a7b51..361b69b8 100644
--- a/src/exchangedb/exchange-0002.sql
+++ b/src/exchangedb/exchange-0002.sql
@@ -1,6 +1,6 @@
 --
 -- This file is part of TALER
--- Copyright (C) 2020 Taler Systems SA
+-- Copyright (C) 2020-2021 Taler Systems SA
 --
 -- TALER is free software; you can redistribute it and/or modify it under the
 -- terms of the GNU General Public License as published by the Free Software
@@ -374,5 +374,37 @@ COMMENT ON TABLE signkey_revocations
   IS 'remembering which online signing keys have been revoked';
 
 
+
+CREATE TABLE IF NOT EXISTS work_shards
+  (shard_serial_id BIGSERIAL UNIQUE
+  ,last_attempt INT8 NOT NULL
+  ,start_row INT8 NOT NULL
+  ,end_row INT8 NOT NULL
+  ,completed BOOLEAN NOT NULL DEFAULT FALSE -- INSERTs claiming a fresh shard omit this column
+  ,job_name VARCHAR NOT NULL
+  ,PRIMARY KEY (job_name, start_row)
+  );
+CREATE INDEX IF NOT EXISTS work_shards_index
+  ON work_shards
+  (job_name
+  ,completed
+  ,last_attempt
+  );
+COMMENT ON TABLE work_shards
+  IS 'coordinates work between multiple processes working on the same job';
+COMMENT ON COLUMN work_shards.shard_serial_id
+  IS 'unique serial number identifying the shard';
+COMMENT ON COLUMN work_shards.last_attempt
+  IS 'last time a worker attempted to work on the shard';
+COMMENT ON COLUMN work_shards.completed
+  IS 'set to TRUE once the shard is finished by a worker';
+COMMENT ON COLUMN work_shards.start_row
+  IS 'row at which the shard scope starts, inclusive';
+COMMENT ON COLUMN work_shards.end_row
+  IS 'row at which the shard scope ends, exclusive';
+COMMENT ON COLUMN work_shards.job_name
+  IS 'unique name of the job the workers on this shard are performing';
+
+
 -- Complete transaction
 COMMIT;
diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c
index 04dc03cd..e61a1ac7 100644
--- a/src/exchangedb/plugin_exchangedb_postgres.c
+++ b/src/exchangedb/plugin_exchangedb_postgres.c
@@ -1,6 +1,6 @@
 /*
   This file is part of TALER
-  Copyright (C) 2014--2020 Taler Systems SA
+  Copyright (C) 2014--2021 Taler Systems SA
 
   TALER is free software; you can redistribute it and/or modify it under the
   terms of the GNU General Public License as published by the Free Software
@@ -2438,6 +2438,52 @@ postgres_get_session (void *cls)
                               ") VALUES "
                               "($1, $2, $3, $4, $5, $6, $7, $8);",
                               8),
+
+      /* Used in #postgres_begin_shard() */
+      GNUNET_PQ_make_prepare ("get_open_shard",
+                              "SELECT"
+                              " start_row"
+                              ",end_row"
+                              " FROM work_shards"
+                              " WHERE job_name=$1"
+                              "   AND last_attempt<$2"
+                              "   AND completed=FALSE"
+                              " ORDER BY last_attempt ASC"
+                              " LIMIT 1;",
+                              2),
+      GNUNET_PQ_make_prepare ("reclaim_shard",
+                              "UPDATE work_shards"
+                              " SET last_attempt=$2"
+                              " WHERE job_name=$1"
+                              "   AND start_row=$3"
+                              "   AND end_row=$4",
+                              4),
+      GNUNET_PQ_make_prepare ("get_last_shard",
+                              "SELECT"
+                              " end_row"
+                              " FROM work_shards"
+                              " WHERE job_name=$1"
+                              "   AND completed=FALSE"
+                              " ORDER BY end_row DESC"
+                              " LIMIT 1;",
+                              1),
+      GNUNET_PQ_make_prepare ("claim_next_shard",
+                              "INSERT INTO work_shards"
+                              "(job_name"
+                              ",last_attempt"
+                              ",start_row"
+                              ",end_row"
+                              ") VALUES "
+                              "($1, $2, $3, $4);",
+                              4),
+      /* Used in #postgres_complete_shard() */
+      GNUNET_PQ_make_prepare ("complete_shard",
+                              "UPDATE work_shards"
+                              " SET completed=TRUE"
+                              " WHERE job_name=$1"
+                              "   AND start_row=$2"
+                              "   AND end_row=$3",
+                              3),
       GNUNET_PQ_PREPARED_STATEMENT_END
     };
 
@@ -10149,6 +10195,251 @@ postgres_insert_records_by_table (void *cls,
 }
 
 
+/**
+ * Function called to grab a work shard for the job @a job_name. Runs in its
+ * own transaction (hence no session provided), retried up to three times.
+ *
+ * @param cls the @e cls of this struct with the plugin-specific state
+ * @param job_name name of the operation to grab a work shard for
+ * @param delay minimum age of a shard to grab
+ * @param shard_size desired shard size
+ * @param[out] start_row inclusive start row of the shard (returned)
+ * @param[out] end_row exclusive end row of the shard (returned)
+ * @return transaction status code; soft error if all retries failed
+ */
+static enum GNUNET_DB_QueryStatus
+postgres_begin_shard (void *cls,
+                      const char *job_name,
+                      struct GNUNET_TIME_Relative delay,
+                      uint64_t shard_size,
+                      uint64_t *start_row,
+                      uint64_t *end_row)
+{
+  struct TALER_EXCHANGEDB_Session *session;
+
+  session = postgres_get_session (cls);
+  if (NULL == session)
+    return GNUNET_DB_STATUS_HARD_ERROR;
+  for (unsigned int retries = 0; retries<3; retries++)
+  {
+    if (GNUNET_OK !=
+        postgres_start (cls,
+                        session,
+                        "begin_shard"))
+    {
+      GNUNET_break (0);
+      return GNUNET_DB_STATUS_HARD_ERROR;
+    }
+
+    {
+      struct GNUNET_TIME_Absolute past;
+      enum GNUNET_DB_QueryStatus qs;
+      struct GNUNET_PQ_QueryParam params[] = {
+        GNUNET_PQ_query_param_string (job_name),
+        GNUNET_PQ_query_param_absolute_time (&past),
+        GNUNET_PQ_query_param_end
+      };
+      struct GNUNET_PQ_ResultSpec rs[] = {
+        GNUNET_PQ_result_spec_uint64 ("start_row",
+                                      start_row),
+        GNUNET_PQ_result_spec_uint64 ("end_row",
+                                      end_row),
+        GNUNET_PQ_result_spec_end
+      };
+
+      past = GNUNET_TIME_absolute_subtract (GNUNET_TIME_absolute_get (),
+                                            delay);
+      qs = GNUNET_PQ_eval_prepared_singleton_select (session->conn,
+                                                     "get_open_shard",
+                                                     params,
+                                                     rs);
+      switch (qs)
+      {
+      case GNUNET_DB_STATUS_HARD_ERROR:
+        GNUNET_break (0);
+        postgres_rollback (cls,
+                           session);
+        return qs;
+      case GNUNET_DB_STATUS_SOFT_ERROR:
+        postgres_rollback (cls,
+                           session);
+        continue;
+      case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+        {
+          enum GNUNET_DB_QueryStatus qs;
+          struct GNUNET_TIME_Absolute now;
+          struct GNUNET_PQ_QueryParam params[] = {
+            GNUNET_PQ_query_param_string (job_name),
+            GNUNET_PQ_query_param_absolute_time (&now),
+            GNUNET_PQ_query_param_uint64 (start_row),
+            GNUNET_PQ_query_param_uint64 (end_row),
+            GNUNET_PQ_query_param_end
+          };
+
+          now = GNUNET_TIME_absolute_get ();
+          qs = GNUNET_PQ_eval_prepared_non_select (session->conn,
+                                                   "reclaim_shard",
+                                                   params);
+          switch (qs)
+          {
+          case GNUNET_DB_STATUS_HARD_ERROR:
+            GNUNET_break (0);
+            postgres_rollback (cls,
+                               session);
+            return qs;
+          case GNUNET_DB_STATUS_SOFT_ERROR:
+            postgres_rollback (cls,
+                               session);
+            continue;
+          case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+            goto commit;
+          case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+            GNUNET_break (0); /* logic error, should be impossible */
+            postgres_rollback (cls,
+                               session);
+            return GNUNET_DB_STATUS_HARD_ERROR;
+          }
+        }
+        break; /* actually unreachable */
+      case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+        break; /* continued below */
+      }
+    } /* get_open_shard */
+
+    /* No open shard: new shard starts at the 'end_row' of the last shard */
+    {
+      enum GNUNET_DB_QueryStatus qs;
+      struct GNUNET_PQ_QueryParam params[] = {
+        GNUNET_PQ_query_param_string (job_name),
+        GNUNET_PQ_query_param_end
+      };
+      struct GNUNET_PQ_ResultSpec rs[] = {
+        GNUNET_PQ_result_spec_uint64 ("end_row",
+                                      start_row),
+        GNUNET_PQ_result_spec_end
+      };
+
+      qs = GNUNET_PQ_eval_prepared_singleton_select (session->conn,
+                                                     "get_last_shard",
+                                                     params,
+                                                     rs);
+      switch (qs)
+      {
+      case GNUNET_DB_STATUS_HARD_ERROR:
+        GNUNET_break (0);
+        postgres_rollback (cls,
+                           session);
+        return qs;
+      case GNUNET_DB_STATUS_SOFT_ERROR:
+        postgres_rollback (cls,
+                           session);
+        continue;
+      case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+        break;
+      case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+        *start_row = 0; /* base-case: no shards yet */
+        break; /* continued below */
+      }
+      *end_row = *start_row + shard_size;
+    } /* get_last_shard */
+
+    /* Claim fresh shard */
+    {
+      enum GNUNET_DB_QueryStatus qs;
+      struct GNUNET_TIME_Absolute now;
+      struct GNUNET_PQ_QueryParam params[] = {
+        GNUNET_PQ_query_param_string (job_name),
+        GNUNET_PQ_query_param_absolute_time (&now),
+        GNUNET_PQ_query_param_uint64 (start_row),
+        GNUNET_PQ_query_param_uint64 (end_row),
+        GNUNET_PQ_query_param_end
+      };
+
+      now = GNUNET_TIME_absolute_get ();
+      qs = GNUNET_PQ_eval_prepared_non_select (session->conn,
+                                               "claim_next_shard",
+                                               params);
+      switch (qs)
+      {
+      case GNUNET_DB_STATUS_HARD_ERROR:
+        GNUNET_break (0);
+        postgres_rollback (cls,
+                           session);
+        return qs;
+      case GNUNET_DB_STATUS_SOFT_ERROR:
+        postgres_rollback (cls,
+                           session);
+        continue;
+      case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+        /* continued below */
+        break;
+      case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+        GNUNET_break (0);
+        postgres_rollback (cls,
+                           session);
+        continue;
+      }
+    } /* claim_next_shard */
+
+    /* commit */
+commit:
+    {
+      enum GNUNET_DB_QueryStatus qs;
+
+      qs = postgres_commit (cls,
+                            session);
+      switch (qs)
+      {
+      case GNUNET_DB_STATUS_HARD_ERROR:
+        GNUNET_break (0);
+        postgres_rollback (cls,
+                           session);
+        return qs;
+      case GNUNET_DB_STATUS_SOFT_ERROR:
+        postgres_rollback (cls,
+                           session);
+        continue;
+      case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+      case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+        return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
+      }
+    }
+  } /* retry 'for' loop */
+  return GNUNET_DB_STATUS_SOFT_ERROR;
+}
+
+
+/**
+ * Function called to persist that work on a shard was completed.
+ *
+ * @param cls the @e cls of this struct with the plugin-specific state
+ * @param session a session
+ * @param job_name name of the job the completed shard belongs to
+ * @param start_row inclusive start row of the shard
+ * @param end_row exclusive end row of the shard
+ * @return transaction status code
+ */
+static enum GNUNET_DB_QueryStatus
+postgres_complete_shard (void *cls,
+                         struct TALER_EXCHANGEDB_Session *session,
+                         const char *job_name,
+                         uint64_t start_row,
+                         uint64_t end_row)
+{
+  struct GNUNET_PQ_QueryParam params[] = {
+    GNUNET_PQ_query_param_string (job_name),
+    GNUNET_PQ_query_param_uint64 (&start_row),
+    GNUNET_PQ_query_param_uint64 (&end_row),
+    GNUNET_PQ_query_param_end
+  };
+
+  (void) cls; /* unused: the statement runs on the caller's session */
+  return GNUNET_PQ_eval_prepared_non_select (session->conn,
+                                             "complete_shard",
+                                             params);
+}
+
+
 /**
  * Initialize Postgres database subsystem.
  *
@@ -10353,6 +10644,10 @@ libtaler_plugin_exchangedb_postgres_init (void *cls)
     = &postgres_lookup_records_by_table;
   plugin->insert_records_by_table
     = &postgres_insert_records_by_table;
+  plugin->begin_shard
+    = &postgres_begin_shard;
+  plugin->complete_shard
+    = &postgres_complete_shard;
   return plugin;
 }
 
diff --git a/src/include/taler_exchangedb_plugin.h b/src/include/taler_exchangedb_plugin.h
index 686edb86..75e8f8bb 100644
--- a/src/include/taler_exchangedb_plugin.h
+++ b/src/include/taler_exchangedb_plugin.h
@@ -1,6 +1,6 @@
 /*
   This file is part of TALER
-  Copyright (C) 2014-2020 Taler Systems SA
+  Copyright (C) 2014-2021 Taler Systems SA
 
   TALER is free software; you can redistribute it and/or modify it under the
   terms of the GNU General Public License as published by the Free Software
@@ -3820,6 +3820,45 @@ struct TALER_EXCHANGEDB_Plugin
                              struct TALER_EXCHANGEDB_Session *session,
                              const struct TALER_EXCHANGEDB_TableData *td);
 
+
+  /**
+   * Function called to grab a work shard for the job @a job_name. Runs in its
+   * own transaction (hence no session provided).
+   *
+   * @param cls the @e cls of this struct with the plugin-specific state
+   * @param job_name name of the operation to grab a work shard for
+   * @param delay minimum age of a shard to grab
+   * @param shard_size desired shard size
+   * @param[out] start_row inclusive start row of the shard (returned)
+   * @param[out] end_row exclusive end row of the shard (returned)
+   * @return transaction status code
+   */
+  enum GNUNET_DB_QueryStatus
+  (*begin_shard)(void *cls,
+                 const char *job_name,
+                 struct GNUNET_TIME_Relative delay,
+                 uint64_t shard_size,
+                 uint64_t *start_row,
+                 uint64_t *end_row);
+
+
+  /**
+   * Function called to persist that work on a shard was completed.
+   *
+   * @param cls the @e cls of this struct with the plugin-specific state
+   * @param session a session
+   * @param job_name name of the job the completed shard belongs to
+   * @param start_row inclusive start row of the shard
+   * @param end_row exclusive end row of the shard
+   * @return transaction status code
+   */
+  enum GNUNET_DB_QueryStatus
+  (*complete_shard)(void *cls,
+                    struct TALER_EXCHANGEDB_Session *session,
+                    const char *job_name,
+                    uint64_t start_row,
+                    uint64_t end_row);
+
 };
 
 #endif /* _TALER_EXCHANGE_DB_H */

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]