gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] [gnunet] branch master updated (fbc5f3876 -> 39af429db)


From: gnunet
Subject: [GNUnet-SVN] [gnunet] branch master updated (fbc5f3876 -> 39af429db)
Date: Fri, 23 Nov 2018 00:47:30 +0100

This is an automated email from the git hooks/post-receive script.

julius-buenger pushed a change to branch master
in repository gnunet.

    from fbc5f3876 add design sketch for new ATS API
     new 7b3d14aa4 RPS profiler: Dump more statistics
     new 101694a71 RPS api: Schedule callback
     new 5de6a9d7e fixes for DLL management and indentation
     new caef232e9 RPS API: Fix whitespaces
     new d8505f5da RPS tests: Get rid of warning (unused argument)
     new 39af429db RPS profiler: Dump more statistics to file

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/rps/gnunet-rps-profiler.c           | 210 +++++++++++++++++++++++++++++---
 src/rps/rps_api.c                       | 172 +++++++++++++++++---------
 src/rps/test_service_rps_custommap.c    |   5 +-
 src/rps/test_service_rps_sampler_elem.c |   5 +-
 4 files changed, 315 insertions(+), 77 deletions(-)

diff --git a/src/rps/gnunet-rps-profiler.c b/src/rps/gnunet-rps-profiler.c
index b17fb6a50..5ccf1017e 100644
--- a/src/rps/gnunet-rps-profiler.c
+++ b/src/rps/gnunet-rps-profiler.c
@@ -83,14 +83,28 @@ enum STAT_TYPE
   STAT_TYPE_BLOCKS_NO_PUSH_NO_PULL,   /*   6 */
   STAT_TYPE_ISSUED_PUSH_SEND,         /*   7 */
   STAT_TYPE_ISSUED_PULL_REQ,          /*   8 */
-  STAT_TYPE_ISSUED_PULL_REP,          /*  9 */
-  STAT_TYPE_SENT_PUSH_SEND,           /*  10 */
-  STAT_TYPE_SENT_PULL_REQ,            /*  11 */
-  STAT_TYPE_SENT_PULL_REP,            /*  12 */
-  STAT_TYPE_RECV_PUSH_SEND,           /*  13 */
-  STAT_TYPE_RECV_PULL_REQ,            /*  14 */
-  STAT_TYPE_RECV_PULL_REP,            /*  15 */
-  STAT_TYPE_MAX,                      /*  16 */
+  STAT_TYPE_ISSUED_PULL_REQ_MH,       /*   9 */
+  STAT_TYPE_ISSUED_PULL_REP,          /*  10 */
+  STAT_TYPE_SENT_PUSH_SEND,           /*  11 */
+  STAT_TYPE_SENT_PULL_REQ,            /*  12 */
+  STAT_TYPE_SENT_PULL_REQ_MH,         /*  13 */
+  STAT_TYPE_SENT_PULL_REP,            /*  14 */
+  STAT_TYPE_RECV_PUSH_SEND,           /*  15 */
+  STAT_TYPE_RECV_PULL_REQ,            /*  16 */
+  STAT_TYPE_RECV_PULL_REQ_MH,         /*  17 */
+  STAT_TYPE_RECV_PULL_REP,            /*  18 */
+  STAT_TYPE_RECV_PULL_REP_MH,         /*  19 */
+  STAT_TYPE_VIEW_SIZE,                /*  20 */
+  STAT_TYPE_KNOWN_PEERS,              /*  21 */
+  STAT_TYPE_VALID_PEERS,              /*  22 */
+  STAT_TYPE_LEARND_PEERS,             /*  23 */
+  STAT_TYPE_PENDING_ONLINE_CHECKS,    /*  24 */
+  STAT_TYPE_UNREQUESTED_PULL_REPLIES, /*  25 */
+  STAT_TYPE_PEERS_IN_PUSH_MAP,        /*  26 */
+  STAT_TYPE_PEERS_IN_PULL_MAP,        /*  27 */
+  STAT_TYPE_PEERS_IN_VIEW,            /*  28 */
+  STAT_TYPE_VIEW_SIZE_AIM,            /*  29 */
+  STAT_TYPE_MAX,                      /*  30 */
 };
 
 static char* stat_type_strings[] = {
@@ -103,13 +117,27 @@ static char* stat_type_strings[] = {
   "# rounds blocked - no pushes, no pull replies",
   "# push send issued",
   "# pull request send issued",
+  "# pull request send issued (multi-hop peer)",
   "# pull reply send issued",
   "# pushes sent",
   "# pull requests sent",
+  "# pull requests sent (multi-hop peer)",
   "# pull replys sent",
   "# push message received",
   "# pull request message received",
+  "# pull request message received (multi-hop peer)",
   "# pull reply messages received",
+  "# pull reply messages received (multi-hop peer)",
+  "view size",
+  "# known peers",
+  "# valid peers",
+  "# learnd peers",
+  "# pending online checks",
+  "# unrequested pull replies",
+  "# peers in push map at end of round",
+  "# peers in pull map at end of round",
+  "# peers in view at end of round",
+  "view size aim",
 };
 
 struct STATcls
@@ -182,6 +210,12 @@ enum STAT_TYPE stat_str_2_type (const char *stat_str)
   {
     return STAT_TYPE_ISSUED_PULL_REQ;
   }
+  else if (0 == strncmp (stat_type_strings[STAT_TYPE_ISSUED_PULL_REQ_MH],
+                         stat_str,
+                         strlen 
(stat_type_strings[STAT_TYPE_ISSUED_PULL_REQ_MH])))
+  {
+    return STAT_TYPE_ISSUED_PULL_REQ_MH;
+  }
   else if (0 == strncmp (stat_type_strings[STAT_TYPE_ISSUED_PULL_REP],
                          stat_str,
                          strlen 
(stat_type_strings[STAT_TYPE_ISSUED_PULL_REP])))
@@ -200,6 +234,12 @@ enum STAT_TYPE stat_str_2_type (const char *stat_str)
   {
     return STAT_TYPE_SENT_PULL_REQ;
   }
+  else if (0 == strncmp (stat_type_strings[STAT_TYPE_SENT_PULL_REQ_MH],
+                         stat_str,
+                         strlen 
(stat_type_strings[STAT_TYPE_SENT_PULL_REQ_MH])))
+  {
+    return STAT_TYPE_SENT_PULL_REQ_MH;
+  }
   else if (0 == strncmp (stat_type_strings[STAT_TYPE_SENT_PULL_REP],
                          stat_str,
                          strlen (stat_type_strings[STAT_TYPE_SENT_PULL_REP])))
@@ -218,12 +258,84 @@ enum STAT_TYPE stat_str_2_type (const char *stat_str)
   {
     return STAT_TYPE_RECV_PULL_REQ;
   }
+  else if (0 == strncmp (stat_type_strings[STAT_TYPE_RECV_PULL_REQ_MH],
+                         stat_str,
+                         strlen 
(stat_type_strings[STAT_TYPE_RECV_PULL_REQ_MH])))
+  {
+    return STAT_TYPE_RECV_PULL_REQ_MH;
+  }
   else if (0 == strncmp (stat_type_strings[STAT_TYPE_RECV_PULL_REP],
                          stat_str,
                          strlen (stat_type_strings[STAT_TYPE_RECV_PULL_REP])))
   {
     return STAT_TYPE_RECV_PULL_REP;
   }
+  else if (0 == strncmp (stat_type_strings[STAT_TYPE_RECV_PULL_REP_MH],
+                         stat_str,
+                         strlen 
(stat_type_strings[STAT_TYPE_RECV_PULL_REP_MH])))
+  {
+    return STAT_TYPE_RECV_PULL_REP_MH;
+  }
+  else if (0 == strncmp (stat_type_strings[STAT_TYPE_VIEW_SIZE],
+                         stat_str,
+                         strlen (stat_type_strings[STAT_TYPE_VIEW_SIZE])))
+  {
+    return STAT_TYPE_VIEW_SIZE;
+  }
+  else if (0 == strncmp (stat_type_strings[STAT_TYPE_KNOWN_PEERS],
+                         stat_str,
+                         strlen (stat_type_strings[STAT_TYPE_KNOWN_PEERS])))
+  {
+    return STAT_TYPE_KNOWN_PEERS;
+  }
+  else if (0 == strncmp (stat_type_strings[STAT_TYPE_VALID_PEERS],
+                         stat_str,
+                         strlen (stat_type_strings[STAT_TYPE_VALID_PEERS])))
+  {
+    return STAT_TYPE_VALID_PEERS;
+  }
+  else if (0 == strncmp (stat_type_strings[STAT_TYPE_LEARND_PEERS],
+                         stat_str,
+                         strlen (stat_type_strings[STAT_TYPE_LEARND_PEERS])))
+  {
+    return STAT_TYPE_LEARND_PEERS;
+  }
+  else if (0 == strncmp (stat_type_strings[STAT_TYPE_PENDING_ONLINE_CHECKS],
+                         stat_str,
+                         strlen 
(stat_type_strings[STAT_TYPE_PENDING_ONLINE_CHECKS])))
+  {
+    return STAT_TYPE_PENDING_ONLINE_CHECKS;
+  }
+  else if (0 == strncmp (stat_type_strings[STAT_TYPE_UNREQUESTED_PULL_REPLIES],
+                         stat_str,
+                         strlen 
(stat_type_strings[STAT_TYPE_UNREQUESTED_PULL_REPLIES])))
+  {
+    return STAT_TYPE_UNREQUESTED_PULL_REPLIES;
+  }
+  else if (0 == strncmp (stat_type_strings[STAT_TYPE_PEERS_IN_PUSH_MAP],
+                         stat_str,
+                         strlen 
(stat_type_strings[STAT_TYPE_PEERS_IN_PUSH_MAP])))
+  {
+    return STAT_TYPE_PEERS_IN_PUSH_MAP;
+  }
+  else if (0 == strncmp (stat_type_strings[STAT_TYPE_PEERS_IN_PULL_MAP],
+                         stat_str,
+                         strlen 
(stat_type_strings[STAT_TYPE_PEERS_IN_PULL_MAP])))
+  {
+    return STAT_TYPE_PEERS_IN_PULL_MAP;
+  }
+  else if (0 == strncmp (stat_type_strings[STAT_TYPE_PEERS_IN_VIEW],
+                         stat_str,
+                         strlen (stat_type_strings[STAT_TYPE_PEERS_IN_VIEW])))
+  {
+    return STAT_TYPE_PEERS_IN_VIEW;
+  }
+  else if (0 == strncmp (stat_type_strings[STAT_TYPE_VIEW_SIZE_AIM],
+                         stat_str,
+                         strlen (stat_type_strings[STAT_TYPE_VIEW_SIZE_AIM])))
+  {
+    return STAT_TYPE_VIEW_SIZE_AIM;
+  }
   return STAT_TYPE_MAX;
 }
 
@@ -2337,13 +2449,23 @@ void write_final_stats (void){
   for (uint32_t i = 0; i < num_peers; i++)
   {
     to_file ("/tmp/rps/final_stats.csv",
-             ", %" PRIu32 ", " /* index */
+             "%" PRIu32 ", " /* index */
              "%s, %" /* id */
              PRIu64 ", %" /* rounds */
              PRIu64 ", %" PRIu64 ", %" PRIu64 ", %" PRIu64 ", %" PRIu64 ", %" 
PRIu64 ", %" /* blocking */
-             PRIu64 ", %" PRIu64 ", %" PRIu64 ", %" /* issued */
-             PRIu64 ", %" PRIu64 ", %" PRIu64 ", %" /* sent */
-             PRIu64 ", %" PRIu64 ", %" PRIu64 /* recv */,
+             PRIu64 ", %" PRIu64 ", %" PRIu64 ", %" PRIu64 ", %" /* issued */
+             PRIu64 ", %" PRIu64 ", %" PRIu64 ", %" PRIu64 ", %" /* sent */
+             PRIu64 ", %" PRIu64 ", %" PRIu64 ", %" PRIu64 ", %" PRIu64 ", %" 
/* recv */
+             PRIu64 ", %" /* view size */
+             PRIu64 ", %" /* known peers */
+             PRIu64 ", %" /* valid peers */
+             PRIu64 ", %" /* learned peers */
+             PRIu64 ", %" /* pending online checks */
+             PRIu64 ", %" /* unrequested pull replies */
+             PRIu64 ", %" /* peers in push map */
+             PRIu64 ", %" /* peers in pull map */
+             PRIu64 ", %" /* peers in view */
+             PRIu64 "\n"/* view size aim */,
              i,
              GNUNET_i2s (rps_peers[i].peer_id),
              rps_peers[i].stats[STAT_TYPE_ROUNDS],
@@ -2355,13 +2477,27 @@ void write_final_stats (void){
              rps_peers[i].stats[STAT_TYPE_BLOCKS_NO_PUSH_NO_PULL],
              rps_peers[i].stats[STAT_TYPE_ISSUED_PUSH_SEND],
              rps_peers[i].stats[STAT_TYPE_ISSUED_PULL_REQ],
+             rps_peers[i].stats[STAT_TYPE_ISSUED_PULL_REQ_MH],
              rps_peers[i].stats[STAT_TYPE_ISSUED_PULL_REP],
              rps_peers[i].stats[STAT_TYPE_SENT_PUSH_SEND],
              rps_peers[i].stats[STAT_TYPE_SENT_PULL_REQ],
+             rps_peers[i].stats[STAT_TYPE_SENT_PULL_REQ_MH],
              rps_peers[i].stats[STAT_TYPE_SENT_PULL_REP],
              rps_peers[i].stats[STAT_TYPE_RECV_PUSH_SEND],
              rps_peers[i].stats[STAT_TYPE_RECV_PULL_REQ],
-             rps_peers[i].stats[STAT_TYPE_RECV_PULL_REP]);
+             rps_peers[i].stats[STAT_TYPE_RECV_PULL_REQ_MH],
+             rps_peers[i].stats[STAT_TYPE_RECV_PULL_REP_MH],
+             rps_peers[i].stats[STAT_TYPE_RECV_PULL_REP],
+             rps_peers[i].stats[STAT_TYPE_VIEW_SIZE],
+             rps_peers[i].stats[STAT_TYPE_KNOWN_PEERS],
+             rps_peers[i].stats[STAT_TYPE_VALID_PEERS],
+             rps_peers[i].stats[STAT_TYPE_LEARND_PEERS],
+             rps_peers[i].stats[STAT_TYPE_PENDING_ONLINE_CHECKS],
+             rps_peers[i].stats[STAT_TYPE_UNREQUESTED_PULL_REPLIES],
+             rps_peers[i].stats[STAT_TYPE_PEERS_IN_PUSH_MAP],
+             rps_peers[i].stats[STAT_TYPE_PEERS_IN_PULL_MAP],
+             rps_peers[i].stats[STAT_TYPE_PEERS_IN_VIEW],
+             rps_peers[i].stats[STAT_TYPE_VIEW_SIZE_AIM]);
     for (enum STAT_TYPE stat_type = STAT_TYPE_ROUNDS;
          stat_type < STAT_TYPE_MAX;
          stat_type++)
@@ -2373,9 +2509,19 @@ void write_final_stats (void){
            "SUM %"
            PRIu64 " %" /* rounds */
            PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 
" %" /* blocking */
-           PRIu64 " %" PRIu64 " %" PRIu64 " %" /* issued */
-           PRIu64 " %" PRIu64 " %" PRIu64 " %" /* sent */
-           PRIu64 " %" PRIu64 " %" PRIu64 /* recv */,
+           PRIu64 ", %" PRIu64 ", %" PRIu64 ", %" PRIu64 ", %" /* issued */
+           PRIu64 ", %" PRIu64 ", %" PRIu64 ", %" PRIu64 ", %" /* sent */
+           PRIu64 ", %" PRIu64 ", %" PRIu64 ", %" PRIu64 ", %" PRIu64 ", %" /* 
recv */
+           PRIu64 ", %" /* view size */
+           PRIu64 ", %" /* known peers */
+           PRIu64 ", %" /* valid peers */
+           PRIu64 ", %" /* learned peers */
+           PRIu64 ", %" /* pending online checks */
+           PRIu64 ", %" /* unrequested pull replies */
+           PRIu64 ", %" /* peers in push map */
+           PRIu64 ", %" /* peers in pull map */
+           PRIu64 ", %" /* peers in view */
+           PRIu64 "\n"/* view size aim */,
            sums[STAT_TYPE_ROUNDS],
            sums[STAT_TYPE_BLOCKS],
            sums[STAT_TYPE_BLOCKS_MANY_PUSH],
@@ -2385,13 +2531,27 @@ void write_final_stats (void){
            sums[STAT_TYPE_BLOCKS_NO_PUSH_NO_PULL],
            sums[STAT_TYPE_ISSUED_PUSH_SEND],
            sums[STAT_TYPE_ISSUED_PULL_REQ],
+           sums[STAT_TYPE_ISSUED_PULL_REQ_MH],
            sums[STAT_TYPE_ISSUED_PULL_REP],
            sums[STAT_TYPE_SENT_PUSH_SEND],
            sums[STAT_TYPE_SENT_PULL_REQ],
+           sums[STAT_TYPE_SENT_PULL_REQ_MH],
            sums[STAT_TYPE_SENT_PULL_REP],
            sums[STAT_TYPE_RECV_PUSH_SEND],
            sums[STAT_TYPE_RECV_PULL_REQ],
-           sums[STAT_TYPE_RECV_PULL_REP]);
+           sums[STAT_TYPE_RECV_PULL_REQ_MH],
+           sums[STAT_TYPE_RECV_PULL_REP],
+           sums[STAT_TYPE_RECV_PULL_REP_MH],
+           sums[STAT_TYPE_VIEW_SIZE],
+           sums[STAT_TYPE_KNOWN_PEERS],
+           sums[STAT_TYPE_VALID_PEERS],
+           sums[STAT_TYPE_LEARND_PEERS],
+           sums[STAT_TYPE_PENDING_ONLINE_CHECKS],
+           sums[STAT_TYPE_UNREQUESTED_PULL_REPLIES],
+           sums[STAT_TYPE_PEERS_IN_PUSH_MAP],
+           sums[STAT_TYPE_PEERS_IN_PULL_MAP],
+           sums[STAT_TYPE_PEERS_IN_VIEW],
+           sums[STAT_TYPE_VIEW_SIZE_AIM]);
 }
 
 /**
@@ -2681,13 +2841,27 @@ run (void *cls,
                                     BIT(STAT_TYPE_BLOCKS_NO_PUSH_NO_PULL) |
                                     BIT(STAT_TYPE_ISSUED_PUSH_SEND) |
                                     BIT(STAT_TYPE_ISSUED_PULL_REQ) |
+                                    BIT(STAT_TYPE_ISSUED_PULL_REQ_MH) |
                                     BIT(STAT_TYPE_ISSUED_PULL_REP) |
                                     BIT(STAT_TYPE_SENT_PUSH_SEND) |
                                     BIT(STAT_TYPE_SENT_PULL_REQ) |
+                                    BIT(STAT_TYPE_SENT_PULL_REQ_MH) |
                                     BIT(STAT_TYPE_SENT_PULL_REP) |
                                     BIT(STAT_TYPE_RECV_PUSH_SEND) |
                                     BIT(STAT_TYPE_RECV_PULL_REQ) |
-                                    BIT(STAT_TYPE_RECV_PULL_REP);
+                                    BIT(STAT_TYPE_RECV_PULL_REQ_MH) |
+                                    BIT(STAT_TYPE_RECV_PULL_REP) |
+                                    BIT(STAT_TYPE_RECV_PULL_REP_MH) |
+                                    BIT(STAT_TYPE_VIEW_SIZE) |
+                                    BIT(STAT_TYPE_KNOWN_PEERS) |
+                                    BIT(STAT_TYPE_VALID_PEERS) |
+                                    BIT(STAT_TYPE_LEARND_PEERS) |
+                                    BIT(STAT_TYPE_PENDING_ONLINE_CHECKS) |
+                                    BIT(STAT_TYPE_UNREQUESTED_PULL_REPLIES) |
+                                    BIT(STAT_TYPE_PEERS_IN_PUSH_MAP) |
+                                    BIT(STAT_TYPE_PEERS_IN_PULL_MAP) |
+                                    BIT(STAT_TYPE_PEERS_IN_VIEW) |
+                                    BIT(STAT_TYPE_VIEW_SIZE_AIM);
   cur_test_run.have_collect_view = COLLECT_VIEW;
 
   /* 'Clean' directory */
diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c
index cfab06f17..a489b4ff1 100644
--- a/src/rps/rps_api.c
+++ b/src/rps/rps_api.c
@@ -11,7 +11,7 @@
      WITHOUT ANY WARRANTY; without even the implied warranty of
      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
      Affero General Public License for more details.
-    
+
      You should have received a copy of the GNU Affero General Public License
      along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
@@ -52,6 +52,11 @@ struct GNUNET_RPS_StreamRequestHandle
   void *ready_cb_cls;
 
   /**
+   * @brief Scheduler task for scheduled callback
+   */
+  struct GNUNET_SCHEDULER_Task *callback_task;
+
+  /**
    * @brief Next element of the DLL
    */
   struct GNUNET_RPS_StreamRequestHandle *next;
@@ -172,6 +177,19 @@ struct cb_cls_pack
 
 
 /**
+ * @brief Peers received from the biased stream to be passed to all
+ * srh_handlers
+ */
+static struct GNUNET_PeerIdentity *srh_callback_peers;
+
+/**
+ * @brief Number of peers in the biased stream that are to be passed to all
+ * srh_handlers
+ */
+static uint64_t srh_callback_num_peers;
+
+
+/**
  * @brief Create a new handle for a stream request
  *
  * @param rps_handle The rps handle
@@ -189,7 +207,6 @@ new_stream_request (struct GNUNET_RPS_Handle *rps_handle,
   struct GNUNET_RPS_StreamRequestHandle *srh;
 
   srh = GNUNET_new (struct GNUNET_RPS_StreamRequestHandle);
-
   srh->rps_handle = rps_handle;
   srh->ready_cb = ready_cb;
   srh->ready_cb_cls = cls;
@@ -205,18 +222,21 @@ new_stream_request (struct GNUNET_RPS_Handle *rps_handle,
  * @brief Remove the given stream request from the list of requests and memory
  *
  * @param srh The request to be removed
- * @param srh_head Head of the DLL to remove request from
- * @param srh_tail Tail of the DLL to remove request from
  */
 static void
-remove_stream_request (struct GNUNET_RPS_StreamRequestHandle *srh,
-                       struct GNUNET_RPS_StreamRequestHandle *srh_head,
-                       struct GNUNET_RPS_StreamRequestHandle *srh_tail)
+remove_stream_request (struct GNUNET_RPS_StreamRequestHandle *srh)
 {
-  GNUNET_CONTAINER_DLL_remove (srh_head,
-                               srh_tail,
-                               srh);
+  struct GNUNET_RPS_Handle *rps_handle = srh->rps_handle;
 
+  GNUNET_assert (NULL != srh);
+  if (NULL != srh->callback_task)
+  {
+    GNUNET_SCHEDULER_cancel (srh->callback_task);
+    srh->callback_task = NULL;
+  }
+  GNUNET_CONTAINER_DLL_remove (rps_handle->stream_requests_head,
+                               rps_handle->stream_requests_tail,
+                               srh);
   GNUNET_free (srh);
 }
 
@@ -230,7 +250,7 @@ remove_stream_request (struct 
GNUNET_RPS_StreamRequestHandle *srh,
  * @param num_peers The number of peers that have been returned
  * @param cls The #GNUNET_RPS_Request_Handle
  */
-void
+static void
 peers_ready_cb (const struct GNUNET_PeerIdentity *peers,
                 uint32_t num_peers,
                 void *cls)
@@ -256,7 +276,7 @@ peers_ready_cb (const struct GNUNET_PeerIdentity *peers,
  * @param num_peers The number of peer that have been returned
  * @param peers The array of @a num_peers that have been returned
  */
-void
+static void
 collect_peers_cb (void *cls,
                   uint64_t num_peers,
                   const struct GNUNET_PeerIdentity *peers)
@@ -425,13 +445,10 @@ GNUNET_RPS_stream_cancel (struct 
GNUNET_RPS_StreamRequestHandle *srh)
 {
   struct GNUNET_RPS_Handle *rps_handle;
 
-  GNUNET_assert (NULL != srh);
   rps_handle = srh->rps_handle;
-  GNUNET_CONTAINER_DLL_remove (rps_handle->stream_requests_head,
-                               rps_handle->stream_requests_tail,
-                               srh);
-  GNUNET_free (srh);
-  if (NULL == rps_handle->stream_requests_head) cancel_stream (rps_handle);
+  remove_stream_request (srh);
+  if (NULL == rps_handle->stream_requests_head)
+    cancel_stream (rps_handle);
 }
 
 
@@ -463,6 +480,24 @@ check_stream_input (void *cls,
   return GNUNET_OK;
 }
 
+
+/**
+ * @brief Called by the scheduler to call the callbacks of the srh handlers
+ *
+ * @param cls Stream request handle
+ */
+static void
+srh_callback_scheduled (void *cls)
+{
+  struct GNUNET_RPS_StreamRequestHandle *srh = cls;
+
+  srh->callback_task = NULL;
+  srh->ready_cb (srh->ready_cb_cls,
+                 srh_callback_num_peers,
+                 srh_callback_peers);
+}
+
+
 /**
  * This function is called, when the service sends another peer from the biased
  * stream.
@@ -476,26 +511,35 @@ handle_stream_input (void *cls,
                      const struct GNUNET_RPS_CS_DEBUG_StreamReply *msg)
 {
   struct GNUNET_RPS_Handle *h = cls;
-  const struct GNUNET_PeerIdentity *peers;
+  //const struct GNUNET_PeerIdentity *peers;
   uint64_t num_peers;
   struct GNUNET_RPS_StreamRequestHandle *srh_iter;
   struct GNUNET_RPS_StreamRequestHandle *srh_next;
 
-  peers = (struct GNUNET_PeerIdentity *) &msg[1];
+  //peers = (struct GNUNET_PeerIdentity *) &msg[1];
   num_peers = ntohl (msg->num_peers);
+  srh_callback_num_peers = num_peers;
+  GNUNET_free_non_null (srh_callback_peers);
+  srh_callback_peers = GNUNET_new_array (num_peers,
+                                        struct GNUNET_PeerIdentity);
+  GNUNET_memcpy (srh_callback_peers,
+                 &msg[1],
+                 num_peers * sizeof (struct GNUNET_PeerIdentity));
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Received %" PRIu64 " peer(s) from stream input.\n",
        num_peers);
-  srh_iter = h->stream_requests_head;
-  while (NULL != srh_iter)
+  for (srh_iter = h->stream_requests_head;
+       NULL != srh_iter;
+       srh_iter = srh_next)
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG, "Calling srh \n");
     /* Store next pointer - srh might be removed/freed in callback */
     srh_next = srh_iter->next;
-    srh_iter->ready_cb (srh_iter->ready_cb_cls,
-                        num_peers,
-                        peers);
-    srh_iter = srh_next;
+    if (NULL != srh_iter->callback_task)
+      GNUNET_SCHEDULER_cancel (srh_iter->callback_task);
+    srh_iter->callback_task =
+      GNUNET_SCHEDULER_add_now (&srh_callback_scheduled,
+                               srh_iter);
   }
 
   if (NULL == h->stream_requests_head)
@@ -531,6 +575,7 @@ mq_error_handler (void *cls,
        1: READ,\n\
        2: WRITE,\n\
        4: TIMEOUT\n",
+       // TODO: write GNUNET_MQ_strerror (error)
        error);
   reconnect (h);
   /* Resend all pending request as the service destroyed its knowledge
@@ -542,17 +587,20 @@ mq_error_handler (void *cls,
  * @brief Create the hash value from the share value that defines the sub
  * (-group)
  *
- * @param share_val Share value - strings longer than 508 (512 - 4) will be
- *        truncated.
- * @param hash Pointer to the location in which the hash will be stored.
+ * @param share_val Share value
+ * @param hash[out] Pointer to the location in which the hash will be stored.
  */
 static void
-hash_from_share_val (const char *share_val, struct GNUNET_HashCode *hash)
+hash_from_share_val (const char *share_val,
+                    struct GNUNET_HashCode *hash)
 {
-  char hash_port_string[512] = "rps";
-
-  (void) strncat (hash_port_string, share_val, 508);
-  GNUNET_CRYPTO_hash (hash_port_string, strlen (hash_port_string), hash);
+  GNUNET_CRYPTO_kdf (hash,
+                    sizeof (struct GNUNET_HashCode),
+                    "rps",
+                    strlen ("rps"),
+                    share_val,
+                    strlen (share_val),
+                    NULL, 0);
 }
 
 
@@ -707,12 +755,10 @@ GNUNET_RPS_seed_ids (struct GNUNET_RPS_Handle *h,
   struct GNUNET_MQ_Envelope *ev;
   struct GNUNET_RPS_CS_SeedMessage *msg;
 
-  unsigned int i;
-
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Client wants to seed %" PRIu32 " peers:\n",
        n);
-  for (i = 0 ; i < n ; i++)
+  for (unsigned int i = 0 ; i < n ; i++)
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "%u. peer: %s\n",
          i,
@@ -730,12 +776,15 @@ GNUNET_RPS_seed_ids (struct GNUNET_RPS_Handle *h,
 
   while (GNUNET_MAX_MESSAGE_SIZE < size_needed)
   {
-    ev = GNUNET_MQ_msg_extra (msg, num_peers_max * sizeof (struct 
GNUNET_PeerIdentity),
-        GNUNET_MESSAGE_TYPE_RPS_CS_SEED);
+    ev = GNUNET_MQ_msg_extra (msg,
+                             num_peers_max * sizeof (struct 
GNUNET_PeerIdentity),
+                             GNUNET_MESSAGE_TYPE_RPS_CS_SEED);
     msg->num_peers = htonl (num_peers_max);
-    GNUNET_memcpy (&msg[1], tmp_peer_pointer, num_peers_max * sizeof (struct 
GNUNET_PeerIdentity));
-    GNUNET_MQ_send (h->mq, ev);
-
+    GNUNET_memcpy (&msg[1],
+                  tmp_peer_pointer,
+                  num_peers_max * sizeof (struct GNUNET_PeerIdentity));
+    GNUNET_MQ_send (h->mq,
+                   ev);
     n -= num_peers_max;
     size_needed = sizeof (struct GNUNET_RPS_CS_SeedMessage) +
                   n * sizeof (struct GNUNET_PeerIdentity);
@@ -743,12 +792,15 @@ GNUNET_RPS_seed_ids (struct GNUNET_RPS_Handle *h,
     tmp_peer_pointer = &ids[num_peers_max];
   }
 
-  ev = GNUNET_MQ_msg_extra (msg, n * sizeof (struct GNUNET_PeerIdentity),
+  ev = GNUNET_MQ_msg_extra (msg,
+                           n * sizeof (struct GNUNET_PeerIdentity),
                             GNUNET_MESSAGE_TYPE_RPS_CS_SEED);
   msg->num_peers = htonl (n);
-  GNUNET_memcpy (&msg[1], tmp_peer_pointer, n * sizeof (struct 
GNUNET_PeerIdentity));
-
-  GNUNET_MQ_send (h->mq, ev);
+  GNUNET_memcpy (&msg[1],
+                tmp_peer_pointer,
+                n * sizeof (struct GNUNET_PeerIdentity));
+  GNUNET_MQ_send (h->mq,
+                 ev);
 }
 
 
@@ -836,7 +888,9 @@ GNUNET_RPS_act_malicious (struct GNUNET_RPS_Handle *h,
   if ( (2 == type) ||
        (3 == type) )
     msg->attacked_peer = *target_peer;
-  GNUNET_memcpy (&msg[1], tmp_peer_pointer, num_peers * sizeof (struct 
GNUNET_PeerIdentity));
+  GNUNET_memcpy (&msg[1],
+                tmp_peer_pointer,
+                num_peers * sizeof (struct GNUNET_PeerIdentity));
 
   GNUNET_MQ_send (h->mq, ev);
 }
@@ -855,10 +909,9 @@ GNUNET_RPS_request_cancel (struct 
GNUNET_RPS_Request_Handle *rh)
 
   h = rh->rps_handle;
   GNUNET_assert (NULL != rh);
-  GNUNET_assert (NULL != rh->srh);
-  remove_stream_request (rh->srh,
-                         h->stream_requests_head,
-                         h->stream_requests_tail);
+  GNUNET_assert (h == rh->srh->rps_handle);
+  GNUNET_RPS_stream_cancel (rh->srh);
+  rh->srh = NULL;
   if (NULL == h->stream_requests_head) cancel_stream(h);
   if (NULL != rh->sampler_rh)
   {
@@ -879,18 +932,23 @@ GNUNET_RPS_disconnect (struct GNUNET_RPS_Handle *h)
 {
   if (NULL != h->stream_requests_head)
   {
-    struct GNUNET_RPS_StreamRequestHandle *srh_iter;
+    struct GNUNET_RPS_StreamRequestHandle *srh_next;
 
     LOG (GNUNET_ERROR_TYPE_WARNING,
         "Still waiting for replies\n");
-    srh_iter = h->stream_requests_head;
-    while (NULL != srh_iter)
+    for (struct GNUNET_RPS_StreamRequestHandle *srh_iter = 
h->stream_requests_head;
+        NULL != srh_iter;
+        srh_iter = srh_next)
     {
-      struct GNUNET_RPS_StreamRequestHandle *srh_tmp = srh_iter;
-      srh_iter = srh_iter->next;
-      GNUNET_RPS_stream_cancel (srh_tmp);
+      srh_next = srh_iter->next;
+      GNUNET_RPS_stream_cancel (srh_iter);
     }
   }
+  if (NULL != srh_callback_peers)
+  {
+    GNUNET_free (srh_callback_peers);
+    srh_callback_peers = NULL;
+  }
   if (NULL != h->view_update_cb)
   {
     LOG (GNUNET_ERROR_TYPE_WARNING,
diff --git a/src/rps/test_service_rps_custommap.c 
b/src/rps/test_service_rps_custommap.c
index 8ce03070e..003523087 100644
--- a/src/rps/test_service_rps_custommap.c
+++ b/src/rps/test_service_rps_custommap.c
@@ -11,7 +11,7 @@
      WITHOUT ANY WARRANTY; without even the implied warranty of
      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
      Affero General Public License for more details.
-    
+
      You should have received a copy of the GNU Affero General Public License
      along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
@@ -112,6 +112,9 @@ check ()
 int
 main (int argc, char *argv[])
 {
+  (void) argc;
+  (void) argv;
+
   GNUNET_log_setup ("test_service_rps_peers", 
                    "WARNING",
                    NULL);
diff --git a/src/rps/test_service_rps_sampler_elem.c 
b/src/rps/test_service_rps_sampler_elem.c
index 43efc8691..c68a3e384 100644
--- a/src/rps/test_service_rps_sampler_elem.c
+++ b/src/rps/test_service_rps_sampler_elem.c
@@ -11,7 +11,7 @@
      WITHOUT ANY WARRANTY; without even the implied warranty of
      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
      Affero General Public License for more details.
-    
+
      You should have received a copy of the GNU Affero General Public License
      along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
@@ -194,6 +194,9 @@ check ()
 int
 main (int argc, char *argv[])
 {
+  (void) argc;
+  (void) argv;
+
   GNUNET_log_setup ("test_service_rps_peers", 
                    "WARNING",
                    NULL);

-- 
To stop receiving notification emails like this one, please contact
address@hidden



reply via email to

[Prev in Thread] Current Thread [Next in Thread]