gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] [gnunet] 01/04: Rewrite cadet tests using MQ API


From: gnunet
Subject: [GNUnet-SVN] [gnunet] 01/04: Rewrite cadet tests using MQ API
Date: Tue, 21 Feb 2017 13:49:32 +0100

This is an automated email from the git hooks/post-receive script.

bart-polot pushed a commit to branch master
in repository gnunet.

commit 291f4c74df104728a0a0bb105650b2f33f9b3d9d
Author: Bart Polot <address@hidden>
AuthorDate: Tue Feb 21 13:30:07 2017 +0100

    Rewrite cadet tests using MQ API
---
 src/cadet/.gitignore                         |   1 +
 src/cadet/Makefile.am                        |  28 +-
 src/cadet/cadet_test_lib_new.c               | 362 ++++++++++++++
 src/cadet/cadet_test_lib_new.h               | 106 ++++
 src/cadet/test_cadet.c                       |   4 +-
 src/cadet/{test_cadet.c => test_cadet_new.c} | 715 +++++++++++----------------
 6 files changed, 799 insertions(+), 417 deletions(-)

diff --git a/src/cadet/.gitignore b/src/cadet/.gitignore
index a38b8f495..a73006dae 100644
--- a/src/cadet/.gitignore
+++ b/src/cadet/.gitignore
@@ -21,3 +21,4 @@ test_cadet_local
 test_cadet_single
 gnunet-service-cadet-new
 test_cadet_local_mq
+test_cadet_2_forward_new
\ No newline at end of file
diff --git a/src/cadet/Makefile.am b/src/cadet/Makefile.am
index 1a51453c9..74791d66e 100644
--- a/src/cadet/Makefile.am
+++ b/src/cadet/Makefile.am
@@ -110,7 +110,7 @@ endif
 
 
 if HAVE_TESTING
- noinst_LIBRARIES = libgnunetcadettest.a $(noinst_LIB_EXP)
+ noinst_LIBRARIES = libgnunetcadettest.a libgnunetcadettestnew.a $(noinst_LIB_EXP)
  noinst_PROGRAMS = gnunet-cadet-profiler
 endif
 
@@ -124,6 +124,7 @@ libgnunetcadettest_a_LIBADD = \
 if HAVE_TESTING
 check_PROGRAMS = \
   test_cadet_local_mq \
+  test_cadet_2_forward_new \
   test_cadet_single \
   test_cadet_local \
   test_cadet_2_forward \
@@ -245,6 +246,31 @@ test_cadet_5_speed_reliable_backwards_SOURCES = \
 test_cadet_5_speed_reliable_backwards_LDADD = $(ld_cadet_test_lib)
 
 
+# NEW TESTS
+libgnunetcadettestnew_a_SOURCES = \
+  cadet_test_lib_new.c cadet_test_lib_new.h
+libgnunetcadettestnew_a_LIBADD = \
+ $(top_builddir)/src/util/libgnunetutil.la \
+ $(top_builddir)/src/testbed/libgnunettestbed.la \
+ libgnunetcadetnew.la
+
+ld_cadet_test_lib_new = \
+  $(top_builddir)/src/util/libgnunetutil.la \
+  $(top_builddir)/src/testing/libgnunettesting.la \
+  libgnunetcadetnew.la \
+  libgnunetcadettestnew.a \
+  $(top_builddir)/src/testbed/libgnunettestbed.la \
+  $(top_builddir)/src/statistics/libgnunetstatistics.la
+dep_cadet_test_lib_new = \
+  libgnunetcadetnew.la \
+  libgnunetcadettestnew.a \
+  $(top_builddir)/src/statistics/libgnunetstatistics.la
+
+test_cadet_2_forward_new_SOURCES = \
+  test_cadet_new.c
+test_cadet_2_forward_new_LDADD = $(ld_cadet_test_lib_new)
+
+
 if ENABLE_TEST_RUN
 AM_TESTS_ENVIRONMENT=export 
GNUNET_PREFIX=$${GNUNET_PREFIX:address@hidden@};export 
PATH=$${GNUNET_PREFIX:address@hidden@}/bin:$$PATH;unset XDG_DATA_HOME;unset 
XDG_CONFIG_HOME;
 TESTS = \
diff --git a/src/cadet/cadet_test_lib_new.c b/src/cadet/cadet_test_lib_new.c
new file mode 100644
index 000000000..c3a1540f4
--- /dev/null
+++ b/src/cadet/cadet_test_lib_new.c
@@ -0,0 +1,362 @@
+/*
+     This file is part of GNUnet.
+     Copyright (C) 2012, 2017 GNUnet e.V.
+
+     GNUnet is free software; you can redistribute it and/or modify
+     it under the terms of the GNU General Public License as published
+     by the Free Software Foundation; either version 3, or (at your
+     option) any later version.
+
+     GNUnet is distributed in the hope that it will be useful, but
+     WITHOUT ANY WARRANTY; without even the implied warranty of
+     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+     General Public License for more details.
+
+     You should have received a copy of the GNU General Public License
+     along with GNUnet; see the file COPYING.  If not, write to the
+     Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+     Boston, MA 02110-1301, USA.
+*/
+/**
+ * @file cadet/cadet_test_lib_new.c
+ * @author Bartlomiej Polot
+ * @brief library for writing CADET tests
+ */
+#include "platform.h"
+#include "gnunet_util_lib.h"
+#include "cadet_test_lib_new.h"
+#include "gnunet_cadet_service.h"
+
+
+/**
+ * Test context for a CADET test, shared by all peers of one run.
+ */
+struct GNUNET_CADET_TEST_Context
+{
+  /**
+   * Array of running peers (as handed to us by the testbed).
+   */
+  struct GNUNET_TESTBED_Peer **peers;
+
+  /**
+   * Array of handles to the CADET for each peer, set as each connect completes.
+   */
+  struct GNUNET_CADET_Handle **cadets;
+
+  /**
+   * Array of testbed operations, one CADET service connect per peer.
+   */
+  struct GNUNET_TESTBED_Operation **ops;
+
+  /**
+   * Number of peers running, size of the arrays above.
+   */
+  unsigned int num_peers;
+
+  /**
+   * Main function of the test to run once all CADETs are available.
+   */
+  GNUNET_CADET_TEST_AppMain app_main;
+
+  /**
+   * Closure for #app_main.
+   */
+  void *app_main_cls;
+
+  /**
+   * Handler for incoming channels.
+   */
+  GNUNET_CADET_ConnectEventHandler connects;
+
+  /**
+   * Function called when the transmit window size changes.
+   */
+  GNUNET_CADET_WindowSizeEventHandler window_changes;
+
+  /**
+   * Cleaner for destroyed incoming channels.
+   */
+  GNUNET_CADET_DisconnectEventHandler disconnects;
+
+  /**
+   * Message handlers (our own copy of the array given to the run function).
+   */
+  struct GNUNET_MQ_MessageHandler *handlers;
+
+  /**
+   * Application ports, NULL-terminated (pointer stored as given, not copied).
+   */
+  const struct GNUNET_HashCode **ports;
+
+  /**
+   * Number of ports in #ports.
+   */
+  unsigned int port_count;
+
+};
+
+
+/**
+ * Context for a cadet adapter callback, one per peer.
+ */
+struct GNUNET_CADET_TEST_AdapterContext
+{
+  /**
+   * Peer number for the particular peer.
+   */
+  unsigned int peer;
+
+  /**
+   * Port handles for open ports, one per port of the general context.
+   */
+  struct GNUNET_CADET_Port **ports;
+ 
+  /**
+   * General context.
+   */
+  struct GNUNET_CADET_TEST_Context *ctx;
+};
+
+
+/**
+ * Adapter function called to establish a connection to
+ * the CADET service; also opens every port of the test context on it.
+ *
+ * @param cls closure (our `struct GNUNET_CADET_TEST_AdapterContext`)
+ * @param cfg configuration of the peer to connect to; will be available until
+ *          GNUNET_TESTBED_operation_done() is called on the operation returned
+ *          from GNUNET_TESTBED_service_connect()
+ * @return service handle to return in 'op_result', NULL on error
+ */
+static void *
+cadet_connect_adapter (void *cls,
+                       const struct GNUNET_CONFIGURATION_Handle *cfg)
+{
+  struct GNUNET_CADET_TEST_AdapterContext *actx = cls;
+  struct GNUNET_CADET_TEST_Context *ctx = actx->ctx;
+  struct GNUNET_CADET_Handle *h;
+  unsigned int i;
+
+  h = GNUNET_CADET_connecT (cfg);
+  if ( (NULL == h) || (NULL == ctx->ports) )
+    return h; /* connect failed or no ports requested: nothing to open */
+
+  actx->ports = GNUNET_new_array (ctx->port_count, struct GNUNET_CADET_Port *);
+  for (i = 0; i < ctx->port_count; i++)
+  {
+    actx->ports[i] = GNUNET_CADET_open_porT (h,
+                                             ctx->ports[i],
+                                             ctx->connects,
+                                             (void *) (long) actx->peer,
+                                             ctx->window_changes,
+                                             ctx->disconnects,
+                                             ctx->handlers);
+  }
+  return h;
+}
+
+
+/**
+ * Adapter function called to destroy a connection to
+ * the CADET service; closes the opened ports and frees the adapter context.
+ *
+ * @param cls closure (our `struct GNUNET_CADET_TEST_AdapterContext`, freed here)
+ * @param op_result service handle returned from the connect adapter
+ */
+static void
+cadet_disconnect_adapter (void *cls,
+                         void *op_result)
+{
+  struct GNUNET_CADET_Handle *cadet = op_result;
+  struct GNUNET_CADET_TEST_AdapterContext *actx = cls;
+
+  if (NULL != actx->ports)
+  {
+    for (unsigned int i = 0; i < actx->ctx->port_count; i++)
+    {
+      GNUNET_CADET_close_port (actx->ports[i]);
+      actx->ports[i] = NULL;
+    }
+    GNUNET_free (actx->ports);
+  }
+  GNUNET_free (actx);
+  GNUNET_CADET_disconnect (cadet);
+}
+
+
+/**
+ * Callback to be called when a service connect operation is completed.
+ *
+ * @param cls Closure, the `struct GNUNET_CADET_TEST_Context` of the test.
+ * @param op The operation that has been finished.
+ * @param ca_result The service handle returned from
+ *                  GNUNET_TESTBED_ConnectAdapter() (cadet handle).
+ * @param emsg Error message in case the operation has failed.
+ *             NULL if operation has executed successfully.
+ */
+static void
+cadet_connect_cb (void *cls,
+                 struct GNUNET_TESTBED_Operation *op,
+                 void *ca_result,
+                 const char *emsg)
+{
+  struct GNUNET_CADET_TEST_Context *ctx = cls;
+  unsigned int i;
+
+  if (NULL != emsg)
+  {
+    fprintf (stderr, "Failed to connect to CADET service: %s\n",
+             emsg);
+    GNUNET_SCHEDULER_shutdown ();
+    return;
+  }
+  for (i = 0; i < ctx->num_peers; i++)
+    if (op == ctx->ops[i]) /* find the peer slot this operation belongs to */
+    {
+      ctx->cadets[i] = ca_result;
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "...cadet %u connected\n", i);
+    }
+  for (i = 0; i < ctx->num_peers; i++)
+    if (NULL == ctx->cadets[i])
+      return; /* still some CADET connections missing */
+  /* all CADET connections ready: hand control to the test's main function */
+  ctx->app_main (ctx->app_main_cls,
+                 ctx,
+                 ctx->num_peers,
+                 ctx->peers,
+                 ctx->cadets);
+}
+
+/** Clean up the testbed: finish all operations, free @a ctx, shut down. */
+void
+GNUNET_CADET_TEST_cleanup (struct GNUNET_CADET_TEST_Context *ctx)
+{
+  for (unsigned int i = 0; i < ctx->num_peers; i++)
+  {
+    GNUNET_assert (NULL != ctx->ops[i]);
+    GNUNET_TESTBED_operation_done (ctx->ops[i]);
+    ctx->ops[i] = NULL;
+  }
+  GNUNET_free (ctx->ops);
+  GNUNET_free (ctx->cadets);
+  /* free our copy made with GNUNET_MQ_copy_handlers(); may be NULL */
+  GNUNET_free_non_null (ctx->handlers);
+  GNUNET_free (ctx);
+  GNUNET_SCHEDULER_shutdown ();
+}
+
+
+/**
+ * Callback run when the testbed is ready (peers running and connected to
+ * each other); starts a CADET service connect operation for every peer.
+ *
+ * @param cls Closure (context).
+ * @param h the run handle
+ * @param num_peers Number of peers that are running.
+ * @param peers Handles to each one of the @c num_peers peers.
+ * @param links_succeeded the number of overlay link connection attempts that
+ *          succeeded
+ * @param links_failed the number of overlay link connection attempts that
+ *          failed
+ */
+static void
+cadet_test_run (void *cls,
+               struct GNUNET_TESTBED_RunHandle *h,
+               unsigned int num_peers,
+               struct GNUNET_TESTBED_Peer **peers,
+               unsigned int links_succeeded,
+               unsigned int links_failed)
+{
+  struct GNUNET_CADET_TEST_Context *ctx = cls;
+  unsigned int i;
+
+  if (0 != links_failed)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Some links failed (%u), ending\n",
+                links_failed);
+    exit (2);
+  }
+
+  if  (num_peers != ctx->num_peers)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Peers started %u/%u, ending\n",
+                num_peers, ctx->num_peers);
+    exit (1);
+  }
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Testbed up, %u peers and %u links\n",
+              num_peers, links_succeeded);
+  ctx->peers = peers;
+  for (i = 0; i < num_peers; i++)
+  {
+    struct GNUNET_CADET_TEST_AdapterContext *newctx;
+    newctx = GNUNET_new (struct GNUNET_CADET_TEST_AdapterContext);
+    newctx->peer = i;
+    newctx->ctx = ctx;
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Connecting to cadet %u\n", i);
+    ctx->ops[i] = GNUNET_TESTBED_service_connect (ctx,
+                                                  peers[i],
+                                                  "cadet",
+                                                  &cadet_connect_cb,
+                                                  ctx,
+                                                  &cadet_connect_adapter,
+                                                  &cadet_disconnect_adapter,
+                                                  newctx); /* freed by cadet_disconnect_adapter() */
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "op handle %p\n", ctx->ops[i]);
+  }
+}
+
+
+/**
+ * Run a test using the given name, configuration file and number of peers.
+ * All cadet callbacks will receive the peer number (long) as the closure.
+ *
+ * @param testname Name of the test (for logging).
+ * @param cfgfile Name of the configuration file.
+ * @param num_peers Number of peers to start.
+ * @param tmain Main function to run once the testbed is ready.
+ * @param tmain_cls Closure for @a tmain.
+ * @param connects Handler for incoming channels.
+ * @param window_changes Handler for the window size change notification.
+ * @param disconnects Cleaner for destroyed incoming channels.
+ * @param handlers Message handlers.
+ * @param ports Ports the peers offer, NULL-terminated; NULL to open no ports.
+ */
+void
+GNUNET_CADET_TEST_ruN (const char *testname,
+                       const char *cfgfile,
+                       unsigned int num_peers,
+                       GNUNET_CADET_TEST_AppMain tmain,
+                       void *tmain_cls,
+                       GNUNET_CADET_ConnectEventHandler connects,
+                       GNUNET_CADET_WindowSizeEventHandler window_changes,
+                       GNUNET_CADET_DisconnectEventHandler disconnects,
+                       struct GNUNET_MQ_MessageHandler *handlers,
+                       const struct GNUNET_HashCode **ports)
+{
+  struct GNUNET_CADET_TEST_Context *ctx;
+
+  ctx = GNUNET_new (struct GNUNET_CADET_TEST_Context);
+  ctx->num_peers = num_peers;
+  ctx->ops = GNUNET_new_array (num_peers, struct GNUNET_TESTBED_Operation *);
+  ctx->cadets = GNUNET_new_array (num_peers, struct GNUNET_CADET_Handle *);
+  ctx->app_main = tmain;
+  ctx->app_main_cls = tmain_cls;
+  ctx->connects = connects;
+  ctx->window_changes = window_changes;
+  ctx->disconnects = disconnects;
+  ctx->handlers = GNUNET_MQ_copy_handlers (handlers);
+  ctx->ports = ports;
+  ctx->port_count = 0; /* stays 0 when no port list was given */
+  while ( (NULL != ports) && (NULL != ports[ctx->port_count]) )
+    ctx->port_count++;
+
+  GNUNET_TESTBED_test_run (testname,
+                           cfgfile,
+                           num_peers,
+                           0LL, NULL, NULL,
+                           &cadet_test_run, ctx);
+}
+
+/* end of cadet_test_lib_new.c */
diff --git a/src/cadet/cadet_test_lib_new.h b/src/cadet/cadet_test_lib_new.h
new file mode 100644
index 000000000..4b3a6b18d
--- /dev/null
+++ b/src/cadet/cadet_test_lib_new.h
@@ -0,0 +1,106 @@
+/*
+     This file is part of GNUnet.
+     Copyright (C) 2012,2017 GNUnet e.V.
+
+     GNUnet is free software; you can redistribute it and/or modify
+     it under the terms of the GNU General Public License as published
+     by the Free Software Foundation; either version 3, or (at your
+     option) any later version.
+
+     GNUnet is distributed in the hope that it will be useful, but
+     WITHOUT ANY WARRANTY; without even the implied warranty of
+     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+     General Public License for more details.
+
+     You should have received a copy of the GNU General Public License
+     along with GNUnet; see the file COPYING.  If not, write to the
+     Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+     Boston, MA 02110-1301, USA.
+*/
+/**
+ * @file cadet/cadet_test_lib_new.h
+ * @author Bartlomiej Polot
+ * @brief library for writing CADET tests
+ */
+#ifndef CADET_TEST_LIB_H
+#define CADET_TEST_LIB_H
+
+#ifdef __cplusplus
+extern "C"
+{
+#if 0                           /* keep Emacsens' auto-indent happy */
+}
+#endif
+#endif
+
+#include "gnunet_testbed_service.h"
+#include "gnunet_cadet_service.h"
+
+/**
+ * Test context for a CADET Test.
+ */
+struct GNUNET_CADET_TEST_Context;
+
+
+/**
+ * Main function of a CADET test.
+ *
+ * @param cls Closure.
+ * @param ctx Argument to give to GNUNET_CADET_TEST_cleanup on test end.
+ * @param num_peers Number of peers that are running.
+ * @param peers Array of peers.
+ * @param cadets Handle to each of the CADETs of the peers.
+ */
+typedef void (*GNUNET_CADET_TEST_AppMain) (void *cls,
+                                          struct GNUNET_CADET_TEST_Context *ctx,
+                                          unsigned int num_peers,
+                                          struct GNUNET_TESTBED_Peer **peers,
+                                          struct GNUNET_CADET_Handle **cadets);
+
+
+/**
+ * Run a test using the given name, configuration file and number of peers.
+ * All cadet callbacks will receive the peer number (long) as the closure.
+ *
+ * @param testname Name of the test (for logging).
+ * @param cfgfile Name of the configuration file.
+ * @param num_peers Number of peers to start.
+ * @param tmain Main function to run once the testbed is ready.
+ * @param tmain_cls Closure for @a tmain.
+ * @param connects Handler for incoming channels.
+ * @param window_changes Handler for the window size change notification.
+ * @param disconnects Cleaner for destroyed incoming channels.
+ * @param handlers Message handlers.
+ * @param ports Ports the peers offer, NULL-terminated.
+ */
+void
+GNUNET_CADET_TEST_ruN (const char *testname,
+                       const char *cfgfile,
+                       unsigned int num_peers,
+                       GNUNET_CADET_TEST_AppMain tmain,
+                       void *tmain_cls,
+                       GNUNET_CADET_ConnectEventHandler connects,
+                       GNUNET_CADET_WindowSizeEventHandler window_changes,
+                       GNUNET_CADET_DisconnectEventHandler disconnects,
+                       struct GNUNET_MQ_MessageHandler *handlers,
+                       const struct GNUNET_HashCode **ports);
+
+/**
+ * Clean up the testbed.
+ *
+ * @param ctx handle for the testbed
+ */
+void
+GNUNET_CADET_TEST_cleanup (struct GNUNET_CADET_TEST_Context *ctx);
+
+
+#if 0                           /* keep Emacsens' auto-indent happy */
+{
+#endif
+#ifdef __cplusplus
+}
+#endif
+
+
+/* ifndef CADET_TEST_LIB_H */
+#endif
diff --git a/src/cadet/test_cadet.c b/src/cadet/test_cadet.c
index f2e639e7a..e57c01be2 100644
--- a/src/cadet/test_cadet.c
+++ b/src/cadet/test_cadet.c
@@ -593,8 +593,8 @@ tmt_rdy (void *cls, size_t size, void *buf)
                 "sending initializer\n");
     msg_size = size_payload + 1000;
     msg->size = htons (msg_size);
-  if (SPEED_ACK == test)
-      data_sent++;
+    if (SPEED_ACK == test)
+        data_sent++;
   }
   else if ( (SPEED == test) ||
             (SPEED_ACK == test) )
diff --git a/src/cadet/test_cadet.c b/src/cadet/test_cadet_new.c
similarity index 52%
copy from src/cadet/test_cadet.c
copy to src/cadet/test_cadet_new.c
index f2e639e7a..622e87ea1 100644
--- a/src/cadet/test_cadet.c
+++ b/src/cadet/test_cadet_new.c
@@ -18,23 +18,34 @@
      Boston, MA 02110-1301, USA.
 */
 /**
- * @file cadet/test_cadet.c
+ * @file cadet/test_cadet_new.c
  * @author Bart Polot
  * @author Christian Grothoff
- * @brief Test for the cadet service: retransmission of traffic.
+ * @brief Test for the cadet service using mq API.
  */
 #include <stdio.h>
 #include "platform.h"
-#include "cadet_test_lib.h"
+#include "cadet_test_lib_new.h"
 #include "gnunet_cadet_service.h"
 #include "gnunet_statistics_service.h"
 #include <gauger.h>
 
 
 /**
+ * Ugly workaround to unify data handlers on incoming and outgoing channels.
+ */
+struct CadetTestChannelWrapper
+{
+  /**
+   * Channel pointer.
+   */
+  struct GNUNET_CADET_Channel *ch;
+};
+
+/**
  * How many messages to send
  */
-#define TOTAL_PACKETS 500 /* Cannot exceed 64k! */
+#define TOTAL_PACKETS 500       /* Cannot exceed 64k! */
 
 /**
  * How long until we give up on connecting the peers?
@@ -83,9 +94,9 @@ static int ok;
 static int ok_goal;
 
 /**
- * Size of each test packet
+ * Size of each test packet's payload
  */
-static size_t size_payload = sizeof (struct GNUNET_MessageHeader) + sizeof (uint32_t);
+static size_t size_payload = sizeof (uint32_t);
 
 /**
  * Operation to get peer ids.
@@ -158,9 +169,9 @@ static struct GNUNET_SCHEDULER_Task *disconnect_task;
 static struct GNUNET_SCHEDULER_Task *test_task;
 
 /**
- * Task runnining #data_task().
+ * Task running #send_next_msg().
  */
-static struct GNUNET_SCHEDULER_Task *data_job;
+static struct GNUNET_SCHEDULER_Task *send_next_msg_task;
 
 /**
  * Cadet handle for the root peer
@@ -175,7 +186,7 @@ static struct GNUNET_CADET_Handle *h2;
 /**
  * Channel handle for the root peer
  */
-static struct GNUNET_CADET_Channel *ch;
+static struct GNUNET_CADET_Channel *outgoing_ch;
 
 /**
  * Channel handle for the dest peer
@@ -183,17 +194,6 @@ static struct GNUNET_CADET_Channel *ch;
 static struct GNUNET_CADET_Channel *incoming_ch;
 
 /**
- * Transmit handle for root data calls
- */
-static struct GNUNET_CADET_TransmitHandle *th;
-
-/**
- * Transmit handle for root data calls
- */
-static struct GNUNET_CADET_TransmitHandle *incoming_th;
-
-
-/**
  * Time we started the data transmission (after channel has been established
  * and initilized).
  */
@@ -225,20 +225,26 @@ static unsigned int ka_received;
 static unsigned int msg_dropped;
 
 
+/******************************************************************************/
+
+
+/******************************************************************************/
+
+
 /**
- * Get the client number considered as the "target" or "receiver", depending on
+ * Get the channel considered as the "target" or "receiver", depending on
  * the test type and size.
  *
- * @return Peer # of the target client, either 0 (for backward tests) or
- *         the last peer in the line (for other tests).
+ * @return Channel handle of the target client: #outgoing_ch for backward
+ *         speed tests, #incoming_ch for all other tests.
  */
-static unsigned int
-get_expected_target ()
+static struct GNUNET_CADET_Channel *
+get_target_channel ()
 {
   if (SPEED == test && GNUNET_YES == test_backwards)
-    return 0;
+    return outgoing_ch;
   else
-    return peers_requested - 1;
+    return incoming_ch;
 }
 
 
@@ -251,16 +257,13 @@ show_end_data (void)
   static struct GNUNET_TIME_Absolute end_time;
   static struct GNUNET_TIME_Relative total_time;
 
-  end_time = GNUNET_TIME_absolute_get();
-  total_time = GNUNET_TIME_absolute_get_difference(start_time, end_time);
+  end_time = GNUNET_TIME_absolute_get ();
+  total_time = GNUNET_TIME_absolute_get_difference (start_time, end_time);
   FPRINTF (stderr, "\nResults of test \"%s\"\n", test_name);
   FPRINTF (stderr, "Test time %s\n",
-          GNUNET_STRINGS_relative_time_to_string (total_time,
-                                                  GNUNET_YES));
-  FPRINTF (stderr, "Test bandwidth: %f kb/s\n",
-          4 * TOTAL_PACKETS * 1.0 / (total_time.rel_value_us / 1000)); // 4bytes * ms
-  FPRINTF (stderr, "Test throughput: %f packets/s\n\n",
-          TOTAL_PACKETS * 1000.0 / (total_time.rel_value_us / 1000)); // packets * ms
+           GNUNET_STRINGS_relative_time_to_string (total_time, GNUNET_YES));
+  FPRINTF (stderr, "Test bandwidth: %f kb/s\n", 4 * TOTAL_PACKETS * 1.0 / (total_time.rel_value_us / 1000));    // 4bytes * ms
+  FPRINTF (stderr, "Test throughput: %f packets/s\n\n", TOTAL_PACKETS * 1000.0 / (total_time.rel_value_us / 1000));     // packets * ms
   GAUGER ("CADET", test_name,
           TOTAL_PACKETS * 1000.0 / (total_time.rel_value_us / 1000),
           "packets/s");
@@ -281,29 +284,19 @@ disconnect_cadet_peers (void *cls)
 
   disconnect_task = NULL;
   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-             "disconnecting cadet service of peers, called from line %ld\n",
-             line);
+              "disconnecting cadet service of peers, called from line %ld\n",
+              line);
   for (i = 0; i < 2; i++)
   {
     GNUNET_TESTBED_operation_done (t_op[i]);
   }
-  if (NULL != ch)
+  if (NULL != outgoing_ch)
   {
-    if (NULL != th)
-    {
-      GNUNET_CADET_notify_transmit_ready_cancel (th);
-      th = NULL;
-    }
-    GNUNET_CADET_channel_destroy (ch);
-    ch = NULL;
+    GNUNET_CADET_channel_destroy (outgoing_ch);
+    outgoing_ch = NULL;
   }
   if (NULL != incoming_ch)
   {
-    if (NULL != incoming_th)
-    {
-      GNUNET_CADET_notify_transmit_ready_cancel (incoming_th);
-      incoming_th = NULL;
-    }
     GNUNET_CADET_channel_destroy (incoming_ch);
     incoming_ch = NULL;
   }
@@ -322,10 +315,10 @@ static void
 shutdown_task (void *cls)
 {
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Ending test.\n");
-  if (NULL != data_job)
+  if (NULL != send_next_msg_task)
   {
-    GNUNET_SCHEDULER_cancel (data_job);
-    data_job = NULL;
+    GNUNET_SCHEDULER_cancel (send_next_msg_task);
+    send_next_msg_task = NULL;
   }
   if (NULL != test_task)
   {
@@ -335,8 +328,8 @@ shutdown_task (void *cls)
   if (NULL != disconnect_task)
   {
     GNUNET_SCHEDULER_cancel (disconnect_task);
-    disconnect_task = GNUNET_SCHEDULER_add_now (&disconnect_cadet_peers,
-                                               (void *) __LINE__);
+    disconnect_task =
+        GNUNET_SCHEDULER_add_now (&disconnect_cadet_peers, (void *) __LINE__);
   }
 }
 
@@ -351,17 +344,11 @@ shutdown_task (void *cls)
  *          operation has executed successfully.
  */
 static void
-stats_cont (void *cls,
-            struct GNUNET_TESTBED_Operation *op,
-            const char *emsg)
+stats_cont (void *cls, struct GNUNET_TESTBED_Operation *op, const char *emsg)
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-              " KA sent: %u, KA received: %u\n",
-              ka_sent,
-              ka_received);
-  if ( (KEEPALIVE == test) &&
-       ( (ka_sent < 2) ||
-         (ka_sent > ka_received + 1)) )
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, " KA sent: %u, KA received: %u\n",
+              ka_sent, ka_received);
+  if ((KEEPALIVE == test) && ((ka_sent < 2) || (ka_sent > ka_received + 1)))
   {
     GNUNET_break (0);
     ok--;
@@ -370,8 +357,7 @@ stats_cont (void *cls,
 
   if (NULL != disconnect_task)
     GNUNET_SCHEDULER_cancel (disconnect_task);
-  disconnect_task = GNUNET_SCHEDULER_add_now (&disconnect_cadet_peers,
-                                             cls);
+  disconnect_task = GNUNET_SCHEDULER_add_now (&disconnect_cadet_peers, cls);
 }
 
 
@@ -387,11 +373,8 @@ stats_cont (void *cls,
  * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration
  */
 static int
-stats_iterator (void *cls,
-                const struct GNUNET_TESTBED_Peer *peer,
-                const char *subsystem,
-                const char *name,
-                uint64_t value,
+stats_iterator (void *cls, const struct GNUNET_TESTBED_Peer *peer,
+                const char *subsystem, const char *name, uint64_t value,
                 int is_persistent)
 {
   static const char *s_sent = "# keepalives sent";
@@ -401,19 +384,15 @@ stats_iterator (void *cls,
   uint32_t i;
 
   i = GNUNET_TESTBED_get_index (peer);
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-              "STATS PEER %u - %s [%s]: %llu\n",
-              i,
-              subsystem,
-              name,
-              (unsigned long long) value);
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "STATS PEER %u - %s [%s]: %llu\n", i,
+              subsystem, name, (unsigned long long) value);
   if (0 == strncmp (s_sent, name, strlen (s_sent)) && 0 == i)
     ka_sent = value;
-  if (0 == strncmp(s_recv, name, strlen (s_recv)) && peers_requested - 1 == i)
+  if (0 == strncmp (s_recv, name, strlen (s_recv)) && peers_requested - 1 == i)
     ka_received = value;
-  if (0 == strncmp(rdrops, name, strlen (rdrops)))
+  if (0 == strncmp (rdrops, name, strlen (rdrops)))
     msg_dropped += value;
-  if (0 == strncmp(cdrops, name, strlen (cdrops)))
+  if (0 == strncmp (cdrops, name, strlen (cdrops)))
     msg_dropped += value;
 
   return GNUNET_OK;
@@ -423,7 +402,7 @@ stats_iterator (void *cls,
 /**
  * Task to gather all statistics.
  *
- * @param cls Closure (NULL).
+ * @param cls Closure (line from which the task was scheduled).
  */
 static void
 gather_stats_and_exit (void *cls)
@@ -432,21 +411,20 @@ gather_stats_and_exit (void *cls)
 
   disconnect_task = NULL;
   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-             "gathering statistics from line %d\n",
-             (int) l);
-  if (NULL != ch)
+              "gathering statistics from line %ld\n",
+              l);
+  if (NULL != outgoing_ch)
   {
-    if (NULL != th)
-    {
-      GNUNET_CADET_notify_transmit_ready_cancel (th);
-      th = NULL;
-    }
-    GNUNET_CADET_channel_destroy (ch);
-    ch = NULL;
+    GNUNET_CADET_channel_destroy (outgoing_ch);
+    outgoing_ch = NULL;
   }
-  stats_op = GNUNET_TESTBED_get_statistics (peers_running, testbed_peers,
-                                            "cadet", NULL,
-                                            &stats_iterator, stats_cont, cls);
+  stats_op = GNUNET_TESTBED_get_statistics (peers_running,
+                                            testbed_peers,
+                                            "cadet",
+                                            NULL,
+                                            &stats_iterator,
+                                            stats_cont,
+                                            cls);
 }
 
 
@@ -462,163 +440,126 @@ abort_test (long line)
   if (NULL != disconnect_task)
   {
     GNUNET_SCHEDULER_cancel (disconnect_task);
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                "Aborting test from %ld\n", line);
-    disconnect_task = GNUNET_SCHEDULER_add_now (&disconnect_cadet_peers,
-                                                (void *) line);
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Aborting test from %ld\n", line);
+    disconnect_task =
+        GNUNET_SCHEDULER_add_now (&disconnect_cadet_peers, (void *) line);
   }
 }
 
+
 /**
- * Transmit ready callback.
+ * Send a message on the channel with the appropriate size and payload.
  *
- * @param cls Closure (message type).
- * @param size Size of the tranmist buffer.
- * @param buf Pointer to the beginning of the buffer.
+ * Update the appropriate *_sent counter.
  *
- * @return Number of bytes written to buf.
+ * @param channel Channel to send the message on.
  */
-static size_t
-tmt_rdy (void *cls, size_t size, void *buf);
+static void
+send_test_message (struct GNUNET_CADET_Channel *channel)
+{
+  struct GNUNET_MQ_Envelope *env;
+  struct GNUNET_MessageHeader *msg;
+  uint32_t *data;
+  int *counter;
+  int size;
 
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Sending test message on channel %p\n",
+              channel);
+  size = size_payload;
+  if (GNUNET_NO == initialized)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending INITIALIZER\n");
+    size += 1000;
+    counter = &data_sent;
+    if (SPEED_ACK == test) // FIXME unify SPEED_ACK with an initializer
+        data_sent++;
+  }
+  else if (SPEED == test || SPEED_ACK == test)
+  {
+    counter = get_target_channel() == channel ? &ack_sent : &data_sent;
+    size += *counter;
+    *counter = *counter + 1;
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Sending message %u\n", *counter);
+  }
+  else
+  {
+    counter =  &ack_sent;
+  }
+  env = GNUNET_MQ_msg_extra (msg, size, GNUNET_MESSAGE_TYPE_DUMMY);
+
+  data = (uint32_t *) &msg[1];
+  *data = htonl (*counter);
+  GNUNET_MQ_send (GNUNET_CADET_get_mq (channel), env);
+}
 
 /**
- * Task to request a new data transmission.
+ * Task to request a new data transmission in a SPEED test, without waiting
+ * for previous messages to be sent/arrive.
  *
- * @param cls Closure (peer #).
+ * @param cls Closure (unused).
  */
 static void
-data_task (void *cls)
+send_next_msg (void *cls)
 {
   struct GNUNET_CADET_Channel *channel;
-  static struct GNUNET_CADET_TransmitHandle **pth;
-  long src;
 
-  data_job = NULL;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Data task\n");
-  if (GNUNET_YES == test_backwards)
-  {
-    channel = incoming_ch;
-    pth = &incoming_th;
-    src = peers_requested - 1;
-  }
-  else
-  {
-    channel = ch;
-    pth = &th;
-    src = 0;
-  }
+  send_next_msg_task = NULL;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending next message: %d\n", data_sent);
 
+  channel = GNUNET_YES == test_backwards ? incoming_ch : outgoing_ch;
   GNUNET_assert (NULL != channel);
-  GNUNET_assert (NULL == *pth);
-
-  *pth = GNUNET_CADET_notify_transmit_ready (channel, GNUNET_NO,
-                                             GNUNET_TIME_UNIT_FOREVER_REL,
-                                             size_payload + data_sent,
-                                             &tmt_rdy, (void *) src);
-  if (NULL == *pth)
+  GNUNET_assert (SPEED == test);
+  send_test_message (channel);
+  if (data_sent < TOTAL_PACKETS)
   {
-    unsigned long i = (unsigned long) cls;
-
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Retransmission\n");
-    if (0 == i)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "  in 1 ms\n");
-      data_job = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MILLISECONDS,
-                                              &data_task, (void *) 1L);
-    }
-    else
-    {
-      i++;
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                  "in %llu ms\n",
-                  (unsigned long long) i);
-      data_job = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
-                                                                             i),
-                                              &data_task, (void *) i);
-    }
+    /* SPEED test: Send all messages as soon as possible */
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Scheduling message %d\n",
+                data_sent + 1);
+    send_next_msg_task = GNUNET_SCHEDULER_add_now (&send_next_msg, NULL);
   }
 }
 
 
 /**
- * Transmit ready callback
+ * Every few messages cancel the timeout task and re-schedule it again, to
+ * avoid timing out when traffic keeps coming.
  *
- * @param cls Closure (peer # which is sending the data).
- * @param size Size of the buffer we have.
- * @param buf Buffer to copy data to.
+ * @param line Code line number to log if a timeout occurs.
  */
-static size_t
-tmt_rdy (void *cls, size_t size, void *buf)
+static void
+reschedule_timeout_task (long line)
 {
-  struct GNUNET_MessageHeader *msg = buf;
-  size_t msg_size;
-  uint32_t *data;
-  long id = (long) cls;
-  unsigned int counter;
-
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "tmt_rdy on %ld, filling buffer\n",
-              id);
-  if (0 == id)
-    th = NULL;
-  else if ((peers_requested - 1) == id)
-    incoming_th = NULL;
-  else
-    GNUNET_assert (0);
-  counter = get_expected_target () == id ? ack_sent : data_sent;
-  msg_size = size_payload + counter;
-  GNUNET_assert (msg_size > sizeof (struct GNUNET_MessageHeader));
-  if ( (size < msg_size) ||
-       (NULL == buf) )
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "size %u, buf %p, data_sent %u, ack_received %u\n",
-                (unsigned int) size,
-                buf,
-                data_sent,
-                ack_received);
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ok %u, ok goal %u\n", ok, ok_goal);
-    GNUNET_break (ok >= ok_goal - 2);
-
-    return 0;
-  }
-  msg->size = htons (msg_size);
-  msg->type = htons (GNUNET_MESSAGE_TYPE_DUMMY);
-  data = (uint32_t *) &msg[1];
-  *data = htonl (counter);
-  if (GNUNET_NO == initialized)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "sending initializer\n");
-    msg_size = size_payload + 1000;
-    msg->size = htons (msg_size);
-  if (SPEED_ACK == test)
-      data_sent++;
-  }
-  else if ( (SPEED == test) ||
-            (SPEED_ACK == test) )
+  if ((ok % 10) == 0)
   {
-    if (get_expected_target() == id)
-      ack_sent++;
-    else
-      data_sent++;
-    counter++;
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-                " Sent message %u size %u\n",
-                counter,
-                (unsigned int) msg_size);
-    if ( (data_sent < TOTAL_PACKETS) &&
-         (SPEED == test) )
+    if (NULL != disconnect_task)
     {
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  " Scheduling message %d\n",
-                  counter + 1);
-      data_job = GNUNET_SCHEDULER_add_now (&data_task, NULL);
+                  " reschedule timeout every 10 messages\n");
+      GNUNET_SCHEDULER_cancel (disconnect_task);
+      disconnect_task = GNUNET_SCHEDULER_add_delayed (SHORT_TIME,
+                                                      &gather_stats_and_exit,
+                                                      (void *) line);
     }
   }
+}
+
 
-  return msg_size;
+/**
+ * Check if payload is sane (size contains payload).
+ *
+ * @param cls Closure (the channel's wrapper, see #handle_data); unused here.
+ * @param message The actual message.
+ * @return #GNUNET_OK to keep the channel open,
+ *         #GNUNET_SYSERR to close it (signal serious error).
+ */
+static int
+check_data (void *cls, const struct GNUNET_MessageHeader *message)
+{
+  if (sizeof (struct GNUNET_MessageHeader) >= ntohs (message->size))
+    return GNUNET_SYSERR;
+  return GNUNET_OK;             /* all is well-formed */
 }
 
 
@@ -626,75 +567,49 @@ tmt_rdy (void *cls, size_t size, void *buf)
  * Function is called whenever a message is received.
  *
  * @param cls closure (set from GNUNET_CADET_connect(), peer number)
- * @param channel connection to the other end
- * @param channel_ctx place to store local state associated with the channel
  * @param message the actual message
- * @return #GNUNET_OK to keep the connection open,
- *         #GNUNET_SYSERR to close it (signal serious error)
  */
-static int
-data_callback (void *cls,
-               struct GNUNET_CADET_Channel *channel,
-               void **channel_ctx,
-               const struct GNUNET_MessageHeader *message)
+static void
+handle_data (void *cls, const struct GNUNET_MessageHeader *message)
 {
-  struct GNUNET_CADET_TransmitHandle **pth;
-  long client = (long) cls;
-  long expected_target_client;
+  struct CadetTestChannelWrapper *ch = cls;
+  struct GNUNET_CADET_Channel *channel = ch->ch;
   uint32_t *data;
   uint32_t payload;
-  unsigned int counter;
+  int *counter;
 
   ok++;
-  counter = get_expected_target () == client ? data_received : ack_received;
+  counter = get_target_channel () == channel ? &data_received : &ack_received;
 
-  GNUNET_CADET_receive_done (channel);
+  reschedule_timeout_task ((long) __LINE__);
 
-  if ((ok % 10) == 0)
+  if (channel == outgoing_ch)
   {
-    if (NULL != disconnect_task)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-                  " reschedule timeout\n");
-      GNUNET_SCHEDULER_cancel (disconnect_task);
-      disconnect_task = GNUNET_SCHEDULER_add_delayed (SHORT_TIME,
-                                                      &gather_stats_and_exit,
-                                                      (void *) __LINE__);
-    }
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Root client got a message!\n");
   }
-
-  switch (client)
+  else if (channel == incoming_ch)
   {
-  case 0L:
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Root client got a message!\n");
-    GNUNET_assert (channel == ch);
-    pth = &th;
-    break;
-  case 1L:
-  case 4L:
-    GNUNET_assert (client == peers_requested - 1);
-    GNUNET_assert (channel == incoming_ch);
-    pth = &incoming_th;
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Leaf client %ld got a message.\n",
-                client);
-    break;
-  default:
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Client %ld not valid.\n", client);
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Leaf client got a message.\n");
+  }
+  else
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Unknown channel %p.\n", channel);
     GNUNET_assert (0);
   }
+
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, " ok: (%d/%d)\n", ok, ok_goal);
   data = (uint32_t *) &message[1];
   payload = ntohl (*data);
-  if (payload == counter)
+  if (payload == *counter)
   {
     GNUNET_log (GNUNET_ERROR_TYPE_INFO, " payload as expected: %u\n", payload);
   }
   else
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, " payload %u, expected: %u\n",
-                payload, counter);
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                " payload %u, expected: %u\n",
+                payload, *counter);
   }
-  expected_target_client = get_expected_target ();
 
   if (GNUNET_NO == initialized)
   {
@@ -702,163 +617,128 @@ data_callback (void *cls,
     start_time = GNUNET_TIME_absolute_get ();
     if (SPEED == test)
     {
-      GNUNET_assert (peers_requested - 1 == client);
-      data_job = GNUNET_SCHEDULER_add_now (&data_task, NULL);
-      return GNUNET_OK;
+      GNUNET_assert (incoming_ch == channel);
+      send_next_msg_task = GNUNET_SCHEDULER_add_now (&send_next_msg, NULL);
+      return;
     }
   }
 
-  counter++;
-  if (client == expected_target_client) /* Normally 4 */
+  (*counter)++;
+  if (get_target_channel () == channel) /* Got "data" */
   {
-    data_received++;
     GNUNET_log (GNUNET_ERROR_TYPE_INFO, " received data %u\n", data_received);
     if (SPEED != test || (ok_goal - 2) == ok)
     {
       /* Send ACK */
-      GNUNET_assert (NULL == *pth);
-      *pth = GNUNET_CADET_notify_transmit_ready (channel, GNUNET_NO,
-                                                 GNUNET_TIME_UNIT_FOREVER_REL,
-                                                 size_payload + ack_sent,
-                                                 &tmt_rdy, (void *) client);
-      return GNUNET_OK;
+      send_test_message (channel);
+      return;
     }
     else
     {
       if (data_received < TOTAL_PACKETS)
-        return GNUNET_OK;
+        return;
     }
   }
-  else /* Normally 0 */
+  else /* Got "ack" */
   {
     if (SPEED_ACK == test || SPEED == test)
     {
-      ack_received++;
       GNUNET_log (GNUNET_ERROR_TYPE_INFO, " received ack %u\n", ack_received);
-      /* send more data */
-      GNUNET_assert (NULL == *pth);
-      *pth = GNUNET_CADET_notify_transmit_ready (channel, GNUNET_NO,
-                                                 GNUNET_TIME_UNIT_FOREVER_REL,
-                                                 size_payload + data_sent,
-                                                 &tmt_rdy, (void *) client);
+      /* Send more data */
+      send_test_message (channel);
       if (ack_received < TOTAL_PACKETS && SPEED != test)
-        return GNUNET_OK;
+        return;
       if (ok == 2 && SPEED == test)
-        return GNUNET_OK;
-      show_end_data();
+        return;
+      show_end_data ();
     }
     if (test == P2P_SIGNAL)
     {
-      if (NULL != incoming_th)
-      {
-        GNUNET_CADET_notify_transmit_ready_cancel (incoming_th);
-        incoming_th = NULL;
-      }
       GNUNET_CADET_channel_destroy (incoming_ch);
       incoming_ch = NULL;
     }
     else
     {
-      if (NULL != th)
-      {
-        GNUNET_CADET_notify_transmit_ready_cancel (th);
-        th = NULL;
-      }
-      GNUNET_CADET_channel_destroy (ch);
-      ch = NULL;
+      GNUNET_CADET_channel_destroy (outgoing_ch);
+      outgoing_ch = NULL;
     }
   }
-
-  return GNUNET_OK;
 }
 
 
 /**
- * Data handlers for every message type of CADET's payload.
- * {callback_function, message_type, size_expected}
- */
-static struct GNUNET_CADET_MessageHandler handlers[] = {
-  {&data_callback,
-   GNUNET_MESSAGE_TYPE_DUMMY,
-   sizeof (struct GNUNET_MessageHeader)},
-  {NULL, 0, 0}
-};
-
-
-/**
- * Method called whenever another peer has added us to a channel
- * the other peer initiated.
+ * Method called whenever a peer connects to a port in MQ-based CADET.
  *
- * @param cls Closure.
+ * @param cls Closure from #GNUNET_CADET_open_porT (peer # as long).
  * @param channel New handle to the channel.
- * @param initiator Peer that started the channel.
- * @param port Port this channel is connected to.
- * @param options channel option flags
- * @return Initial channel context for the channel
- *         (can be NULL -- that's not an error).
+ * @param source Peer that started this channel.
+ * @return Closure for the incoming @a channel. It's given to:
+ *         - The #GNUNET_CADET_DisconnectEventHandler (given to
+ *           #GNUNET_CADET_open_porT) when the channel dies.
+ *         - Each of the #GNUNET_MQ_MessageCallback handlers, for each message
+ *           received on the @a channel.
  */
 static void *
-incoming_channel (void *cls,
-                  struct GNUNET_CADET_Channel *channel,
-                  const struct GNUNET_PeerIdentity *initiator,
-                  const struct GNUNET_HashCode *port,
-                  enum GNUNET_CADET_ChannelOption options)
+connect_handler (void *cls, struct GNUNET_CADET_Channel *channel,
+                 const struct GNUNET_PeerIdentity *source)
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-              "Incoming channel from %s to peer %d:%s\n",
-              GNUNET_i2s (initiator),
-              (int) (long) cls, GNUNET_h2s (port));
+  struct CadetTestChannelWrapper *ch;
+  long peer = (long) cls;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Incoming channel from %s to peer %ld\n",
+              GNUNET_i2s (source), peer);
   ok++;
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, " ok: %d\n", ok);
-  if ((long) cls == peers_requested - 1)
+  if (peer == peers_requested - 1)
   {
     if (NULL != incoming_ch)
     {
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                  "Duplicate incoming channel for client %lu\n",
-                  (long) cls);
-      GNUNET_break(0);
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                  "Duplicate incoming channel for client %lu\n", (long) cls);
+      GNUNET_assert (0);
     }
     incoming_ch = channel;
   }
   else
   {
     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                "Incoming channel for unexpected peer #%lu\n",
-                (long) cls);
-    GNUNET_break (0);
+                "Incoming channel for unexpected peer #%lu\n", (long) cls);
+    GNUNET_assert (0);
   }
   if (NULL != disconnect_task)
   {
     GNUNET_SCHEDULER_cancel (disconnect_task);
-    disconnect_task = GNUNET_SCHEDULER_add_delayed (SHORT_TIME,
-                                                    &gather_stats_and_exit,
-                                                    (void *) __LINE__);
+    disconnect_task =
+        GNUNET_SCHEDULER_add_delayed (SHORT_TIME, &gather_stats_and_exit,
+                                      (void *) __LINE__);
   }
 
-  return NULL;
+  /* TODO: cannot return channel as-is, in order to unify the data handlers */
+  ch = GNUNET_new (struct CadetTestChannelWrapper);
+  ch->ch = channel;
+
+  return ch;
 }
 
 
 /**
- * Function called whenever an inbound channel is destroyed.  Should clean up
- * any associated state.
+ * Function called whenever an MQ-channel is destroyed, even if the destruction
+ * was requested by #GNUNET_CADET_channel_destroy.
+ * It must NOT call #GNUNET_CADET_channel_destroy on the channel.
+ *
+ * It should clean up any associated state, including cancelling any pending
+ * transmission on this channel.
  *
- * @param cls closure (set from GNUNET_CADET_connect, peer number)
- * @param channel connection to the other end (henceforth invalid)
- * @param channel_ctx place where local state associated
- *                   with the channel is stored
+ * @param cls Channel closure.
+ * @param channel Connection to the other end (henceforth invalid).
  */
 static void
-channel_cleaner (void *cls,
-                 const struct GNUNET_CADET_Channel *channel,
-                 void *channel_ctx)
+disconnect_handler (void *cls, const struct GNUNET_CADET_Channel *channel)
 {
   long i = (long) cls;
 
   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-              "Incoming channel disconnected at peer %ld\n",
-              i);
+              "Channel disconnected at %p\n", cls);
   if (peers_running - 1 == i)
   {
     ok++;
@@ -871,20 +751,18 @@ channel_cleaner (void *cls,
     {
       ok++;
     }
-    GNUNET_break (channel == ch);
-    ch = NULL;
+    GNUNET_break (channel == outgoing_ch);
+    outgoing_ch = NULL;
   }
   else
-    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                "Unknown peer! %d\n",
-                (int) i);
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Unknown peer! %d\n", (int) i);
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, " ok: %d\n", ok);
 
   if (NULL != disconnect_task)
   {
     GNUNET_SCHEDULER_cancel (disconnect_task);
-    disconnect_task = GNUNET_SCHEDULER_add_now (&gather_stats_and_exit,
-                                                (void *) __LINE__);
+    disconnect_task =
+        GNUNET_SCHEDULER_add_now (&gather_stats_and_exit, (void *) __LINE__);
   }
 }
 
@@ -898,13 +776,20 @@ channel_cleaner (void *cls,
  * @param cls Closure (unused).
  */
 static void
-do_test (void *cls)
+start_test (void *cls)
 {
+  struct GNUNET_MQ_MessageHandler handlers[] = {
+    GNUNET_MQ_hd_var_size (data,
+                           GNUNET_MESSAGE_TYPE_DUMMY,
+                           struct GNUNET_MessageHeader,
+                           NULL),
+    GNUNET_MQ_handler_end ()
+  };
+  struct CadetTestChannelWrapper *ch;
   enum GNUNET_CADET_ChannelOption flags;
 
   test_task = NULL;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "do_test\n");
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "start_test\n");
   if (NULL != disconnect_task)
   {
     GNUNET_SCHEDULER_cancel (disconnect_task);
@@ -918,30 +803,30 @@ do_test (void *cls)
     flags |= GNUNET_CADET_OPTION_RELIABLE;
   }
 
-  ch = GNUNET_CADET_channel_create (h1,
-                                    NULL,
-                                    p_id[1],
-                                    &port,
-                                    flags);
+  ch = GNUNET_new (struct CadetTestChannelWrapper);
+  outgoing_ch = GNUNET_CADET_channel_creatE (h1,
+                                             ch,
+                                             p_id[1],
+                                             &port,
+                                             flags,
+                                             NULL,
+                                             &disconnect_handler,
+                                             handlers);
+  ch->ch = outgoing_ch;
 
-  disconnect_task
-    = GNUNET_SCHEDULER_add_delayed (SHORT_TIME,
-                                    &gather_stats_and_exit,
-                                    (void *) __LINE__);
+  disconnect_task = GNUNET_SCHEDULER_add_delayed (SHORT_TIME,
+                                                  &gather_stats_and_exit,
+                                                  (void *) __LINE__);
   if (KEEPALIVE == test)
-    return; /* Don't send any data. */
+    return;                     /* Don't send any data. */
+
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Sending data initializer...\n");
   data_received = 0;
   data_sent = 0;
   ack_received = 0;
   ack_sent = 0;
-  th = GNUNET_CADET_notify_transmit_ready (ch,
-                                           GNUNET_NO,
-                                           GNUNET_TIME_UNIT_FOREVER_REL,
-                                           size_payload + 1000,
-                                           &tmt_rdy, (void *) 0L);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending data initializer...\n");
+  send_test_message (outgoing_ch);
 }
 
 
@@ -955,35 +840,26 @@ do_test (void *cls)
  *             NULL if the operation is successfull
  */
 static void
-pi_cb (void *cls,
-       struct GNUNET_TESTBED_Operation *op,
-       const struct GNUNET_TESTBED_PeerInformation *pinfo,
-       const char *emsg)
+pi_cb (void *cls, struct GNUNET_TESTBED_Operation *op,
+       const struct GNUNET_TESTBED_PeerInformation *pinfo, const char *emsg)
 {
   long i = (long) cls;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "id callback for %ld\n", i);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "ID callback for %ld\n", i);
 
-  if ( (NULL == pinfo) ||
-       (NULL != emsg) )
+  if ((NULL == pinfo) || (NULL != emsg))
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                "pi_cb: %s\n", emsg);
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "pi_cb: %s\n", emsg);
     abort_test (__LINE__);
     return;
   }
   p_id[i] = pinfo->result.id;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "  id: %s\n", GNUNET_i2s (p_id[i]));
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "  id: %s\n", GNUNET_i2s (p_id[i]));
   p_ids++;
   if (p_ids < 2)
     return;
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "Got all IDs, starting test\n");
-  test_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
-                                            &do_test,
-                                            NULL);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got all IDs, starting test\n");
+  test_task = GNUNET_SCHEDULER_add_now (&start_test, NULL);
 }
 
 
@@ -994,7 +870,7 @@ pi_cb (void *cls,
  * @param ctx Argument to give to GNUNET_CADET_TEST_cleanup on test end.
  * @param num_peers Number of peers that are running.
  * @param peers Array of peers.
- * @param cadetes Handle to each of the CADETs of the peers.
+ * @param cadets Handle to each of the CADETs of the peers.
  */
 static void
 tmain (void *cls,
@@ -1017,10 +893,12 @@ tmain (void *cls,
   GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
   t_op[0] = GNUNET_TESTBED_peer_get_information (peers[0],
                                                  GNUNET_TESTBED_PIT_IDENTITY,
-                                                 &pi_cb, (void *) 0L);
+                                                 &pi_cb,
+                                                 (void *) 0L);
   t_op[1] = GNUNET_TESTBED_peer_get_information (peers[num_peers - 1],
                                                  GNUNET_TESTBED_PIT_IDENTITY,
-                                                 &pi_cb, (void *) 1L);
+                                                 &pi_cb,
+                                                 (void *) 1L);
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "requested peer ids\n");
 }
 
@@ -1031,10 +909,19 @@ tmain (void *cls,
 int
 main (int argc, char *argv[])
 {
+  struct GNUNET_MQ_MessageHandler handlers[] = {
+    GNUNET_MQ_hd_var_size (data,
+                           GNUNET_MESSAGE_TYPE_DUMMY,
+                           struct GNUNET_MessageHeader,
+                           NULL),
+    GNUNET_MQ_handler_end ()
+  };
+
   initialized = GNUNET_NO;
   static const struct GNUNET_HashCode *ports[2];
   const char *config_file;
   char port_id[] = "test port";
+
   GNUNET_CRYPTO_hash (port_id, sizeof (port_id), &port);
 
   GNUNET_log_setup ("test", "DEBUG", NULL);
@@ -1137,22 +1024,22 @@ main (int argc, char *argv[])
   p_ids = 0;
   ports[0] = &port;
   ports[1] = NULL;
-  GNUNET_CADET_TEST_run ("test_cadet_small",
-                        config_file,
-                        peers_requested,
-                        &tmain,
-                        NULL, /* tmain cls */
-                        &incoming_channel,
-                        &channel_cleaner,
-                        handlers,
-                        ports);
+  GNUNET_CADET_TEST_ruN ("test_cadet_small",
+                         config_file,
+                         peers_requested,
+                         &tmain,
+                         NULL,        /* tmain cls */
+                         &connect_handler,
+                         NULL,
+                         &disconnect_handler,
+                         handlers,
+                         ports);
   if (NULL != strstr (argv[0], "_reliable"))
-    msg_dropped = 0; /* dropped should be retransmitted */
+    msg_dropped = 0;            /* dropped should be retransmitted */
 
   if (ok_goal > ok - msg_dropped)
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                "FAILED! (%d/%d)\n", ok, ok_goal);
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "FAILED! (%d/%d)\n", ok, ok_goal);
     return 1;
   }
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "success\n");

-- 
To stop receiving notification emails like this one, please contact
address@hidden



reply via email to

[Prev in Thread] Current Thread [Next in Thread]