gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[taler-anastasis] branch master updated: async external reducer, externa


From: gnunet
Subject: [taler-anastasis] branch master updated: async external reducer, external reducer for start state
Date: Wed, 06 Oct 2021 10:54:19 +0200

This is an automated email from the git hooks/post-receive script.

dold pushed a commit to branch master
in repository anastasis.

The following commit(s) were added to refs/heads/master by this push:
     new f5a6b37  async external reducer, external reducer for start state
f5a6b37 is described below

commit f5a6b377fb13f46b073a892410237c54eceee4ed
Author: Florian Dold <florian@dold.me>
AuthorDate: Wed Oct 6 10:54:04 2021 +0200

    async external reducer, external reducer for start state
---
 contrib/gana                               |   2 +-
 src/reducer/anastasis_api_backup_redux.c   |  62 +++++
 src/reducer/anastasis_api_recovery_redux.c |  62 +++++
 src/reducer/anastasis_api_redux.c          | 354 +++++++++++++++++++----------
 src/reducer/anastasis_api_redux.h          |  10 +
 5 files changed, 366 insertions(+), 124 deletions(-)

diff --git a/contrib/gana b/contrib/gana
index 90aee6a..f126ffd 160000
--- a/contrib/gana
+++ b/contrib/gana
@@ -1 +1 @@
-Subproject commit 90aee6a0ba5c9e3e52074a2dabe1b3b9421ad165
+Subproject commit f126ffd32255c68f4fbef5e9ef849ef04855b0a9
diff --git a/src/reducer/anastasis_api_backup_redux.c 
b/src/reducer/anastasis_api_backup_redux.c
index 27b5730..cb3bd5a 100644
--- a/src/reducer/anastasis_api_backup_redux.c
+++ b/src/reducer/anastasis_api_backup_redux.c
@@ -177,6 +177,68 @@ json_t *
 ANASTASIS_backup_start (const struct GNUNET_CONFIGURATION_Handle *cfg)
 {
   json_t *initial_state;
+  const char *external_reducer = ANASTASIS_REDUX_probe_external_reducer ();
+
+  if (NULL != external_reducer)
+  {
+    int pipefd_stdout[2];
+    pid_t pid = 0;
+    int status;
+    FILE *reducer_stdout;
+
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                "Using external reducer '%s' for backup start status\n",
+                external_reducer);
+
+    GNUNET_assert (0 == pipe (pipefd_stdout));
+    pid = fork ();
+    if (pid == 0)
+    {
+      close (pipefd_stdout[0]);
+      dup2 (pipefd_stdout[1], STDOUT_FILENO);
+      execlp (external_reducer,
+              external_reducer,
+              "-b",
+              NULL);
+      GNUNET_assert (0);
+    }
+
+    close (pipefd_stdout[1]);
+    reducer_stdout = fdopen (pipefd_stdout[0],
+                             "r");
+    {
+      json_error_t err;
+
+      initial_state = json_loadf (reducer_stdout,
+                                  0,
+                                  &err);
+
+      if (NULL == initial_state)
+      {
+        GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                    "External reducer did not output valid JSON: %s:%d:%d 
%s\n",
+                    err.source,
+                    err.line,
+                    err.column,
+                    err.text);
+        GNUNET_assert (0 == fclose (reducer_stdout));
+        waitpid (pid, &status, 0);
+        return NULL;
+      }
+    }
+
+    GNUNET_assert (NULL != initial_state);
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                "Waiting for external reducer to terminate.\n");
+    GNUNET_assert (0 == fclose (reducer_stdout));
+    reducer_stdout = NULL;
+    waitpid (pid, &status, 0);
+
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                "External reducer finished with exit status '%d'\n",
+                status);
+    return initial_state;
+  }
 
   (void) cfg;
   initial_state = ANASTASIS_REDUX_load_continents_ ();
diff --git a/src/reducer/anastasis_api_recovery_redux.c 
b/src/reducer/anastasis_api_recovery_redux.c
index 59f5ae3..94e5e10 100644
--- a/src/reducer/anastasis_api_recovery_redux.c
+++ b/src/reducer/anastasis_api_recovery_redux.c
@@ -83,6 +83,68 @@ json_t *
 ANASTASIS_recovery_start (const struct GNUNET_CONFIGURATION_Handle *cfg)
 {
   json_t *initial_state;
+  const char *external_reducer = ANASTASIS_REDUX_probe_external_reducer ();
+
+  if (NULL != external_reducer)
+  {
+    int pipefd_stdout[2];
+    pid_t pid = 0;
+    int status;
+    FILE *reducer_stdout;
+
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                "Using external reducer '%s' for recovery start status\n",
+                external_reducer);
+
+    GNUNET_assert (0 == pipe (pipefd_stdout));
+    pid = fork ();
+    if (pid == 0)
+    {
+      close (pipefd_stdout[0]);
+      dup2 (pipefd_stdout[1], STDOUT_FILENO);
+      execlp (external_reducer,
+              external_reducer,
+              "-r",
+              NULL);
+      GNUNET_assert (0);
+    }
+
+    close (pipefd_stdout[1]);
+    reducer_stdout = fdopen (pipefd_stdout[0],
+                             "r");
+    {
+      json_error_t err;
+
+      initial_state = json_loadf (reducer_stdout,
+                                  0,
+                                  &err);
+
+      if (NULL == initial_state)
+      {
+        GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                    "External reducer did not output valid JSON: %s:%d:%d 
%s\n",
+                    err.source,
+                    err.line,
+                    err.column,
+                    err.text);
+        GNUNET_assert (0 == fclose (reducer_stdout));
+        waitpid (pid, &status, 0);
+        return NULL;
+      }
+    }
+
+    GNUNET_assert (NULL != initial_state);
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                "Waiting for external reducer to terminate.\n");
+    GNUNET_assert (0 == fclose (reducer_stdout));
+    reducer_stdout = NULL;
+    waitpid (pid, &status, 0);
+
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                "External reducer finished with exit status '%d'\n",
+                status);
+    return initial_state;
+  }
 
   (void) cfg;
   initial_state = ANASTASIS_REDUX_load_continents_ ();
diff --git a/src/reducer/anastasis_api_redux.c 
b/src/reducer/anastasis_api_redux.c
index 92801f4..59770f3 100644
--- a/src/reducer/anastasis_api_redux.c
+++ b/src/reducer/anastasis_api_redux.c
@@ -234,6 +234,26 @@ static json_t *redux_countries;
  */
 static json_t *provider_list;
 
+/**
+ * External reducer binary or NULL
+ * to use internal reducer.
+ */
+static char *external_reducer_binary;
+
+
+const char *
+ANASTASIS_REDUX_probe_external_reducer (void)
+{
+  if (NULL != external_reducer_binary)
+    return external_reducer_binary;
+  external_reducer_binary = getenv ("ANASTASIS_EXTERNAL_REDUCER");
+  if (NULL != external_reducer_binary)
+    unsetenv ("ANASTASIS_EXTERNAL_REDUCER");
+
+  return external_reducer_binary;
+
+}
+
 
 /**
  * Extract the mode of a state from json
@@ -1480,38 +1500,145 @@ typedef struct ANASTASIS_ReduxAction *
                    ANASTASIS_ActionCallback cb,
                    void *cb_cls);
 
+
 /**
- * Dummy cleanup function.
+ * Closure for read operations on the external reducer.
  */
-static void
-dummy_cleanup (void *cls)
+struct ExternalReducerCls
 {
-  GNUNET_assert (0);
-}
-
+  struct GNUNET_Buffer read_buffer;
+  struct GNUNET_SCHEDULER_Task *read_task;
+  struct GNUNET_DISK_PipeHandle *reducer_stdin;
+  struct GNUNET_DISK_PipeHandle *reducer_stdout;
+  struct GNUNET_OS_Process *reducer_process;
+  ANASTASIS_ActionCallback action_cb;
+  void *action_cb_cls;
+};
 
 /**
- * Closure for external_redux_done.
+ * Clean up and destroy the external reducer state.
+ *
+ * @param cls closure, a 'struct ExternalReducerCls *'
  */
-struct ExternalReduxCls
+static void
+cleanup_external_reducer (void *cls)
 {
-  ANASTASIS_ActionCallback cb;
-  void *cb_cls;
-  json_t *new_state;
-};
+  struct ExternalReducerCls *red_cls = cls;
+
+  if (NULL != red_cls->read_task)
+  {
+    GNUNET_SCHEDULER_cancel (red_cls->read_task);
+    red_cls->read_task = NULL;
+  }
+
+  GNUNET_buffer_clear (&red_cls->read_buffer);
+  if (NULL != red_cls->reducer_stdin)
+  {
+    GNUNET_DISK_pipe_close (red_cls->reducer_stdin);
+    red_cls->reducer_stdin = NULL;
+  }
+  if (NULL != red_cls->reducer_stdout)
+  {
+    GNUNET_DISK_pipe_close (red_cls->reducer_stdout);
+    red_cls->reducer_stdout = NULL;
+  }
+
+  if (NULL != red_cls->reducer_process)
+  {
+    enum GNUNET_OS_ProcessStatusType type;
+    unsigned long code;
+    enum GNUNET_GenericReturnValue pwret;
+
+    pwret = GNUNET_OS_process_wait_status (red_cls->reducer_process,
+                                           &type,
+                                           &code);
+
+    GNUNET_assert (GNUNET_SYSERR != pwret);
+    if (GNUNET_NO == pwret)
+    {
+      GNUNET_OS_process_kill (red_cls->reducer_process,
+                              SIGTERM);
+      GNUNET_assert (GNUNET_SYSERR != GNUNET_OS_process_wait (
+                       red_cls->reducer_process));
+    }
+
+    GNUNET_OS_process_destroy (red_cls->reducer_process);
+    red_cls->reducer_process = NULL;
+  }
+
+  GNUNET_free (red_cls);
+}
 
 
 /**
- * Callback called when the redux action has been processed by
- * the external reducer.
+ * Task called when
+ *
+ * @param cls closure, a 'struct ExternalReducerCls *'
  */
 static void
-external_redux_done (void *cls)
+external_reducer_read_cb (void *cls)
 {
-  struct ExternalReduxCls *erc = cls;
-  erc->cb (erc->cb_cls,
-           TALER_EC_NONE,
-           erc->new_state);
+  struct ExternalReducerCls *red_cls = cls;
+  ssize_t sret;
+  char buf[256];
+
+  red_cls->read_task = NULL;
+
+  sret = GNUNET_DISK_file_read (GNUNET_DISK_pipe_handle (
+                                  red_cls->reducer_stdout,
+                                  GNUNET_DISK_PIPE_END_READ),
+                                buf,
+                                256);
+  if (sret < 0)
+  {
+    GNUNET_break (0);
+    red_cls->action_cb (red_cls->action_cb_cls,
+                        TALER_EC_ANASTASIS_REDUCER_INTERNAL_ERROR,
+                        NULL);
+    cleanup_external_reducer (red_cls);
+    return;
+  }
+  else if (0 == sret)
+  {
+    char *str = GNUNET_buffer_reap_str (&red_cls->read_buffer);
+    json_t *json;
+
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                "Got external reducer response: '%s'\n",
+                str);
+
+    json = json_loads (str, 0, NULL);
+
+    if (NULL == json)
+    {
+      GNUNET_break (0);
+      red_cls->action_cb (red_cls->action_cb_cls,
+                          TALER_EC_ANASTASIS_REDUCER_INTERNAL_ERROR,
+                          NULL);
+      cleanup_external_reducer (red_cls);
+      return;
+    }
+
+    red_cls->action_cb (red_cls->action_cb_cls,
+                        TALER_EC_NONE,
+                        json);
+    cleanup_external_reducer (red_cls);
+    return;
+  }
+  else
+  {
+    GNUNET_buffer_write (&red_cls->read_buffer,
+                         buf,
+                         sret);
+
+    red_cls->read_task = GNUNET_SCHEDULER_add_read_file (
+      GNUNET_TIME_UNIT_FOREVER_REL,
+      GNUNET_DISK_pipe_handle (
+        red_cls->reducer_stdout,
+        GNUNET_DISK_PIPE_END_READ),
+      external_reducer_read_cb,
+      red_cls);
+  }
 }
 
 
@@ -1527,116 +1654,95 @@ redux_action_external (const char *ext_reducer,
                        ANASTASIS_ActionCallback cb,
                        void *cb_cls)
 {
-  struct ANASTASIS_ReduxAction *act = NULL;
-  int pipefd_stdout[2];
-  int pipefd_stdin[2];
-  pid_t pid = 0;
-  int status;
-  FILE *reducer_stdout;
-  FILE *reducer_stdin;
-  json_t *next_state;
+  char *arg_str;
+  char *state_str = json_dumps (state, JSON_COMPACT);
+  ssize_t sret;
+  struct ExternalReducerCls *red_cls = GNUNET_new (struct ExternalReducerCls);
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-              "Using external reducer '%s'\n",
-              ext_reducer);
-
-  GNUNET_assert (0 == pipe (pipefd_stdout));
-  GNUNET_assert (0 == pipe (pipefd_stdin));
-  pid = fork ();
-  if (pid == 0)
-  {
-    /* Child */
-
-    char *arg_str = json_dumps (arguments, JSON_COMPACT);
-
-    close (pipefd_stdout[0]);
-    dup2 (pipefd_stdout[1], STDOUT_FILENO);
-
-    close (pipefd_stdin[1]);
-    dup2 (pipefd_stdin[0], STDIN_FILENO);
-
-    /* Unset environment variable, otherwise anastasis-reducer
-       would recursively shell out to itself. */
-    unsetenv ("ANASTASIS_EXTERNAL_REDUCER");
-
-    execlp (ext_reducer,
-            ext_reducer,
-            "-a",
-            arg_str,
-            action,
-            NULL);
-    GNUNET_assert (0);
-  }
-
-  /* Only parent reaches here */
-
-  close (pipefd_stdout[1]);
-  close (pipefd_stdin[0]);
+  if (NULL == arguments)
+    arg_str = GNUNET_strdup ("{}");
+  else
+    arg_str = json_dumps (arguments, JSON_COMPACT);
 
-  reducer_stdout = fdopen (pipefd_stdout[0],
-                           "r");
-  reducer_stdin = fdopen (pipefd_stdin[1],
-                          "w");
+  red_cls->action_cb = cb;
+  red_cls->action_cb_cls = cb_cls;
 
-  GNUNET_assert (0 == json_dumpf (state,
-                                  reducer_stdin,
-                                  JSON_COMPACT));
+  GNUNET_assert (NULL != (red_cls->reducer_stdin = GNUNET_DISK_pipe (
+                            GNUNET_DISK_PF_NONE)));
+  GNUNET_assert (NULL != (red_cls->reducer_stdout = GNUNET_DISK_pipe (
+                            GNUNET_DISK_PF_NONE)));
 
-  GNUNET_assert (0 == fclose (reducer_stdin));
-  reducer_stdin = NULL;
+  /* By the time we're here, this variable should be unset, because
+     otherwise using anastasis-reducer as the external reducer
+     will lead to infinite recursion. */
+  GNUNET_assert (NULL == getenv ("ANASTASIS_EXTERNAL_REDUCER"));
 
   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-              "Wrote old state to reducer stdin.\n");
-
+              "Starting external reducer with action '%s' and argument '%s'\n",
+              action,
+              arg_str);
+
+  red_cls->reducer_process = GNUNET_OS_start_process 
(GNUNET_OS_INHERIT_STD_ERR,
+                                                      red_cls->reducer_stdin,
+                                                      red_cls->reducer_stdout,
+                                                      NULL,
+                                                      ext_reducer,
+                                                      ext_reducer,
+                                                      "-a",
+                                                      arg_str,
+                                                      action,
+                                                      NULL);
+
+  GNUNET_free (arg_str);
+
+  if (NULL == red_cls->reducer_process)
   {
-    json_error_t err;
-
-    next_state = json_loadf (reducer_stdout,
-                             0,
-                             &err);
-
-    if (NULL == next_state)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                  "External reducer did not output valid JSON: %s:%d:%d %s\n",
-                  err.source,
-                  err.line,
-                  err.column,
-                  err.text);
-      GNUNET_assert (0 == fclose (reducer_stdout));
-      waitpid (pid, &status, 0);
-      return NULL;
-    }
+    GNUNET_break (0);
+    GNUNET_free (state_str);
+    cleanup_external_reducer (red_cls);
+    return NULL;
   }
 
-  /* FIXME: report error instead! */
-  GNUNET_assert (NULL != next_state);
-
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-              "Waiting for external reducer to terminate.\n");
-  GNUNET_assert (0 == fclose (reducer_stdout));
-  reducer_stdout = NULL;
-  waitpid (pid, &status, 0);
+  /* Close pipe ends we don't use. */
+  GNUNET_assert (GNUNET_OK ==
+                 GNUNET_DISK_pipe_close_end (red_cls->reducer_stdin,
+                                             GNUNET_DISK_PIPE_END_READ));
+  GNUNET_assert (GNUNET_OK ==
+                 GNUNET_DISK_pipe_close_end (red_cls->reducer_stdout,
+                                             GNUNET_DISK_PIPE_END_WRITE));
+
+  sret = GNUNET_DISK_file_write_blocking (GNUNET_DISK_pipe_handle (
+                                            red_cls->reducer_stdin,
+                                            GNUNET_DISK_PIPE_END_WRITE),
+                                          state_str,
+                                          strlen (state_str));
+  GNUNET_free (state_str);
+  if (sret <= 0)
+  {
+    GNUNET_break (0);
+    cleanup_external_reducer (red_cls);
+    return NULL;
+  }
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-              "External reducer finished with exit status '%d'\n",
-              status);
+  GNUNET_assert (GNUNET_OK ==
+                 GNUNET_DISK_pipe_close_end (red_cls->reducer_stdin,
+                                             GNUNET_DISK_PIPE_END_WRITE));
 
-  act = GNUNET_new (struct ANASTASIS_ReduxAction);
-  /* Callback is called immediately, cleanup must never be called */
-  act->cleanup = &dummy_cleanup;
+  red_cls->read_task = GNUNET_SCHEDULER_add_read_file (
+    GNUNET_TIME_UNIT_FOREVER_REL,
+    GNUNET_DISK_pipe_handle (
+      red_cls->reducer_stdout,
+      GNUNET_DISK_PIPE_END_READ),
+    external_reducer_read_cb,
+    red_cls);
 
   {
-    struct ExternalReduxCls *sched_cls = GNUNET_new (struct ExternalReduxCls);
-
-    sched_cls->cb = cb;
-    sched_cls->cb_cls = cb_cls;
-    sched_cls->new_state = next_state;
-    GNUNET_SCHEDULER_add_now (external_redux_done,
-                              sched_cls);
+    struct ANASTASIS_ReduxAction *ra = GNUNET_new (struct
+                                                   ANASTASIS_ReduxAction);
+    ra->cleanup_cls = red_cls;
+    ra->cleanup = cleanup_external_reducer;
+    return ra;
   }
-
-  return act;
 }
 
 
@@ -1699,16 +1805,18 @@ ANASTASIS_redux_action (const json_t *state,
   const char *s = json_string_value (json_object_get (state,
                                                       "backup_state"));
   enum ANASTASIS_GenericState gs;
-  const char *ext_reducer = getenv ("ANASTASIS_EXTERNAL_REDUCER");
 
   /* If requested, handle action with external reducer, used for testing. */
-  if (NULL != ext_reducer)
-    return redux_action_external (ext_reducer,
-                                  state,
-                                  action,
-                                  arguments,
-                                  cb,
-                                  cb_cls);
+  {
+    const char *ext_reducer = ANASTASIS_REDUX_probe_external_reducer ();
+    if (NULL != ext_reducer)
+      return redux_action_external (ext_reducer,
+                                    state,
+                                    action,
+                                    arguments,
+                                    cb,
+                                    cb_cls);
+  }
 
   if (NULL == s)
   {
diff --git a/src/reducer/anastasis_api_redux.h 
b/src/reducer/anastasis_api_redux.h
index 4d62d5e..b4fe5c4 100644
--- a/src/reducer/anastasis_api_redux.h
+++ b/src/reducer/anastasis_api_redux.h
@@ -327,6 +327,16 @@ ANASTASIS_backup_action_ (json_t *state,
                           void *cb_cls);
 
 
+/**
+ * Check if an external reducer binary is requested.
+ * Cache the result and unset the corresponding environment
+ * variable.
+ *
+ * @returns name of the external reducer or NULL to user internal reducer
+ */
+const char *
+ANASTASIS_REDUX_probe_external_reducer (void);
+
 /**
  * Generic container for an action with asynchronous activities.
  */

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]