gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] [gnunet] 04/04: starting with another CADET testcase (unfin


From: gnunet
Subject: [GNUnet-SVN] [gnunet] 04/04: starting with another CADET testcase (unfinished)
Date: Wed, 23 May 2018 00:01:04 +0200

This is an automated email from the git hooks/post-receive script.

grothoff pushed a commit to branch master
in repository gnunet.

commit 8d7ab16402048793275b6feebd744e5e17af63fb
Author: Christian Grothoff <address@hidden>
AuthorDate: Wed May 23 00:00:58 2018 +0200

    starting with another CADET testcase (unfinished)
---
 src/cadet/cadet_test_lib.c      |  65 +--
 src/cadet/test_cadet_flow.c     | 888 ++++++++++++++++++++++++++++++++++++++++
 src/cadet/test_cadet_local_mq.c |   6 +-
 3 files changed, 928 insertions(+), 31 deletions(-)

diff --git a/src/cadet/cadet_test_lib.c b/src/cadet/cadet_test_lib.c
index 20ef028b2..0efb81ab4 100644
--- a/src/cadet/cadet_test_lib.c
+++ b/src/cadet/cadet_test_lib.c
@@ -110,7 +110,7 @@ struct GNUNET_CADET_TEST_AdapterContext
    * Port handlers for open ports.
    */
   struct GNUNET_CADET_Port **ports;
- 
+
   /**
    * General context.
    */
@@ -135,14 +135,13 @@ cadet_connect_adapter (void *cls,
   struct GNUNET_CADET_TEST_AdapterContext *actx = cls;
   struct GNUNET_CADET_TEST_Context *ctx = actx->ctx;
   struct GNUNET_CADET_Handle *h;
-  unsigned int i;
 
   h = GNUNET_CADET_connect (cfg);
   if (NULL == ctx->ports)
     return h;
-
-  actx->ports = GNUNET_new_array (ctx->port_count, struct GNUNET_CADET_Port *);
-  for (i = 0; i < ctx->port_count; i++)
+  actx->ports = GNUNET_new_array (ctx->port_count,
+                                  struct GNUNET_CADET_Port *);
+  for (unsigned int i = 0; i < ctx->port_count; i++)
   {
     actx->ports[i] = GNUNET_CADET_open_port (h,
                                              ctx->ports[i],
@@ -165,14 +164,14 @@ cadet_connect_adapter (void *cls,
  */
 static void
 cadet_disconnect_adapter (void *cls,
-                         void *op_result)
+                          void *op_result)
 {
   struct GNUNET_CADET_Handle *cadet = op_result;
   struct GNUNET_CADET_TEST_AdapterContext *actx = cls;
 
   if (NULL != actx->ports)
   {
-    for (int i = 0; i < actx->ctx->port_count; i++)
+    for (unsigned int i = 0; i < actx->ctx->port_count; i++)
     {
       GNUNET_CADET_close_port (actx->ports[i]);
       actx->ports[i] = NULL;
@@ -201,22 +200,24 @@ cadet_connect_cb (void *cls,
                  const char *emsg)
 {
   struct GNUNET_CADET_TEST_Context *ctx = cls;
-  unsigned int i;
 
   if (NULL != emsg)
   {
-    fprintf (stderr, "Failed to connect to CADET service: %s\n",
+    fprintf (stderr,
+             "Failed to connect to CADET service: %s\n",
              emsg);
     GNUNET_SCHEDULER_shutdown ();
     return;
   }
-  for (i = 0; i < ctx->num_peers; i++)
+  for (unsigned int i = 0; i < ctx->num_peers; i++)
     if (op == ctx->ops[i])
     {
       ctx->cadets[i] = ca_result;
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "...cadet %u connected\n", i);
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                  "...cadet %u connected\n",
+                  i);
     }
-  for (i = 0; i < ctx->num_peers; i++)
+  for (unsigned int i = 0; i < ctx->num_peers; i++)
     if (NULL == ctx->cadets[i])
       return; /* still some CADET connections missing */
   /* all CADET connections ready! */
@@ -231,9 +232,7 @@ cadet_connect_cb (void *cls,
 void
 GNUNET_CADET_TEST_cleanup (struct GNUNET_CADET_TEST_Context *ctx)
 {
-  unsigned int i;
-
-  for (i = 0; i < ctx->num_peers; i++)
+  for (unsigned int i = 0; i < ctx->num_peers; i++)
   {
     GNUNET_assert (NULL != ctx->ops[i]);
     GNUNET_TESTBED_operation_done (ctx->ops[i]);
@@ -269,33 +268,37 @@ cadet_test_run (void *cls,
                unsigned int links_failed)
 {
   struct GNUNET_CADET_TEST_Context *ctx = cls;
-  unsigned int i;
 
   if (0 != links_failed)
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Some links failed (%u), ending\n",
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "Some links failed (%u), ending\n",
                 links_failed);
     exit (2);
   }
-
   if  (num_peers != ctx->num_peers)
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Peers started %u/%u, ending\n",
-                num_peers, ctx->num_peers);
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "Peers started %u/%u, ending\n",
+                num_peers,
+                ctx->num_peers);
     exit (1);
   }
-
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "Testbed up, %u peers and %u links\n",
-              num_peers, links_succeeded);
+              num_peers,
+              links_succeeded);
   ctx->peers = peers;
-  for (i = 0; i < num_peers; i++)
+  for (unsigned int i = 0; i < num_peers; i++)
   {
     struct GNUNET_CADET_TEST_AdapterContext *newctx;
+
     newctx = GNUNET_new (struct GNUNET_CADET_TEST_AdapterContext);
     newctx->peer = i;
     newctx->ctx = ctx;
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Connecting to cadet %u\n", i);
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                "Connecting to cadet %u\n",
+                i);
     ctx->ops[i] = GNUNET_TESTBED_service_connect (ctx,
                                                   peers[i],
                                                   "cadet",
@@ -304,7 +307,9 @@ cadet_test_run (void *cls,
                                                   &cadet_connect_adapter,
                                                   &cadet_disconnect_adapter,
                                                   newctx);
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "op handle %p\n", ctx->ops[i]);
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                "op handle %p\n",
+                ctx->ops[i]);
   }
 }
 
@@ -340,8 +345,10 @@ GNUNET_CADET_TEST_ruN (const char *testname,
 
   ctx = GNUNET_new (struct GNUNET_CADET_TEST_Context);
   ctx->num_peers = num_peers;
-  ctx->ops = GNUNET_new_array (num_peers, struct GNUNET_TESTBED_Operation *);
-  ctx->cadets = GNUNET_new_array (num_peers, struct GNUNET_CADET_Handle *);
+  ctx->ops = GNUNET_new_array (num_peers,
+                               struct GNUNET_TESTBED_Operation *);
+  ctx->cadets = GNUNET_new_array (num_peers,
+                                  struct GNUNET_CADET_Handle *);
   ctx->app_main = tmain;
   ctx->app_main_cls = tmain_cls;
   ctx->connects = connects;
@@ -352,12 +359,12 @@ GNUNET_CADET_TEST_ruN (const char *testname,
   ctx->port_count = 0;
   while (NULL != ctx->ports[ctx->port_count])
     ctx->port_count++;
-
   GNUNET_TESTBED_test_run (testname,
                            cfgfile,
                            num_peers,
                            0LL, NULL, NULL,
-                           &cadet_test_run, ctx);
+                           &cadet_test_run,
+                           ctx);
 }
 
 /* end of cadet_test_lib.c */
diff --git a/src/cadet/test_cadet_flow.c b/src/cadet/test_cadet_flow.c
new file mode 100644
index 000000000..554ee1d85
--- /dev/null
+++ b/src/cadet/test_cadet_flow.c
@@ -0,0 +1,888 @@
+/*
+     This file is part of GNUnet.
+     Copyright (C) 2011, 2017 GNUnet e.V.
+
+     GNUnet is free software; you can redistribute it and/or modify
+     it under the terms of the GNU General Public License as published
+     by the Free Software Foundation; either version 3, or (at your
+     option) any later version.
+
+     GNUnet is distributed in the hope that it will be useful, but
+     WITHOUT ANY WARRANTY; without even the implied warranty of
+     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+     General Public License for more details.
+
+     You should have received a copy of the GNU General Public License
+     along with GNUnet; see the file COPYING.  If not, write to the
+     Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+     Boston, MA 02110-1301, USA.
+*/
+/**
+ * @file cadet/test_cadet_flow.c
+ * @author Bart Polot
+ * @author Christian Grothoff
+ * @brief Test for flow control of CADET service
+ */
+#include <stdio.h>
+#include "platform.h"
+#include "cadet_test_lib.h"
+#include "gnunet_cadet_service.h"
+#include "gnunet_statistics_service.h"
+#include <gauger.h>
+
+
+/**
+ * Ugly workaround to unify data handlers on incoming and outgoing channels.
+ */
+struct CadetTestChannelWrapper
+{
+  /**
+   * Channel pointer.
+   */
+  struct GNUNET_CADET_Channel *ch;
+};
+
+/**
+ * How many messages to send by default.
+ */
+#define TOTAL_PACKETS_DEFAULT 500
+
+/**
+ * How long until we give up on connecting the peers?
+ */
+#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 120)
+
+/**
+ * Time to wait by default  for stuff that should be rather fast.
+ */
+#define SHORT_TIME GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 20)
+
+/**
+ * How fast do we send messages?
+ */
+#define SEND_INTERVAL GNUNET_TIME_relative_multiply 
(GNUNET_TIME_UNIT_MILLISECONDS, 10)
+
+
+/**
+ * How many packets to send.
+ */
+static unsigned int total_packets = TOTAL_PACKETS_DEFAULT;
+
+/**
+ * Time to wait for fast operations.
+ */
+static struct GNUNET_TIME_Relative short_time;
+
+/**
+ * Size of each test packet's payload
+ */
+static size_t size_payload = sizeof (uint32_t);
+
+/**
+ * Operation to get peer ids.
+ */
+static struct GNUNET_TESTBED_Operation *t_op[2];
+
+/**
+ * Peer ids.
+ */
+static struct GNUNET_PeerIdentity *p_id[2];
+
+/**
+ * Port ID
+ */
+static struct GNUNET_HashCode port;
+
+/**
+ * Peer ids counter.
+ */
+static unsigned int p_ids;
+
+/**
+ * Is the setup initialized?
+ */
+static int initialized;
+
+/**
+ * Number of payload packes sent.
+ */
+static int data_sent;
+
+/**
+ * Number of payload packets received.
+ */
+static int data_received;
+
+/**
+ * Number of payload packed acknowledgements sent.
+ */
+static int ack_sent;
+
+/**
+ * Number of payload packed explicitly (app level) acknowledged.
+ */
+static int ack_received;
+
+/**
+ * Total number of peers asked to run.
+ */
+static unsigned int peers_requested = 2;
+
+/**
+ * Number of currently running peers (should be same as @c peers_requested).
+ */
+static unsigned int peers_running;
+
+/**
+ * Test context (to shut down).
+ */
+struct GNUNET_CADET_TEST_Context *test_ctx;
+
+/**
+ * Task called to disconnect peers.
+ */
+static struct GNUNET_SCHEDULER_Task *disconnect_task;
+
+/**
+ * Task To perform tests
+ */
+static struct GNUNET_SCHEDULER_Task *test_task;
+
+/**
+ * Task runnining #send_next_msg().
+ */
+static struct GNUNET_SCHEDULER_Task *send_next_msg_task;
+
+/**
+ * Cadet handle for the root peer
+ */
+static struct GNUNET_CADET_Handle *h1;
+
+/**
+ * Cadet handle for the first leaf peer
+ */
+static struct GNUNET_CADET_Handle *h2;
+
+/**
+ * Channel handle for the root peer
+ */
+static struct GNUNET_CADET_Channel *outgoing_ch;
+
+/**
+ * Channel handle for the dest peer
+ */
+static struct GNUNET_CADET_Channel *incoming_ch;
+
+/**
+ * Time we started the data transmission (after channel has been established
+ * and initilized).
+ */
+static struct GNUNET_TIME_Absolute start_time;
+
+/**
+ * Peers handle.
+ */
+static struct GNUNET_TESTBED_Peer **testbed_peers;
+
+/**
+ * Statistics operation handle.
+ */
+static struct GNUNET_TESTBED_Operation *stats_op;
+
+/**
+ * Keepalives sent.
+ */
+static unsigned int ka_sent;
+
+/**
+ * Keepalives received.
+ */
+static unsigned int ka_received;
+
+/**
+ * How many messages were dropped by CADET because of full buffers?
+ */
+static unsigned int msg_dropped;
+
+
+/**
+ * Show the results of the test (banwidth acheived) and log them to GAUGER
+ */
+static void
+show_end_data (void)
+{
+  static struct GNUNET_TIME_Absolute end_time;
+  static struct GNUNET_TIME_Relative total_time;
+
+  end_time = GNUNET_TIME_absolute_get ();
+  total_time = GNUNET_TIME_absolute_get_difference (start_time, end_time);
+  FPRINTF (stderr,
+          "\nResults of test \"%s\"\n",
+          test_name);
+  FPRINTF (stderr,
+          "Test time %s\n",
+           GNUNET_STRINGS_relative_time_to_string (total_time, GNUNET_YES));
+  FPRINTF (stderr,
+          "Test bandwidth: %f kb/s\n",
+          4 * total_packets * 1.0 / (total_time.rel_value_us / 1000));    // 
4bytes * ms
+  FPRINTF (stderr,
+          "Test throughput: %f packets/s\n\n",
+          total_packets * 1000.0 / (total_time.rel_value_us / 1000));     // 
packets * ms
+  GAUGER ("CADET",
+         test_name,
+          total_packets * 1000.0 / (total_time.rel_value_us / 1000),
+          "packets/s");
+}
+
+
+/**
+ * Shut down peergroup, clean up.
+ *
+ * @param cls Closure (unused).
+ * @param tc Task Context.
+ */
+static void
+shutdown_task (void *cls)
+{
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Ending test.\n");
+  if (NULL != send_next_msg_task)
+  {
+    GNUNET_SCHEDULER_cancel (send_next_msg_task);
+    send_next_msg_task = NULL;
+  }
+  if (NULL != test_task)
+  {
+    GNUNET_SCHEDULER_cancel (test_task);
+    test_task = NULL;
+  }
+  for (unsigned int i = 0; i < 2; i++)
+    GNUNET_TESTBED_operation_done (t_op[i]);
+  if (NULL != outgoing_ch)
+  {
+    GNUNET_CADET_channel_destroy (outgoing_ch);
+    outgoing_ch = NULL;
+  }
+  if (NULL != incoming_ch)
+  {
+    GNUNET_CADET_channel_destroy (incoming_ch);
+    incoming_ch = NULL;
+  }
+  GNUNET_CADET_TEST_cleanup (test_ctx);
+}
+
+
+/**
+ * Stats callback. Finish the stats testbed operation and when all stats have
+ * been iterated, shutdown the test.
+ *
+ * @param cls Closure (line number from which termination was requested).
+ * @param op the operation that has been finished
+ * @param emsg error message in case the operation has failed; will be NULL if
+ *          operation has executed successfully.
+ */
+static void
+stats_cont (void *cls,
+           struct GNUNET_TESTBED_Operation *op,
+           const char *emsg)
+{
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+             "KA sent: %u, KA received: %u\n",
+              ka_sent,
+             ka_received);
+  if ((KEEPALIVE == test) && ((ka_sent < 2) || (ka_sent > ka_received + 1)))
+  {
+    GNUNET_break (0);
+    ok--;
+  }
+  GNUNET_TESTBED_operation_done (stats_op);
+
+  if (NULL != disconnect_task)
+    GNUNET_SCHEDULER_cancel (disconnect_task);
+  disconnect_task = GNUNET_SCHEDULER_add_now (&disconnect_cadet_peers,
+                                             cls);
+}
+
+
+/**
+ * Process statistic values.
+ *
+ * @param cls closure (line number, unused)
+ * @param peer the peer the statistic belong to
+ * @param subsystem name of subsystem that created the statistic
+ * @param name the name of the datum
+ * @param value the current value
+ * @param is_persistent #GNUNET_YES if the value is persistent, #GNUNET_NO if 
not
+ * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration
+ */
+static int
+stats_iterator (void *cls,
+                const struct GNUNET_TESTBED_Peer *peer,
+                const char *subsystem,
+                const char *name,
+                uint64_t value,
+                int is_persistent)
+{
+  static const char *s_sent = "# keepalives sent";
+  static const char *s_recv = "# keepalives received";
+  static const char *rdrops = "# messages dropped due to full buffer";
+  static const char *cdrops = "# messages dropped due to slow client";
+  uint32_t i;
+
+  i = GNUNET_TESTBED_get_index (peer);
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "STATS PEER %u - %s [%s]: %llu\n", i,
+              subsystem, name, (unsigned long long) value);
+  if (0 == strncmp (s_sent, name, strlen (s_sent)) && 0 == i)
+    ka_sent = value;
+  if (0 == strncmp (s_recv, name, strlen (s_recv)) && peers_requested - 1 == i)
+    ka_received = value;
+  if (0 == strncmp (rdrops, name, strlen (rdrops)))
+    msg_dropped += value;
+  if (0 == strncmp (cdrops, name, strlen (cdrops)))
+    msg_dropped += value;
+
+  return GNUNET_OK;
+}
+
+
+/**
+ * Task to gather all statistics.
+ *
+ * @param cls Closure (line from which the task was scheduled).
+ */
+static void
+gather_stats_and_exit (void *cls)
+{
+  long l = (long) cls;
+
+  disconnect_task = NULL;
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+              "gathering statistics from line %ld\n",
+              l);
+  if (NULL != outgoing_ch)
+  {
+    GNUNET_CADET_channel_destroy (outgoing_ch);
+    outgoing_ch = NULL;
+  }
+  stats_op = GNUNET_TESTBED_get_statistics (peers_running,
+                                            testbed_peers,
+                                            "cadet",
+                                            NULL,
+                                            &stats_iterator,
+                                            stats_cont,
+                                            cls);
+}
+
+
+/**
+ * Abort test: schedule disconnect and shutdown immediately
+ *
+ * @param line Line in the code the abort is requested from (__LINE__).
+ */
+static void
+abort_test (long line)
+{
+  if (NULL != disconnect_task)
+  {
+    GNUNET_SCHEDULER_cancel (disconnect_task);
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+               "Aborting test from %ld\n",
+               line);
+    disconnect_task =
+        GNUNET_SCHEDULER_add_now (&disconnect_cadet_peers,
+                                 (void *) line);
+  }
+}
+
+
+/**
+ * Send a message on the channel with the appropriate size and payload.
+ *
+ * Update the appropriate *_sent counter.
+ *
+ * @param channel Channel to send the message on.
+ */
+static void
+send_test_message (struct GNUNET_CADET_Channel *channel)
+{
+  struct GNUNET_MQ_Envelope *env;
+  struct GNUNET_MessageHeader *msg;
+  uint32_t *data;
+  int payload;
+  int size;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Sending test message on channel %p\n",
+              channel);
+  size = size_payload;
+  if (GNUNET_NO == initialized)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending INITIALIZER\n");
+    size += 1000;
+    payload = data_sent;
+    if (SPEED_ACK == test) // FIXME unify SPEED_ACK with an initializer
+        data_sent++;
+  }
+  else if (SPEED == test || SPEED_ACK == test)
+  {
+    if (get_target_channel() == channel)
+    {
+      payload = ack_sent;
+      size += ack_sent;
+      ack_sent++;
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Sending ACK %u [%d bytes]\n",
+                  payload, size);
+    }
+    else
+    {
+      payload = data_sent;
+      size += data_sent;
+      data_sent++;
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Sending DATA %u [%d bytes]\n",
+                  data_sent, size);
+    }
+  }
+  else if (FORWARD == test)
+  {
+    payload = ack_sent;
+  }
+  else if (P2P_SIGNAL == test)
+  {
+    payload = data_sent;
+  }
+  else
+  {
+    GNUNET_assert (0);
+  }
+  env = GNUNET_MQ_msg_extra (msg, size, GNUNET_MESSAGE_TYPE_DUMMY);
+
+  data = (uint32_t *) &msg[1];
+  *data = htonl (payload);
+  GNUNET_MQ_send (GNUNET_CADET_get_mq (channel), env);
+}
+
+
+/**
+ * Task to request a new data transmission in a SPEED test, without waiting
+ * for previous messages to be sent/arrrive.
+ *
+ * @param cls Closure (unused).
+ */
+static void
+send_next_msg (void *cls)
+{
+  struct GNUNET_CADET_Channel *channel;
+
+  send_next_msg_task = NULL;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Sending next message: %d\n",
+             data_sent);
+
+  channel = GNUNET_YES == test_backwards ? incoming_ch : outgoing_ch;
+  GNUNET_assert (NULL != channel);
+  GNUNET_assert (SPEED == test);
+  send_test_message (channel);
+  if (data_sent < total_packets)
+  {
+    /* SPEED test: Send all messages as soon as possible */
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "Scheduling message %d\n",
+                data_sent + 1);
+    send_next_msg_task =
+      GNUNET_SCHEDULER_add_delayed (SEND_INTERVAL,
+                                     &send_next_msg,
+                                     NULL);
+  }
+}
+
+
+/**
+ * Check if payload is sane (size contains payload).
+ *
+ * @param cls should match #ch
+ * @param message The actual message.
+ * @return #GNUNET_OK to keep the channel open,
+ *         #GNUNET_SYSERR to close it (signal serious error).
+ */
+static int
+check_data (void *cls,
+            const struct GNUNET_MessageHeader *message)
+{
+  return GNUNET_OK;             /* all is well-formed */
+}
+
+
+/**
+ * Function is called whenever a message is received.
+ *
+ * @param cls closure (set from GNUNET_CADET_connect(), peer number)
+ * @param message the actual message
+ */
+static void
+handle_data (void *cls,
+            const struct GNUNET_MessageHeader *message)
+{
+  struct CadetTestChannelWrapper *ch = cls;
+  struct GNUNET_CADET_Channel *channel = ch->ch;
+  uint32_t *data;
+  uint32_t payload;
+  int *counter;
+
+  GNUNET_CADET_receive_done (channel);
+  counter = get_target_channel () == channel ? &data_received : &ack_received;
+  if (channel == outgoing_ch)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                "Root client got a message.\n");
+  }
+  else if (channel == incoming_ch)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                "Leaf client got a message.\n");
+  }
+  else
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "Unknown channel %p.\n",
+                channel);
+    GNUNET_assert (0);
+  }
+
+  data = (uint32_t *) &message[1];
+  payload = ntohl (*data);
+  if (payload == *counter)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                "Payload as expected: %u\n",
+                payload);
+  }
+  else
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "Received payload %u, expected: %u\n",
+                payload, *counter);
+  }
+  (*counter)++;
+  if (get_target_channel () == channel) /* Got "data" */
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                " received data %u\n",
+                data_received);
+    if (data_received < total_packets)
+      return;
+  }
+  else /* Got "ack" */
+  {
+    if (SPEED_ACK == test || SPEED == test)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO, " received ack %u\n", ack_received);
+      /* Send more data */
+      send_test_message (channel);
+      if (ack_received < total_packets && SPEED != test)
+        return;
+      if (ok == 2 && SPEED == test)
+        return;
+      show_end_data ();
+    }
+    if (test == P2P_SIGNAL)
+    {
+      GNUNET_CADET_channel_destroy (incoming_ch);
+      incoming_ch = NULL;
+    }
+    else
+    {
+      GNUNET_CADET_channel_destroy (outgoing_ch);
+      outgoing_ch = NULL;
+    }
+  }
+}
+
+
+/**
+ * Method called whenever a peer connects to a port in MQ-based CADET.
+ *
+ * @param cls Closure from #GNUNET_CADET_open_port (peer # as long).
+ * @param channel New handle to the channel.
+ * @param source Peer that started this channel.
+ * @return Closure for the incoming @a channel. It's given to:
+ *         - The #GNUNET_CADET_DisconnectEventHandler (given to
+ *           #GNUNET_CADET_open_port) when the channel dies.
+ *         - Each the #GNUNET_MQ_MessageCallback handlers for each message
+ *           received on the @a channel.
+ */
+static void *
+connect_handler (void *cls,
+                 struct GNUNET_CADET_Channel *channel,
+                 const struct GNUNET_PeerIdentity *source)
+{
+  struct CadetTestChannelWrapper *ch;
+  long peer = (long) cls;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+              "Incoming channel from %s to %ld: %p\n",
+              GNUNET_i2s (source),
+              peer,
+              channel);
+  if (peer == peers_requested - 1)
+  {
+    if (NULL != incoming_ch)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                  "Duplicate incoming channel for client %lu\n",
+                  (long) cls);
+      GNUNET_assert (0);
+    }
+    incoming_ch = channel;
+  }
+  else
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                "Incoming channel for unexpected peer #%lu\n",
+                (long) cls);
+    GNUNET_assert (0);
+  }
+  ch = GNUNET_new (struct CadetTestChannelWrapper);
+  ch->ch = channel;
+
+  return ch;
+}
+
+
+/**
+ * Function called whenever an MQ-channel is destroyed, even if the destruction
+ * was requested by #GNUNET_CADET_channel_destroy.
+ * It must NOT call #GNUNET_CADET_channel_destroy on the channel.
+ *
+ * It should clean up any associated state, including cancelling any pending
+ * transmission on this channel.
+ *
+ * @param cls Channel closure (channel wrapper).
+ * @param channel Connection to the other end (henceforth invalid).
+ */
+static void
+disconnect_handler (void *cls,
+                   const struct GNUNET_CADET_Channel *channel)
+{
+  struct CadetTestChannelWrapper *ch_w = cls;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+             "Channel disconnected at %d\n",
+             ok);
+  GNUNET_assert (ch_w->ch == channel);
+  if (channel == incoming_ch)
+    incoming_ch = NULL;
+  else if (outgoing_ch == channel)
+    outgoing_ch = NULL;
+  else
+    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+               "Disconnect on unknown channel %p\n",
+               channel);
+  if (NULL != disconnect_task)
+    GNUNET_SCHEDULER_cancel (disconnect_task);
+  disconnect_task = GNUNET_SCHEDULER_add_now (&gather_stats_and_exit,
+                                              (void *) __LINE__);
+  GNUNET_free (ch_w);
+}
+
+
+/**
+ * Start the testcase, we know the peers and have handles to CADET.
+ *
+ * Testcase continues when the root receives confirmation of connected peers,
+ * on callback function ch.
+ *
+ * @param cls Closure (unused).
+ */
+static void
+start_test (void *cls)
+{
+  struct GNUNET_MQ_MessageHandler handlers[] = {
+    GNUNET_MQ_hd_var_size (data,
+                           GNUNET_MESSAGE_TYPE_DUMMY,
+                           struct GNUNET_MessageHeader,
+                           NULL),
+    GNUNET_MQ_handler_end ()
+  };
+  struct CadetTestChannelWrapper *ch;
+  enum GNUNET_CADET_ChannelOption flags;
+
+  test_task = NULL;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "In start_test\n");
+  start_time = GNUNET_TIME_absolute_get ();
+  ch = GNUNET_new (struct CadetTestChannelWrapper);
+  outgoing_ch = GNUNET_CADET_channel_create (h1,
+                                             ch,
+                                             p_id[1],
+                                             &port,
+                                             flags,
+                                             NULL,
+                                             &disconnect_handler,
+                                             handlers);
+  ch->ch = outgoing_ch;
+  GNUNET_assert (NULL == disconnect_task);
+  disconnect_task
+    = GNUNET_SCHEDULER_add_delayed (short_time,
+                                    &gather_stats_and_exit,
+                                    (void *) __LINE__);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Sending data initializer on channel %p...\n",
+              outgoing_ch);
+  send_test_message (outgoing_ch);
+}
+
+
+/**
+ * Callback to be called when the requested peer information is available
+ *
+ * @param cls the closure from GNUNET_TESTBED_peer_get_information()
+ * @param op the operation this callback corresponds to
+ * @param pinfo the result; will be NULL if the operation has failed
+ * @param emsg error message if the operation has failed;
+ *             NULL if the operation is successfull
+ */
+static void
+pi_cb (void *cls,
+       struct GNUNET_TESTBED_Operation *op,
+       const struct GNUNET_TESTBED_PeerInformation *pinfo,
+       const char *emsg)
+{
+  long i = (long) cls;
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "ID callback for %ld\n",
+             i);
+  if ( (NULL == pinfo) ||
+       (NULL != emsg) )
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+               "pi_cb: %s\n",
+               emsg);
+    abort_test (__LINE__);
+    return;
+  }
+  p_id[i] = pinfo->result.id;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "id: %s\n",
+             GNUNET_i2s (p_id[i]));
+  p_ids++;
+  if (p_ids < 2)
+    return;
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Got all IDs, starting test\n");
+  test_task = GNUNET_SCHEDULER_add_now (&start_test,
+                                        NULL);
+}
+
+
+/**
+ * test main: start test when all peers are connected
+ *
+ * @param cls Closure.
+ * @param ctx Argument to give to GNUNET_CADET_TEST_cleanup on test end.
+ * @param num_peers Number of peers that are running.
+ * @param peers Array of peers.
+ * @param cadets Handle to each of the CADETs of the peers.
+ */
+static void
+tmain (void *cls,
+       struct GNUNET_CADET_TEST_Context *ctx,
+       unsigned int num_peers,
+       struct GNUNET_TESTBED_Peer **peers,
+       struct GNUNET_CADET_Handle **cadets)
+{
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "test main\n");
+  test_ctx = ctx;
+  peers_running = num_peers;
+  GNUNET_assert (peers_running == peers_requested);
+  testbed_peers = peers;
+  h1 = cadets[0];
+  h2 = cadets[num_peers - 1];
+  GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
+                                NULL);
+  p_ids = 0;
+  t_op[0] = GNUNET_TESTBED_peer_get_information (peers[0],
+                                                 GNUNET_TESTBED_PIT_IDENTITY,
+                                                 &pi_cb,
+                                                 (void *) 0L);
+  t_op[1] = GNUNET_TESTBED_peer_get_information (peers[num_peers - 1],
+                                                 GNUNET_TESTBED_PIT_IDENTITY,
+                                                 &pi_cb,
+                                                 (void *) 1L);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "requested peer ids\n");
+}
+
+
+/**
+ * Main: start test
+ */
+int
+main (int argc,
+      char *argv[])
+{
+  static const struct GNUNET_HashCode *ports[2];
+  struct GNUNET_MQ_MessageHandler handlers[] = {
+    GNUNET_MQ_hd_var_size (data,
+                           GNUNET_MESSAGE_TYPE_DUMMY,
+                           struct GNUNET_MessageHeader,
+                           NULL),
+    GNUNET_MQ_handler_end ()
+  };
+  const char *config_file = "test_cadet.conf";
+  char port_id[] = "test port";
+  struct GNUNET_GETOPT_CommandLineOption options[] = {
+    GNUNET_GETOPT_option_relative_time ('t',
+                                       "time",
+                                       "short_time",
+                                       gettext_noop ("set short timeout"),
+                                       &short_time),
+    GNUNET_GETOPT_option_uint ('m',
+                              "messages",
+                              "NUM_MESSAGES",
+                              gettext_noop ("set number of messages to send"),
+                              &total_packets),
+    GNUNET_GETOPT_option_uint ('p',
+                              "peers",
+                              "NUM_PEERS",
+                              gettext_noop ("number of peers to launch"),
+                              &peers_requested),
+    GNUNET_GETOPT_OPTION_END
+  };
+
+  GNUNET_log_setup ("test-cadet-flow",
+                    "DEBUG",
+                    NULL);
+  total_packets = TOTAL_PACKETS;
+  short_time = SHORT_TIME;
+  if (-1 == GNUNET_GETOPT_run (argv[0],
+                               options,
+                               argc,
+                               argv))
+  {
+    FPRINTF (stderr,
+             "test failed: problem with CLI parameters\n");
+    return 1;
+  }
+  GNUNET_CRYPTO_hash (port_id,
+                      sizeof (port_id),
+                      &port);
+  ports[0] = &port;
+  ports[1] = NULL;
+  GNUNET_CADET_TEST_ruN ("test_cadet_flow",
+                         config_file,
+                         peers_requested,
+                         &tmain,
+                         NULL,        /* tmain cls */
+                         &connect_handler,
+                         NULL,
+                         &disconnect_handler,
+                         handlers,
+                         ports);
+  return 0;
+}
+
+/* end of test_cadet_flow.c */
diff --git a/src/cadet/test_cadet_local_mq.c b/src/cadet/test_cadet_local_mq.c
index eab35c217..2ea754743 100644
--- a/src/cadet/test_cadet_local_mq.c
+++ b/src/cadet/test_cadet_local_mq.c
@@ -150,7 +150,8 @@ connected (void *cls,
 {
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "connected %s, cls: %p\n",
-              GNUNET_i2s(source), cls);
+              GNUNET_i2s(source),
+              cls);
   return channel;
 }
 
@@ -208,7 +209,8 @@ handle_data_received (void *cls,
 static void
 message_sent (void *cls)
 {
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "message sent\n");
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "message sent\n");
 }
 
 

-- 
To stop receiving notification emails like this one, please contact
address@hidden



reply via email to

[Prev in Thread] Current Thread [Next in Thread]