gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] [gnunet] branch master updated: Add API call to receive unb


From: gnunet
Subject: [GNUnet-SVN] [gnunet] branch master updated: Add API call to receive unbiased peer stream for debugging and profiling
Date: Fri, 14 Sep 2018 00:39:57 +0200

This is an automated email from the git hooks/post-receive script.

julius-buenger pushed a commit to branch master
in repository gnunet.

The following commit(s) were added to refs/heads/master by this push:
     new e91d46cdb Add API call to receive unbiased peer stream for debugging 
and profiling
e91d46cdb is described below

commit e91d46cdbbc97414968fa751fd1f596757c56875
Author: Julius Bünger <address@hidden>
AuthorDate: Fri Sep 14 00:38:45 2018 +0200

    Add API call to receive unbiased peer stream for debugging and profiling
---
 src/include/gnunet_protocols.h   |  16 +++
 src/include/gnunet_rps_service.h |  25 ++++
 src/rps/gnunet-rps.c             |  51 ++++++-
 src/rps/gnunet-service-rps.c     | 278 ++++++++++++++++++++++++++++-----------
 src/rps/rps.h                    |  41 ++++++
 src/rps/rps_api.c                |  71 +++++++++-
 6 files changed, 399 insertions(+), 83 deletions(-)

diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h
index 4400db7e1..56e049608 100644
--- a/src/include/gnunet_protocols.h
+++ b/src/include/gnunet_protocols.h
@@ -2993,6 +2993,22 @@ extern "C"
 #define GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_CANCEL  1132
 
 
+/**
+ * @brief Request biased input stream
+ */
+#define GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST 1133
+
+/**
+ * @brief Send peer of biased stream
+ */
+#define GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY   1134
+
+/**
+ * @brief Cancel getting biased stream
+ */
+#define GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL  1135
+
+
 
 /**
  * Next available: 1200
diff --git a/src/include/gnunet_rps_service.h b/src/include/gnunet_rps_service.h
index b0c8c9867..252188c62 100644
--- a/src/include/gnunet_rps_service.h
+++ b/src/include/gnunet_rps_service.h
@@ -74,6 +74,14 @@ typedef void (* GNUNET_RPS_ViewUpdateCB) (void *cls,
     const struct GNUNET_PeerIdentity *peers);
 
 /**
+ * Callback called when a peer from the biased stream was received
+ * @param cls closure
+ * @param peer The received peer
+ */
+typedef void (* GNUNET_RPS_StreamInputCB) (void *cls,
+    const struct GNUNET_PeerIdentity *peer);
+
+/**
  * Connect to the rps service
  *
  * @param cfg configuration to use
@@ -161,6 +169,23 @@ GNUNET_RPS_view_request (struct GNUNET_RPS_Handle *rps_handle,
                          GNUNET_RPS_ViewUpdateCB view_update_cb,
                          void *cls);
 
+
+/**
+ * Request biased stream of peers that are being put into the sampler
+ *
+ * @param rps_handle handle to the rps service
+ * @param num_updates number of peers we want to receive
+ *        (0 for infinite updates)
+ * @param stream_input_cb the callback called for each received peer
+ * @param cls a closure that will be given to the callback
+ */
+void
+GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle,
+                           uint32_t num_updates,
+                           GNUNET_RPS_StreamInputCB stream_input_cb,
+                           void *cls);
+
+
 /**
  * Disconnect from the rps service
  *
diff --git a/src/rps/gnunet-rps.c b/src/rps/gnunet-rps.c
index 03b2c8ab7..d2c497fd4 100644
--- a/src/rps/gnunet-rps.c
+++ b/src/rps/gnunet-rps.c
@@ -49,10 +49,20 @@ static struct GNUNET_PeerIdentity peer_id;
 static int view_update;
 
 /**
+ * @brief Do we want to receive peers from the biased stream? (Option --stream)
+ */
+static int stream_input;
+
+/**
  * @brief Number of updates we want to receive
  */
 static uint64_t num_view_updates;
 
+/**
+ * @brief Number of peers from the biased stream we want to receive
+ */
+static uint64_t num_stream_peers;
+
 
 /**
  * Task run when user presses CTRL-C to abort.
@@ -137,6 +147,22 @@ view_update_handle (void *cls,
 
 
 /**
+ * Callback called on receipt of peer from biased stream
+ *
+ * @param cls closure (not used by this handler)
+ * @param recv_peer the received peer; its full identity is printed to stdout
+ */
+static void
+stream_input_handle (void *cls,
+                     const struct GNUNET_PeerIdentity *recv_peer)
+{
+  // TODO when source of peer is sent, also print source
+  FPRINTF (stdout, "%s\n",
+           GNUNET_i2s_full (recv_peer));
+}
+
+
+/**
  * Main function that will be run by the scheduler.
  *
  * @param cls closure
@@ -163,7 +189,8 @@ run (void *cls,
   }
 
   if ((0 == memcmp (&zero_pid, &peer_id, sizeof (peer_id))) &&
-      (!view_update))
+      (!view_update) &&
+      (!stream_input))
   { /* Request n PeerIDs */
     /* If number was specified use it, else request single peer. */
     if (NULL == args[0] ||
@@ -189,7 +216,23 @@ run (void *cls,
           "Requesting %" PRIu64 " view updates\n", num_view_updates);
     else
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-          "Requesting contiuous view updates\n");
+          "Requesting continuous view updates\n");
+    GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL);
+  } else if (stream_input)
+  {
+    /* Get peers from biased stream */
+    if (NULL == args[0] ||
+        0 == sscanf (args[0], "%lu", &num_stream_peers))
+    {
+      num_stream_peers = 0;
+    }
+    GNUNET_RPS_stream_request (rps_handle, num_stream_peers, stream_input_handle, NULL);
+    if (0 != num_stream_peers)
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+          "Requesting %" PRIu64 " peers from biased stream\n", num_stream_peers);
+    else
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+          "Requesting continuous peers from biased stream\n");
     GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL);
   }
   else
@@ -223,6 +266,10 @@ main (int argc, char *const *argv)
                                "view",
                               gettext_noop ("Get updates of view (0 for infinite updates)"),
                                &view_update),
+    GNUNET_GETOPT_option_flag ('S',
+                               "stream",
+                               gettext_noop ("Get peers from biased stream"),
+                               &stream_input),
     GNUNET_GETOPT_OPTION_END
   };
   return (GNUNET_OK ==
diff --git a/src/rps/gnunet-service-rps.c b/src/rps/gnunet-service-rps.c
index 3e30041e8..5b78bb4a8 100644
--- a/src/rps/gnunet-service-rps.c
+++ b/src/rps/gnunet-service-rps.c
@@ -1769,6 +1769,12 @@ struct ClientContext
   int64_t view_updates_left;
 
   /**
+   * @brief How many peers from the biased
+   * stream this client expects to receive.
+   */
+  int64_t stream_peers_left;
+
+  /**
    * The client handle to send the reply to
    */
   struct GNUNET_SERVICE_Client *client;
@@ -2174,11 +2180,146 @@ insert_in_view (const struct GNUNET_PeerIdentity *peer)
   return ret;
 }
 
+
+/**
+ * @brief Send view to client (the current view if @a view_array is NULL)
+ *
+ * @param cli_ctx the context of the client
+ * @param view_array the peerids of the view as array (NULL: fetch from View)
+ * @param view_size the number of peers in @a view_array (ignored if NULL)
+ */
+void
+send_view (const struct ClientContext *cli_ctx,
+           const struct GNUNET_PeerIdentity *view_array,
+           uint64_t view_size)
+{
+  struct GNUNET_MQ_Envelope *ev;
+  struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg;
+
+  if (NULL == view_array)
+  {
+    view_size = View_size ();
+    view_array = View_get_as_array();
+  }
+
+  ev = GNUNET_MQ_msg_extra (out_msg,
+                            view_size * sizeof (struct GNUNET_PeerIdentity),
+                            GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY);
+  out_msg->num_peers = htonl (view_size);
+
+  GNUNET_memcpy (&out_msg[1],
+          view_array,
+          view_size * sizeof (struct GNUNET_PeerIdentity));
+  GNUNET_MQ_send (cli_ctx->mq, ev);
+}
+
+
+/**
+ * @brief Send peer from biased stream to client.
+ *
+ * @param cli_ctx the context of the client
+ * @param peer the peer to send to the client; must not be NULL
+ *        (this is asserted)
+ */
+void
+send_stream_peer (const struct ClientContext *cli_ctx,
+                  const struct GNUNET_PeerIdentity *peer)
+{
+  struct GNUNET_MQ_Envelope *ev;
+  struct GNUNET_RPS_CS_DEBUG_StreamReply *out_msg;
+
+  GNUNET_assert (NULL != peer);
+
+  ev = GNUNET_MQ_msg (out_msg,
+                      GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY);
+
+  GNUNET_memcpy (&out_msg->peer,
+          peer,
+          sizeof (struct GNUNET_PeerIdentity));
+  GNUNET_MQ_send (cli_ctx->mq, ev);
+}
+
+
 /**
  * @brief sends updates to clients that are interested
  */
 static void
-clients_notify_view_update (void);
+clients_notify_view_update (void)
+{
+  struct ClientContext *cli_ctx_iter;
+  uint64_t num_peers;
+  const struct GNUNET_PeerIdentity *view_array;
+
+  num_peers = View_size ();
+  view_array = View_get_as_array();
+  /* check size of view is small enough */
+  if (GNUNET_MAX_MESSAGE_SIZE < num_peers)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                "View is too big to send\n");
+    return;
+  }
+
+  /* Walk the client list element by element so each client is visited once */
+  for (cli_ctx_iter = cli_ctx_head; NULL != cli_ctx_iter;
+       cli_ctx_iter = cli_ctx_iter->next)
+  {
+    if (1 < cli_ctx_iter->view_updates_left)
+    {
+      /* Client wants to receive limited amount of updates */
+      cli_ctx_iter->view_updates_left -= 1;
+    } else if (1 == cli_ctx_iter->view_updates_left)
+    {
+      /* Last update of view for client */
+      cli_ctx_iter->view_updates_left = -1;
+    } else if (0 > cli_ctx_iter->view_updates_left) {
+      /* Client is not interested in updates */
+      continue;
+    }
+    /* else _updates_left == 0 - infinite amount of updates */
+
+    /* send view */
+    send_view (cli_ctx_iter, view_array, num_peers);
+  }
+}
+
+
+/**
+ * @brief sends a peer from the biased stream to all interested clients
+ *
+ * stream_peers_left: 0 - unlimited, > 0 - peers still wanted, < 0 - none
+ *
+ * @param peer the peer to forward to the clients
+ */
+static void
+clients_notify_stream_peer (const struct GNUNET_PeerIdentity *peer)
+                            //enum StreamPeerSource)
+{
+  struct ClientContext *cli_ctx_iter;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+      "Got peer (%s) from biased stream - update all clients\n",
+      GNUNET_i2s (peer));
+
+  for (cli_ctx_iter = cli_ctx_head; NULL != cli_ctx_iter;
+       cli_ctx_iter = cli_ctx_iter->next)
+  {
+    if (1 < cli_ctx_iter->stream_peers_left)
+    {
+      /* Client wants to receive a limited number of peers */
+      cli_ctx_iter->stream_peers_left -= 1;
+    } else if (1 == cli_ctx_iter->stream_peers_left)
+    {
+      /* Last peer for this client */
+      cli_ctx_iter->stream_peers_left = -1;
+    } else if (0 > cli_ctx_iter->stream_peers_left) {
+      /* Client is not interested in peers from the stream */
+      continue;
+    }
+    /* else stream_peers_left == 0 - infinite amount of peers */
+    send_stream_peer (cli_ctx_iter, peer);
+  }
+}
 
 /**
  * Put random peer from sampler into the view as history update.
@@ -2193,7 +2334,12 @@ hist_update (void *cls,
 
   for (i = 0; i < num_peers; i++)
   {
-    (void) insert_in_view (&ids[i]);
+    int inserted;
+    inserted = insert_in_view (&ids[i]);
+    if (GNUNET_OK == inserted)
+    {
+      clients_notify_stream_peer (&ids[i]);
+    }
     to_file (file_name_view_log,
              "+%s\t(hist)",
              GNUNET_i2s_full (ids));
@@ -2398,7 +2544,13 @@ insert_in_view_op (void *cls,
                    const struct GNUNET_PeerIdentity *peer)
 {
   (void) cls;
-  (void) insert_in_view (peer);
+  int inserted;
+
+  inserted = insert_in_view (peer);
+  if (GNUNET_OK == inserted)
+  {
+    clients_notify_stream_peer (peer);
+  }
 }
 
 
@@ -2860,104 +3012,54 @@ handle_client_seed (void *cls,
   GNUNET_SERVICE_client_continue (cli_ctx->client);
 }
 
-/**
- * @brief Send view to client
- *
- * @param cli_ctx the context of the client
- * @param view_array the peerids of the view as array (can be empty)
- * @param view_size the size of the view array (can be 0)
- */
-void
-send_view (const struct ClientContext *cli_ctx,
-           const struct GNUNET_PeerIdentity *view_array,
-           uint64_t view_size)
-{
-  struct GNUNET_MQ_Envelope *ev;
-  struct GNUNET_RPS_CS_DEBUG_ViewReply *out_msg;
-
-  if (NULL == view_array)
-  {
-    view_size = View_size ();
-    view_array = View_get_as_array();
-  }
-
-  ev = GNUNET_MQ_msg_extra (out_msg,
-                            view_size * sizeof (struct GNUNET_PeerIdentity),
-                            GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY);
-  out_msg->num_peers = htonl (view_size);
-
-  GNUNET_memcpy (&out_msg[1],
-          view_array,
-          view_size * sizeof (struct GNUNET_PeerIdentity));
-  GNUNET_MQ_send (cli_ctx->mq, ev);
-}
 
 /**
- * @brief sends updates to clients that are interested
+ * Handle RPS request from the client.
+ *
+ * @param cls closure
+ * @param message the actual message
  */
 static void
-clients_notify_view_update (void)
+handle_client_view_request (void *cls,
+                            const struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg)
 {
-  struct ClientContext *cli_ctx_iter;
-  uint64_t num_peers;
-  const struct GNUNET_PeerIdentity *view_array;
+  struct ClientContext *cli_ctx = cls;
+  uint64_t num_updates;
 
-  num_peers = View_size ();
-  view_array = View_get_as_array();
-  /* check size of view is small enough */
-  if (GNUNET_MAX_MESSAGE_SIZE < num_peers)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                "View is too big to send\n");
-    return;
-  }
+  num_updates = ntohl (msg->num_updates);
 
-  for (cli_ctx_iter = cli_ctx_head;
-       NULL != cli_ctx_iter;
-       cli_ctx_iter = cli_ctx_head->next)
-  {
-    if (1 < cli_ctx_iter->view_updates_left)
-    {
-      /* Client wants to receive limited amount of updates */
-      cli_ctx_iter->view_updates_left -= 1;
-    } else if (1 == cli_ctx_iter->view_updates_left)
-    {
-      /* Last update of view for client */
-      cli_ctx_iter->view_updates_left = -1;
-    } else if (0 > cli_ctx_iter->view_updates_left) {
-      /* Client is not interested in updates */
-      continue;
-    }
-    /* else _updates_left == 0 - infinite amount of updates */
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Client requested %" PRIu64 " updates of view.\n",
+       num_updates);
 
-    /* send view */
-    send_view (cli_ctx_iter, view_array, num_peers);
-  }
+  GNUNET_assert (NULL != cli_ctx);
+  cli_ctx->view_updates_left = num_updates;
+  send_view (cli_ctx, NULL, 0);
+  GNUNET_SERVICE_client_continue (cli_ctx->client);
 }
 
 
 /**
- * Handle RPS request from the client.
+ * Handle RPS request for biased stream from the client.
  *
  * @param cls closure
  * @param message the actual message
  */
 static void
-handle_client_view_request (void *cls,
-                            const struct GNUNET_RPS_CS_DEBUG_ViewRequest *msg)
+handle_client_stream_request (void *cls,
+                              const struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg)
 {
   struct ClientContext *cli_ctx = cls;
-  uint64_t num_updates;
+  uint64_t num_peers;
 
-  num_updates = ntohl (msg->num_updates);
+  num_peers = ntohl (msg->num_peers);
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Client requested %" PRIu64 " updates of view.\n",
-       num_updates);
+       "Client requested %" PRIu64 " peers from biased stream.\n",
+       num_peers);
 
   GNUNET_assert (NULL != cli_ctx);
-  cli_ctx->view_updates_left = num_updates;
-  send_view (cli_ctx, NULL, 0);
+  cli_ctx->stream_peers_left = num_peers;
   GNUNET_SERVICE_client_continue (cli_ctx->client);
 }
 
@@ -3727,8 +3829,14 @@ do_round (void *cls)
                                            CustomPeerMap_size (push_map));
     for (i = 0; i < first_border; i++)
     {
-      (void) insert_in_view (CustomPeerMap_get_peer_by_index (push_map,
-                                                              permut[i]));
+      int inserted;
+      inserted = insert_in_view (CustomPeerMap_get_peer_by_index (push_map,
+                                                                  permut[i]));
+      if (GNUNET_OK == inserted)
+      {
+        clients_notify_stream_peer (
+            CustomPeerMap_get_peer_by_index (push_map, permut[i]));
+      }
       to_file (file_name_view_log,
                "+%s\t(push list)",
                GNUNET_i2s_full (&view_array[i]));
@@ -3742,8 +3850,14 @@ do_round (void *cls)
                                            CustomPeerMap_size (pull_map));
     for (i = first_border; i < second_border; i++)
     {
-      (void) insert_in_view (CustomPeerMap_get_peer_by_index (pull_map,
+      int inserted;
+      inserted = insert_in_view (CustomPeerMap_get_peer_by_index (pull_map,
             permut[i - first_border]));
+      if (GNUNET_OK == inserted)
+      {
+        clients_notify_stream_peer (
+            CustomPeerMap_get_peer_by_index (push_map, permut[i]));
+      }
       to_file (file_name_view_log,
                "+%s\t(pull list)",
                GNUNET_i2s_full (&view_array[i]));
@@ -4296,6 +4410,10 @@ GNUNET_SERVICE_MAIN
    GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REQUEST,
    struct GNUNET_RPS_CS_DEBUG_ViewRequest,
    NULL),
+ GNUNET_MQ_hd_fixed_size (client_stream_request,
+   GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST,
+   struct GNUNET_RPS_CS_DEBUG_StreamRequest,
+   NULL),
  GNUNET_MQ_handler_end());
 
 /* end of gnunet-service-rps.c */
diff --git a/src/rps/rps.h b/src/rps/rps.h
index 58ba79082..66b2dd962 100644
--- a/src/rps/rps.h
+++ b/src/rps/rps.h
@@ -216,6 +216,47 @@ struct GNUNET_RPS_CS_DEBUG_ViewReply
 };
   /* Followed by num_peers * GNUNET_PeerIdentity */
 
+/**
+ * Message from client to service indicating that
+ * the client wants to get a stream of biased peers
+ */
+struct GNUNET_RPS_CS_DEBUG_StreamRequest
+{
+  /**
+   * Header including size and type in NBO
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Number of peers
+   * 0 for sending updates until cancellation
+   */
+  uint32_t num_peers GNUNET_PACKED;
+};
+
+/**
+ * Message from service to client containing peer from biased stream
+ */
+struct GNUNET_RPS_CS_DEBUG_StreamReply
+{
+  /**
+   * Header including size and type in NBO
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Identifier of the message.
+   */
+  uint32_t id GNUNET_PACKED;
+
+  /**
+   * @brief The peer of the biased stream
+   */
+  struct GNUNET_PeerIdentity peer;
+
+  // TODO maybe source of peer (pull/push list, peerinfo, ...)
+};
+
 GNUNET_NETWORK_STRUCT_END
 
 /***********************************************************************
diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c
index ac462f3a0..b7644540d 100644
--- a/src/rps/rps_api.c
+++ b/src/rps/rps_api.c
@@ -61,9 +61,19 @@ struct GNUNET_RPS_Handle
   GNUNET_RPS_ViewUpdateCB view_update_cb;
 
   /**
-   * @brief Callback called on each update of the view
+   * @brief Closure to each requested update of the view
    */
   void *view_update_cls;
+
+  /**
+   * @brief Callback called on each peer of the biased input stream
+   */
+  GNUNET_RPS_StreamInputCB stream_input_cb;
+
+  /**
+   * @brief Closure to each requested peer from the biased stream
+   */
+  void *stream_input_cls;
 };
 
 
@@ -277,6 +287,37 @@ GNUNET_RPS_view_request (struct GNUNET_RPS_Handle *rps_handle,
   GNUNET_MQ_send (rps_handle->mq, ev);
 }
 
+
+/**
+ * Request biased stream of peers that are being put into the sampler
+ *
+ * @param rps_handle handle to the rps service
+ * @param num_peers number of peers we want to receive
+ *        (0 for infinite updates)
+ * @param stream_input_cb the callback called for each received peer
+ * @param cls a closure that will be given to the callback
+ */
+void
+GNUNET_RPS_stream_request (struct GNUNET_RPS_Handle *rps_handle,
+                           uint32_t num_peers,
+                           GNUNET_RPS_StreamInputCB stream_input_cb,
+                           void *cls)
+{
+  struct GNUNET_MQ_Envelope *ev;
+  struct GNUNET_RPS_CS_DEBUG_StreamRequest *msg;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Client requests %" PRIu32 " biased stream updates\n",
+       num_peers);
+  rps_handle->stream_input_cb = stream_input_cb;
+  rps_handle->stream_input_cls = cls;
+
+  ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REQUEST);
+  msg->num_peers = htonl (num_peers);
+  GNUNET_MQ_send (rps_handle->mq, ev);
+}
+
+
 /**
  * This function is called, when the service updates the view.
  * It verifies that @a msg is well-formed.
@@ -303,6 +344,7 @@ check_view_update (void *cls,
   return GNUNET_OK;
 }
 
+
 /**
  * This function is called, when the service updated its view.
  * It calls the callback the caller provided
@@ -329,6 +371,29 @@ handle_view_update (void *cls,
 }
 
 
+/**
+ * Called when the service sends another peer from the biased stream.
+ * Hands the peer to the callback the caller provided, together with the
+ * closure that was registered alongside that callback.
+ *
+ * @param cls the `struct GNUNET_RPS_Handle` the handler was registered with
+ * @param msg the message
+ */
+static void
+handle_stream_input (void *cls,
+                     const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg)
+{
+  struct GNUNET_RPS_Handle *h = cls;
+
+  /* Give the peers back */
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "New peer of biased input stream\n");
+
+  GNUNET_assert (NULL != h);
+  GNUNET_assert (NULL != h->stream_input_cb);
+  h->stream_input_cb (h->stream_input_cls, &msg->peer);
+}
+
 
 /**
  * Reconnect to the service
@@ -379,6 +444,10 @@ reconnect (struct GNUNET_RPS_Handle *h)
                            GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_VIEW_REPLY,
                            struct GNUNET_RPS_CS_DEBUG_ViewReply,
                            h),
+    GNUNET_MQ_hd_fixed_size (stream_input,
+                             GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_REPLY,
+                             struct GNUNET_RPS_CS_DEBUG_StreamReply,
+                             h),
     GNUNET_MQ_handler_end ()
   };
 

-- 
To stop receiving notification emails like this one, please contact
address@hidden



reply via email to

[Prev in Thread] Current Thread [Next in Thread]