gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r26526 - in gnunet/src: consensus include


From: gnunet
Subject: [GNUnet-SVN] r26526 - in gnunet/src: consensus include
Date: Thu, 21 Mar 2013 02:06:40 +0100

Author: dold
Date: 2013-03-21 02:06:40 +0100 (Thu, 21 Mar 2013)
New Revision: 26526

Added:
   gnunet/src/consensus/consensus_flout.h
Modified:
   gnunet/src/consensus/consensus_protocol.h
   gnunet/src/consensus/gnunet-consensus-ibf.c
   gnunet/src/consensus/gnunet-consensus.c
   gnunet/src/consensus/gnunet-service-consensus.c
   gnunet/src/consensus/ibf.c
   gnunet/src/consensus/ibf.h
   gnunet/src/consensus/test_consensus.conf
   gnunet/src/include/gnunet_container_lib.h
Log:
fixed consensus multi-peer communication, memory leaks, various bugs


Added: gnunet/src/consensus/consensus_flout.h
===================================================================
--- gnunet/src/consensus/consensus_flout.h                              (rev 0)
+++ gnunet/src/consensus/consensus_flout.h      2013-03-21 01:06:40 UTC (rev 
26526)
@@ -0,0 +1,60 @@
+/*
+      This file is part of GNUnet
+      (C) 2012 Christian Grothoff (and other contributing authors)
+
+      GNUnet is free software; you can redistribute it and/or modify
+      it under the terms of the GNU General Public License as published
+      by the Free Software Foundation; either version 2, or (at your
+      option) any later version.
+
+      GNUnet is distributed in the hope that it will be useful, but
+      WITHOUT ANY WARRANTY; without even the implied warranty of
+      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+      General Public License for more details.
+
+      You should have received a copy of the GNU General Public License
+      along with GNUnet; see the file COPYING.  If not, write to the
+      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+      Boston, MA 02111-1307, USA.
+ */
+
+/**
+ * @file consensus/consensus_flout.h
+ * @brief intentionally misbehave in certain ways for testing
+ * @author Florian Dold
+ */
+
+#ifndef GNUNET_CONSENSUS_FLOUT_H
+#define GNUNET_CONSENSUS_FLOUT_H
+
+#ifdef __cplusplus
+extern "C"
+{
+#if 0                           /* keep Emacsens' auto-indent happy */
+}
+#endif
+#endif
+
+#include "platform.h"
+#include "gnunet_common.h"
+#include "gnunet_consensus_service.h"
+
+void
+GNUNET_CONSENSUS_flout_disable_peer (struct GNUNET_CONSENSUS_Handle 
*consensus);
+
+void
+GNUNET_CONSENSUS_flout_ignore_element_hash (struct GNUNET_CONSENSUS_Handle 
*consensus, struct GNUNET_HashCode *element_hash);
+
+void
+GNUNET_CONSENSUS_flout_ignore_element_hash (struct GNUNET_CONSENSUS_Handle 
*consensus, struct GNUNET_HashCode *element_hash);
+
+
+
+#if 0                           /* keep Emacsens' auto-indent happy */
+{
+#endif
+#ifdef __cplusplus
+}
+#endif
+
+#endif

Modified: gnunet/src/consensus/consensus_protocol.h
===================================================================
--- gnunet/src/consensus/consensus_protocol.h   2013-03-20 19:15:55 UTC (rev 
26525)
+++ gnunet/src/consensus/consensus_protocol.h   2013-03-21 01:06:40 UTC (rev 
26526)
@@ -76,12 +76,10 @@
   struct GNUNET_HashCode global_id;
 };
 
-struct ConsensusRoundHeader
+struct ConsensusRoundMessage
 {
   struct GNUNET_MessageHeader header;
   uint8_t round;
-  uint8_t exp_round;
-  uint8_t exp_subround;
 };
 
 

Modified: gnunet/src/consensus/gnunet-consensus-ibf.c
===================================================================
--- gnunet/src/consensus/gnunet-consensus-ibf.c 2013-03-20 19:15:55 UTC (rev 
26525)
+++ gnunet/src/consensus/gnunet-consensus-ibf.c 2013-03-21 01:06:40 UTC (rev 
26526)
@@ -19,7 +19,7 @@
 */
 
 /**
- * @file consensus/gnunet-consensus-ibf
+ * @file consensus/gnunet-consensus-ibf.c
  * @brief tool for reconciling data with invertible bloom filters
  * @author Florian Dold
  */

Modified: gnunet/src/consensus/gnunet-consensus.c
===================================================================
--- gnunet/src/consensus/gnunet-consensus.c     2013-03-20 19:15:55 UTC (rev 
26525)
+++ gnunet/src/consensus/gnunet-consensus.c     2013-03-21 01:06:40 UTC (rev 
26526)
@@ -39,6 +39,8 @@
 
 static struct GNUNET_CONSENSUS_Handle **consensus_handles;
 
+static struct GNUNET_TESTBED_Operation **testbed_operations;
+
 static unsigned int num_connected_handles;
 
 static struct GNUNET_TESTBED_Peer **peers;
@@ -49,7 +51,9 @@
 
 static struct GNUNET_HashCode session_id;
 
+static unsigned int peers_done = 0;
 
+
 /**
  * Signature of the event handler function called by the
  * respective event controller.
@@ -64,7 +68,6 @@
   GNUNET_assert (0);
 }
 
-
 static void
 destroy (void *cls, const struct GNUNET_SCHEDULER_TaskContext *ctx)
 {
@@ -72,14 +75,21 @@
   consensus = cls;
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying consensus\n");
   GNUNET_CONSENSUS_destroy (consensus);
+  peers_done++;
+  if (peers_done == num_peers)
+  {
+    int i;
+    for (i = 0; i < num_peers; i++)
+      GNUNET_TESTBED_operation_done (testbed_operations[i]);
+    GNUNET_SCHEDULER_shutdown ();
+  }
 }
 
 
 /**
  * Called when a conclusion was successful.
  *
- * @param cls
- * @param group
+ * @param cls closure, the consensus handle
  * @return GNUNET_YES if more consensus groups should be offered, GNUNET_NO if 
not
  */
 static void
@@ -255,8 +265,9 @@
     num_retrieved_peer_ids++;
     if (num_retrieved_peer_ids == num_peers)
       for (i = 0; i < num_peers; i++)
-        GNUNET_TESTBED_service_connect (NULL, peers[i], "consensus", 
connect_complete, &consensus_handles[i],
-                                        connect_adapter, disconnect_adapter, 
NULL);
+        testbed_operations[i] =
+            GNUNET_TESTBED_service_connect (NULL, peers[i], "consensus", 
connect_complete, &consensus_handles[i],
+                                            connect_adapter, 
disconnect_adapter, NULL);
   }
   else
   {
@@ -272,7 +283,6 @@
 {
   int i;
 
-
   GNUNET_log_setup ("gnunet-consensus", "INFO", NULL);
 
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "test master\n");
@@ -283,6 +293,7 @@
   peer_ids = GNUNET_malloc (num_peers * sizeof (struct GNUNET_PeerIdentity));
 
   consensus_handles = GNUNET_malloc (num_peers * sizeof (struct 
ConsensusHandle *));
+  testbed_operations = GNUNET_malloc (num_peers * sizeof (struct 
ConsensusHandle *));
 
   for (i = 0; i < num_peers; i++)
     GNUNET_TESTBED_peer_get_information (peers[i],

Modified: gnunet/src/consensus/gnunet-service-consensus.c
===================================================================
--- gnunet/src/consensus/gnunet-service-consensus.c     2013-03-20 19:15:55 UTC 
(rev 26525)
+++ gnunet/src/consensus/gnunet-service-consensus.c     2013-03-21 01:06:40 UTC 
(rev 26526)
@@ -124,7 +124,43 @@
 };
 
 
+struct ElementList
+{
+  struct ElementList *next;
+  struct GNUNET_CONSENSUS_Element *element;
+  struct GNUNET_HashCode *element_hash;
+};
+
+
 /**
+ * Describes the current round a consensus session is in.
+ */
+enum ConsensusRound
+{
+  /**
+   * Not started the protocol yet.
+   */
+  CONSENSUS_ROUND_BEGIN=0,
+  /**
+   * Distribution of elements with the exponential scheme.
+   */
+  CONSENSUS_ROUND_EXCHANGE,
+  /**
+   * Exchange which elements each peer has, but not the elements.
+   */
+  CONSENSUS_ROUND_INVENTORY,
+  /**
+   * Collect and distribute missing values.
+   */
+  CONSENSUS_ROUND_STOCK,
+  /**
+   * Consensus concluded.
+   */
+  CONSENSUS_ROUND_FINISH
+};
+
+
+/**
  * Information about a peer that is in a consensus session.
  */
 struct ConsensusPeerInformation
@@ -148,8 +184,6 @@
    */
   int is_outgoing;
 
-  int connected;
-
   /**
    * Did we receive/send a consensus hello?
    */
@@ -246,6 +280,15 @@
    */
   int exp_subround_finished;
 
+  int inventory_synced;
+
+  /**
+   * Round this peer seems to be in, according to the last SE we got.
+   * Necessary to store this, as we sometimes need to respond to a request 
from an
+   * older round, while we are already in the next round.
+   */
+  enum ConsensusRound apparent_round;
+
 };
 
 typedef void (*QueuedMessageCallback) (void *msg);
@@ -272,32 +315,6 @@
   void *cls;
 };
 
-/**
- * Describes the current round a consensus session is in.
- */
-enum ConsensusRound
-{
-  /**
-   * Not started the protocol yet.
-   */
-  CONSENSUS_ROUND_BEGIN=0,
-  /**
-   * Distribution of elements with the exponential scheme.
-   */
-  CONSENSUS_ROUND_EXCHANGE,
-  /**
-   * Exchange which elements each peer has, but not the elements.
-   */
-  CONSENSUS_ROUND_INVENTORY,
-  /**
-   * Collect and distribute missing values.
-   */
-  CONSENSUS_ROUND_STOCK,
-  /**
-   * Consensus concluded.
-   */
-  CONSENSUS_ROUND_FINISH
-};
 
 struct StrataEstimator
 {
@@ -342,7 +359,8 @@
   /**
    * Elements in the consensus set of this session,
    * all of them either have been sent by or approved by the client.
-   * Contains GNUNET_CONSENSUS_Element.
+   * Contains ElementList.
+   * Used as a unique-key hashmap.
    */
   struct GNUNET_CONTAINER_MultiHashMap *values;
 
@@ -544,6 +562,8 @@
  *
  * @param cpi peer
  * @param msg message we want to queue
+ * @param cb callback, called when the message is given to strem
+ * @param cls closure for cb
  */
 static void
 queue_peer_message_with_cls (struct ConsensusPeerInformation *cpi, struct 
GNUNET_MessageHeader *msg, QueuedMessageCallback cb, void *cls)
@@ -572,14 +592,14 @@
 }
 
 
-
+/*
 static void
 clear_peer_messages (struct ConsensusPeerInformation *cpi)
 {
-  /* FIXME: deallocate */
   cpi->messages_head = NULL;
   cpi->messages_tail = NULL;
 }
+*/
 
 
 /**
@@ -592,8 +612,8 @@
  * @return the estimated difference
  */
 static int
-estimate_difference (struct StrataEstimator *se1,
-                     struct StrataEstimator *se2)
+estimate_difference (const struct StrataEstimator *se1,
+                     const struct StrataEstimator *se2)
 {
   int i;
   int count;
@@ -701,42 +721,36 @@
 }
 
 
-/**
- * Iterator over hash map entries.
- * Queue elements to be sent to the peer in cls.
- *
- * @param cls closure
- * @param key current key code
- * @param value value in the hash map
- * @return GNUNET_YES if we should continue to
- *         iterate,
- *         GNUNET_NO if not.
- */
-static int
-send_element_iter (void *cls,
-                   const struct GNUNET_HashCode *key,
-                   void *value)
+static void
+send_elements (struct ConsensusPeerInformation *cpi, struct ElementList *head)
 {
-  struct ConsensusPeerInformation *cpi;
   struct GNUNET_CONSENSUS_Element *element;
   struct GNUNET_MessageHeader *element_msg;
   size_t msize;
 
-  cpi = cls;
-  element = value;
-  msize = sizeof (struct GNUNET_MessageHeader) + element->size;
-  element_msg = GNUNET_malloc (msize);
-  element_msg->size = htons (msize);
-  if (CONSENSUS_ROUND_EXCHANGE == cpi->session->current_round)
-    element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS);
-  else if (CONSENSUS_ROUND_INVENTORY == cpi->session->current_round)
-    element_msg->type = htons 
(GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT);
-  else
-    GNUNET_assert (0);
-  GNUNET_assert (NULL != element->data);
-  memcpy (&element_msg[1], element->data, element->size);
-  queue_peer_message (cpi, element_msg);
-  return GNUNET_YES;
+  while (NULL != head)
+  {
+    element = head->element;
+    msize = sizeof (struct GNUNET_MessageHeader) + element->size;
+    element_msg = GNUNET_malloc (msize);
+    element_msg->size = htons (msize);
+    switch (cpi->apparent_round)
+    {
+      case CONSENSUS_ROUND_STOCK:
+      case CONSENSUS_ROUND_EXCHANGE:
+        element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS);
+        break;
+      case CONSENSUS_ROUND_INVENTORY:
+        element_msg->type = htons 
(GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT);
+        break;
+      default:
+        GNUNET_break (0);
+    }
+    GNUNET_assert (NULL != element->data);
+    memcpy (&element_msg[1], element->data, element->size);
+    queue_peer_message (cpi, element_msg);
+    head = head->next;
+  }
 }
 
 /**
@@ -755,8 +769,13 @@
                      void *value)
 {
   struct ConsensusPeerInformation *cpi;
+  struct ElementList *head;
+  struct IBF_Key ibf_key;
   cpi = cls;
-  ibf_insert (cpi->session->ibfs[cpi->ibf_order], ibf_key_from_hashcode (key));
+  head = value;
+  ibf_key = ibf_key_from_hashcode (head->element_hash);
+  GNUNET_assert (ibf_key.key_val == ibf_key_from_hashcode (key).key_val);
+  ibf_insert (cpi->session->ibfs[cpi->ibf_order], ibf_key);
   return GNUNET_YES;
 }
 
@@ -764,7 +783,7 @@
  * Create and populate an IBF for the specified peer,
  * if it does not already exist.
  *
- * @param peer to create the ibf for
+ * @param cpi peer to create the ibf for
  */
 static void
 prepare_ibf (struct ConsensusPeerInformation *cpi)
@@ -791,25 +810,71 @@
   GNUNET_assert (0);
 }
 
-static void
-fin_sent_cb (void *cls)
+
+static int
+exp_subround_finished (const struct ConsensusSession *session)
 {
-  struct ConsensusPeerInformation *cpi;
   int not_finished;
-  cpi = cls;
-  cpi->exp_subround_finished = GNUNET_YES;
-  /* the subround is only really over if *both* partners are done */
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sent FIN\n", 
cpi->session->local_peer_idx);
   not_finished = 0;
-  if ((cpi->session->partner_outgoing != NULL) && 
(cpi->session->partner_outgoing->exp_subround_finished == GNUNET_NO))
+  if ((session->partner_outgoing != NULL) && 
(session->partner_outgoing->exp_subround_finished == GNUNET_NO))
       not_finished++;
-  if ((cpi->session->partner_incoming != NULL) && 
(cpi->session->partner_incoming->exp_subround_finished == GNUNET_NO))
+  if ((session->partner_incoming != NULL) && 
(session->partner_incoming->exp_subround_finished == GNUNET_NO))
       not_finished++;
   if (0 == not_finished)
-    subround_over (cpi->session, NULL);
+    return GNUNET_YES;
+  return GNUNET_NO;
 }
 
+static int
+inventory_round_finished (struct ConsensusSession *session)
+{
+  int i;
+  int finished;
+  finished = 0;
+  for (i = 0; i < session->num_peers; i++)
+    if (GNUNET_YES == session->info[i].inventory_synced)
+      finished++;
+  if (finished >= (session->num_peers / 2))
+    return GNUNET_YES;
+  return GNUNET_NO;
+}
 
+
+
+static void
+fin_sent_cb (void *cls)
+{
+  struct ConsensusPeerInformation *cpi;
+  cpi = cls;
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sent FIN\n", 
cpi->session->local_peer_idx);
+  switch (cpi->session->current_round)
+  {
+    case CONSENSUS_ROUND_EXCHANGE:
+    case CONSENSUS_ROUND_STOCK:
+      /* the subround is only really over if *both* partners are done */
+      if (cpi->session->current_round != cpi->apparent_round)
+      {
+        GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: FIN to SYNC from the 
past\n", cpi->session->local_peer_idx);
+        break;
+      }
+      cpi->exp_subround_finished = GNUNET_YES;
+      if (GNUNET_YES == exp_subround_finished (cpi->session))
+        subround_over (cpi->session, NULL);
+      else
+        GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: still waiting for more after 
FIN sent\n", cpi->session->local_peer_idx);
+      break;
+    case CONSENSUS_ROUND_INVENTORY:
+      cpi->inventory_synced = GNUNET_YES;
+      if (inventory_round_finished (cpi->session) && 
cpi->session->current_round == cpi->apparent_round)
+        round_over (cpi->session, NULL);
+      /* FIXME: maybe go to next round */
+      break;
+    default:
+      GNUNET_break (0);
+  }
+}
+
+
 /**
  * Gets called when the other peer wants us to inform that
  * it has decoded our ibf and sent us all elements / requests
@@ -817,17 +882,23 @@
 static int
 handle_p2p_synced (struct ConsensusPeerInformation *cpi, const struct 
GNUNET_MessageHeader *msg)
 {
-  struct GNUNET_MessageHeader *fin_msg;
+  struct ConsensusRoundMessage *fin_msg;
+
   switch (cpi->session->current_round)
   {
+    case CONSENSUS_ROUND_INVENTORY:
+      cpi->inventory_synced = GNUNET_YES;
+    case CONSENSUS_ROUND_STOCK:
     case CONSENSUS_ROUND_EXCHANGE:
       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got SYNC from P%d\n", 
cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
       fin_msg = GNUNET_malloc (sizeof *fin_msg);
-      fin_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN);
-      fin_msg->size = htons (sizeof *fin_msg);
+      fin_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_FIN);
+      fin_msg->header.size = htons (sizeof *fin_msg);
+      fin_msg->round = cpi->apparent_round;
       /* the subround os over once we kicked off sending the fin msg */
       /* FIXME: assert we are talking to the right peer! */
-      queue_peer_message_with_cls (cpi, fin_msg, fin_sent_cb, cpi);
+      queue_peer_message_with_cls (cpi, (struct GNUNET_MessageHeader *) 
fin_msg, fin_sent_cb, cpi);
+      /* FIXME: mark peer as synced */
       break;
     default:
       GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected SYNCED message the 
current round\n");
@@ -836,30 +907,40 @@
   return GNUNET_YES;
 }
 
+
 /**
  * The other peer wants us to inform that he sent us all the elements we 
requested.
  */
 static int
 handle_p2p_fin (struct ConsensusPeerInformation *cpi, const struct 
GNUNET_MessageHeader *msg)
 {
+  struct ConsensusRoundMessage *round_msg;
+  round_msg = (struct ConsensusRoundMessage *) msg;
   /* FIXME: only call subround_over if round is the current one! */
   switch (cpi->session->current_round)
   {
     case CONSENSUS_ROUND_EXCHANGE:
-    {
-      int not_finished;
+    case CONSENSUS_ROUND_STOCK:
+      if (cpi->session->current_round != round_msg->round)
+      {
+        GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (past 
round)\n", cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
+        cpi->ibf_state = IBF_STATE_NONE;
+        cpi->ibf_bucket_counter = 0;
+        break;
+      }
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (exp)\n", 
cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
       cpi->exp_subround_finished = GNUNET_YES;
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d\n", 
cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
-      /* the subround is only really over if *both* partners are done */
-      not_finished = 0;
-      if ((cpi->session->partner_outgoing != NULL) && 
(cpi->session->partner_outgoing->exp_subround_finished == GNUNET_NO))
-          not_finished++;
-      if ((cpi->session->partner_incoming != NULL) && 
(cpi->session->partner_incoming->exp_subround_finished == GNUNET_NO))
-          not_finished++;
-      if (0 == not_finished)
+      if (GNUNET_YES == exp_subround_finished (cpi->session))
         subround_over (cpi->session, NULL);
-    }
+      else
+        GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: still waiting for more after 
got FIN\n", cpi->session->local_peer_idx);
     break;
+    case CONSENSUS_ROUND_INVENTORY:
+      cpi->inventory_synced = GNUNET_YES;
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got FIN from P%d (a2a)\n", 
cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
+      if (inventory_round_finished (cpi->session))
+        round_over (cpi->session, NULL);
+      break;
     default:
       GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "unexpected FIN message the 
current round\n");
       break;
@@ -884,7 +965,39 @@
   return se;
 }
 
+static void
+strata_estimator_destroy (struct StrataEstimator *se)
+{
+  int i;
+  for (i = 0; i < STRATA_COUNT; i++)
+    ibf_destroy (se->strata[i]);
+  GNUNET_free (se->strata);
+  GNUNET_free (se);
+}
 
+
+static int
+is_premature_strata_message (const struct ConsensusSession *session, const 
struct StrataMessage *strata_msg)
+{
+  switch (strata_msg->round)
+  {
+    case CONSENSUS_ROUND_STOCK:
+    case CONSENSUS_ROUND_EXCHANGE:
+      /* here, we also have to compare subrounds */
+      if ( (strata_msg->round != session->current_round) ||
+           (strata_msg->exp_round != session->exp_round) ||
+           (strata_msg->exp_subround != session->exp_subround))
+        return GNUNET_YES;
+      break;
+    default:
+      if (session->current_round != strata_msg->round)
+        return GNUNET_YES;
+    break;
+  }
+  return GNUNET_NO;
+}
+
+
 /**
  * Called when a peer sends us its strata estimator.
  * In response, we sent out IBF of appropriate size back.
@@ -900,32 +1013,28 @@
   void *buf;
   size_t size;
 
-
-
-  switch (cpi->session->current_round)
+  if ((cpi->session->current_round == CONSENSUS_ROUND_STOCK) && 
(strata_msg->round == CONSENSUS_ROUND_INVENTORY))
   {
-    case CONSENSUS_ROUND_EXCHANGE:
-      if ( (strata_msg->round != CONSENSUS_ROUND_EXCHANGE) ||
-           (strata_msg->exp_round != cpi->session->exp_round) ||
-           (strata_msg->exp_subround != cpi->session->exp_subround))
-      {
-        if (GNUNET_NO == cpi->replaying_strata_message)
-        {
-          GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got probably premature SE 
from P%d, (%d,%d)\n",
-                      cpi->session->local_peer_idx, (int) (cpi - 
cpi->session->info), strata_msg->exp_round, strata_msg->exp_subround);
-          cpi->premature_strata_message = (struct StrataMessage *) 
GNUNET_copy_message ((struct GNUNET_MessageHeader *) strata_msg);
-        }
-        return GNUNET_YES;
-      }
-      break;
-    default:
-      GNUNET_assert (0);
-      break;
+    /* we still have to handle this request appropriately */
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got inventory SE from P%d, we 
are already further alog\n",
+                cpi->session->local_peer_idx, (int) (cpi - 
cpi->session->info));
   }
+  else if (is_premature_strata_message (cpi->session, strata_msg))
+  {
+    if (GNUNET_NO == cpi->replaying_strata_message)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got probably premature SE from 
P%d, (%d,%d)\n",
+                  cpi->session->local_peer_idx, (int) (cpi - 
cpi->session->info), strata_msg->exp_round, strata_msg->exp_subround);
+      cpi->premature_strata_message = (struct StrataMessage *) 
GNUNET_copy_message ((struct GNUNET_MessageHeader *) strata_msg);
+    }
+    return GNUNET_YES;
+  }
 
   if (NULL == cpi->se)
     cpi->se = strata_estimator_create ();
 
+  cpi->apparent_round = strata_msg->round;
+
   size = ntohs (strata_msg->header.size);
   buf = (void *) &strata_msg[1];
   for (i = 0; i < STRATA_COUNT; i++)
@@ -937,25 +1046,32 @@
 
   diff = estimate_difference (cpi->session->se, cpi->se);
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got SE from P%d\n",
-              cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got SE from P%d, diff=%d\n",
+              cpi->session->local_peer_idx, (int) (cpi - cpi->session->info), 
diff);
 
-  if ( (CONSENSUS_ROUND_EXCHANGE == cpi->session->current_round) ||
-       (CONSENSUS_ROUND_INVENTORY == cpi->session->current_round))
+  switch (cpi->session->current_round)
   {
-    /* send IBF of the right size */
-    cpi->ibf_order = 0;
-    while (((1 << cpi->ibf_order) < diff) || STRATA_HASH_NUM > (1 << 
cpi->ibf_order) )
-      cpi->ibf_order++;
-    if (cpi->ibf_order > MAX_IBF_ORDER)
-      cpi->ibf_order = MAX_IBF_ORDER;
-    cpi->ibf_order += 1;
-    /* create ibf if not already pre-computed */
-    prepare_ibf (cpi);
-    cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]);
-    cpi->ibf_state = IBF_STATE_TRANSMITTING;
-    cpi->ibf_bucket_counter = 0;
-    send_ibf (cpi);
+    case CONSENSUS_ROUND_EXCHANGE:
+    case CONSENSUS_ROUND_INVENTORY:
+    case CONSENSUS_ROUND_STOCK:
+      /* send IBF of the right size */
+      cpi->ibf_order = 0;
+      while (((1 << cpi->ibf_order) < diff) || STRATA_HASH_NUM > (1 << 
cpi->ibf_order) )
+        cpi->ibf_order++;
+      if (cpi->ibf_order > MAX_IBF_ORDER)
+        cpi->ibf_order = MAX_IBF_ORDER;
+      cpi->ibf_order += 1;
+      /* create ibf if not already pre-computed */
+      prepare_ibf (cpi);
+      cpi->ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]);
+      cpi->ibf_state = IBF_STATE_TRANSMITTING;
+      cpi->ibf_bucket_counter = 0;
+      send_ibf (cpi);
+      break;
+    default:
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got unexpected SE from P%d\n",
+                  cpi->session->local_peer_idx, (int) (cpi - 
cpi->session->info));
+      break;
   }
   return GNUNET_YES;
 }
@@ -973,7 +1089,7 @@
   switch (cpi->ibf_state)
   {
     case IBF_STATE_NONE:
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving first ibf of order %d\n", 
digest->order);
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: receiving IBF from P%d\n", 
cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
       cpi->ibf_state = IBF_STATE_RECEIVING;
       cpi->ibf_order = digest->order;
       cpi->ibf_bucket_counter = 0;
@@ -984,7 +1100,8 @@
       }
       break;
     case IBF_STATE_ANTICIPATE_DIFF:
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving decode fail ibf of order 
%d\n", digest->order);
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: receiving IBF from P%d 
(probably out IBF did not decode)\n",
+                  cpi->session->local_peer_idx, (int) (cpi - 
cpi->session->info));
       cpi->ibf_state = IBF_STATE_RECEIVING;
       cpi->ibf_order = digest->order;
       cpi->ibf_bucket_counter = 0;
@@ -997,18 +1114,16 @@
     case IBF_STATE_RECEIVING:
       break;
     default:
-      GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "received ibf unexpectedly in 
state %d\n", cpi->ibf_state);
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: unexpected IBF from P%d\n", 
cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
       return GNUNET_YES;
   }
 
   if (cpi->ibf_bucket_counter + num_buckets > (1 << cpi->ibf_order))
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "received malformed ibf\n");
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: overfull IBF from P%d\n", 
cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
     return GNUNET_YES;
   }
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving %d buckets at %d of %d\n", 
num_buckets,
-              cpi->ibf_bucket_counter, (1 << cpi->ibf_order));
 
   if (NULL == cpi->ibf)
     cpi->ibf = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0);
@@ -1041,6 +1156,17 @@
   struct GNUNET_CONSENSUS_ElementMessage *client_element_msg;
   size_t size;
 
+  switch (cpi->session->current_round)
+  {
+    case CONSENSUS_ROUND_STOCK:
+      /* FIXME: check if we really expect the element */
+    case CONSENSUS_ROUND_EXCHANGE:
+      break;
+    default:
+      GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "got unexpected element, 
ignoring\n");
+      return GNUNET_YES;
+  }
+
   size = ntohs (element_msg->size) - sizeof *element_msg;
 
   element = GNUNET_malloc (size + sizeof *element);
@@ -1069,7 +1195,9 @@
 
 /**
  * Handle a request for elements.
- * Only allowed in exchange-rounds.
+ * 
+ * @param cpi peer that is requesting the element
+ * @param msg the element request message
  */
 static int
 handle_p2p_element_request (struct ConsensusPeerInformation *cpi, const struct 
ElementRequest *msg)
@@ -1078,14 +1206,18 @@
   struct IBF_Key *ibf_key;
   unsigned int num;
 
+  /* element requests are allowed in every round */
+
   num = ntohs (msg->header.size) / sizeof (struct IBF_Key);
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "handling element request for %u 
elements\n", num);
   
   ibf_key = (struct IBF_Key *) &msg[1];
   while (num--)
   {
+    struct ElementList *head;
     ibf_hashcode_from_key (*ibf_key, &hashcode);
-    GNUNET_CONTAINER_multihashmap_get_multiple (cpi->session->values, 
&hashcode, send_element_iter, cpi);
+    head = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, &hashcode);
+    send_elements (cpi, head);
     ibf_key++;
   }
   return GNUNET_YES;
@@ -1162,11 +1294,13 @@
   size_t msize;
   int i;
 
+  cpi->apparent_round = cpi->session->current_round;
+  cpi->ibf_state = IBF_STATE_NONE;
+  cpi->ibf_bucket_counter = 0;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sending SE to P%d\n",
-              cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sending SE(%d) to P%d\n",
+              cpi->session->local_peer_idx, cpi->session->current_round, (int) 
(cpi - cpi->session->info));
 
-
   msize = (sizeof *strata_msg) + (STRATA_COUNT * IBF_BUCKET_SIZE * 
STRATA_IBF_BUCKETS);
 
   strata_msg = GNUNET_malloc (msize);
@@ -1185,21 +1319,20 @@
   queue_peer_message (cpi, (struct GNUNET_MessageHeader *) strata_msg);
 }
 
+
 /**
- * Send an IBF of the order specified in cpi
+ * Send an IBF of the order specified in cpi.
  *
  * @param cpi the peer
  */
 static void
 send_ibf (struct ConsensusPeerInformation *cpi)
 {
-  int sent_buckets;
-
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: sending IBF to P%d\n",
               cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
 
-  sent_buckets = 0;
-  while (sent_buckets < (1 << cpi->ibf_order))
+  cpi->ibf_bucket_counter = 0;
+  while (cpi->ibf_bucket_counter < (1 << cpi->ibf_order))
   {
     int num_buckets;
     void *buf;
@@ -1223,12 +1356,18 @@
 
     queue_peer_message (cpi, (struct GNUNET_MessageHeader *) digest);
 
-    sent_buckets += num_buckets;
+    cpi->ibf_bucket_counter += num_buckets;
   }
+  cpi->ibf_bucket_counter = 0;
   cpi->ibf_state = IBF_STATE_ANTICIPATE_DIFF;
 }
 
 
+/**
+ * Decode the current diff ibf, and send elements/requests/reports/
+ *
+ * @param cpi partner peer
+ */
 static void
 decode (struct ConsensusPeerInformation *cpi)
 {
@@ -1236,11 +1375,12 @@
   struct GNUNET_HashCode hashcode;
   int side;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "decoding\n");
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: decoding ibf from P%d\n", 
cpi->session->local_peer_idx, (int) (cpi - cpi->session->info));
 
   for (;;)
   {
     int res;
+
     res = ibf_decode (cpi->ibf, &side, &key);
     if (GNUNET_SYSERR == res)
     {
@@ -1256,19 +1396,22 @@
     }
     if (GNUNET_NO == res)
     {
-      struct GNUNET_MessageHeader *msg;
+      struct ConsensusRoundMessage *msg;
       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: transmitted all values, 
sending SYNC\n", cpi->session->local_peer_idx);
       msg = GNUNET_malloc (sizeof *msg);
-      msg->size = htons (sizeof *msg);
-      msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_SYNCED);
-      queue_peer_message (cpi, msg);
+      msg->header.size = htons (sizeof *msg);
+      msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_SYNCED);
+      msg->round = cpi->apparent_round;
+      queue_peer_message (cpi, (struct GNUNET_MessageHeader *) msg);
       return;
     }
     if (-1 == side)
     {
+      struct ElementList *head;
       /* we have the element(s), send it to the other peer */
       ibf_hashcode_from_key (key, &hashcode);
-      GNUNET_CONTAINER_multihashmap_get_multiple (cpi->session->values, 
&hashcode, send_element_iter, cpi);
+      head = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, 
&hashcode);
+      send_elements (cpi, head);
     }
     else
     {
@@ -1278,18 +1421,19 @@
 
       msize = (sizeof *msg) + sizeof (struct IBF_Key);
       msg = GNUNET_malloc (msize);
-      if (CONSENSUS_ROUND_EXCHANGE == cpi->session->current_round)
+      switch (cpi->apparent_round)
       {
-        msg->header.type = htons 
(GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST);
+        case CONSENSUS_ROUND_STOCK:
+          /* FIXME: check if we really want to request the element */
+        case CONSENSUS_ROUND_EXCHANGE:
+          msg->header.type = htons 
(GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST);
+          break;
+        case CONSENSUS_ROUND_INVENTORY:
+          msg->header.type = htons 
(GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REPORT);
+          break;
+        default:
+          GNUNET_assert (0);
       }
-      else if (CONSENSUS_ROUND_INVENTORY == cpi->session->current_round)
-      {
-        msg->header.type = htons 
(GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS_REQUEST);
-      }
-      else
-      {
-        GNUNET_assert (0);
-      }
       msg->header.size = htons (msize);
       p = (struct IBF_Key *) &msg[1];
       *p = key;
@@ -1308,7 +1452,6 @@
  * @param cls closure
  * @param client identification of the client
  * @param message the actual message
- *
  * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
  */
 static int
@@ -1403,6 +1546,36 @@
 
 
 /**
+ * Iterator over hash map entries.
+ *
+ * @param cls closure
+ * @param key current key code
+ * @param value value in the hash map
+ * @return GNUNET_YES if we should continue to
+ *         iterate,
+ *         GNUNET_NO if not.
+ */
+static int
+destroy_element_list_iter (void *cls,
+                           const struct GNUNET_HashCode * key,
+                           void *value)
+{
+  struct ElementList *el;
+  el = value;
+  while (NULL != el)
+  {
+    struct ElementList *el_old;
+    el_old = el;
+    el = el->next;
+    GNUNET_free (el_old->element_hash);
+    GNUNET_free (el_old->element);
+    GNUNET_free (el_old);
+  }
+  return GNUNET_YES;
+}
+
+
+/**
  * Destroy a session, free all resources associated with it.
  * 
  * @param session the session to destroy
@@ -1410,9 +1583,80 @@
 static void
 destroy_session (struct ConsensusSession *session)
 {
-  /* FIXME: more stuff to free! */
+  int i;
+
   GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session);
   GNUNET_SERVER_client_drop (session->client);
+  session->client = NULL;
+  if (NULL != session->shuffle)
+  {
+    GNUNET_free (session->shuffle);
+    session->shuffle = NULL;
+  }
+  if (NULL != session->se)
+  {
+    strata_estimator_destroy (session->se);
+    session->se = NULL;
+  }
+  if (NULL != session->info)
+  {
+    for (i = 0; i < session->num_peers; i++)
+    {
+      struct ConsensusPeerInformation *cpi;
+      cpi = &session->info[i];
+      if ((NULL != cpi) && (NULL != cpi->socket))
+      {
+        if (NULL != cpi->rh)
+        {
+          GNUNET_STREAM_read_cancel (cpi->rh);
+          cpi->rh = NULL;
+        } 
+        if (NULL != cpi->wh)
+        {
+          GNUNET_STREAM_write_cancel (cpi->wh);
+          cpi->wh = NULL;
+        } 
+        GNUNET_STREAM_close (cpi->socket);
+        cpi->socket = NULL;
+      }
+      if (NULL != cpi->se)
+      {
+        strata_estimator_destroy (cpi->se);
+        cpi->se = NULL;
+      }
+      if (NULL != cpi->ibf)
+      {
+        ibf_destroy (cpi->ibf);
+        cpi->ibf = NULL;
+      }
+      if (NULL != cpi->mst)
+      {
+        GNUNET_SERVER_mst_destroy (cpi->mst);
+        cpi->mst = NULL;
+      }
+    }
+    GNUNET_free (session->info);
+    session->info = NULL;
+  }
+  if (NULL != session->ibfs)
+  {
+    for (i = 0; i <= MAX_IBF_ORDER; i++)
+    {
+      if (NULL != session->ibfs[i])
+      {
+        ibf_destroy (session->ibfs[i]);
+        session->ibfs[i] = NULL;
+      }
+    }
+    GNUNET_free (session->ibfs);
+    session->ibfs = NULL;
+  }
+  if (NULL != session->values)
+  {
+    GNUNET_CONTAINER_multihashmap_iterate (session->values, 
destroy_element_list_iter, NULL);
+    GNUNET_CONTAINER_multihashmap_destroy (session->values);
+    session->values = NULL;
+  }
   GNUNET_free (session);
 }
 
@@ -1448,6 +1692,7 @@
  * Thus, if the local id of two consensus sessions coincide, but are not 
comprised of
  * exactly the same peers, the global id will be different.
  *
+ * @param session session to generate the global id for
  * @param session_id local id of the consensus session
  */
 static void
@@ -1511,9 +1756,9 @@
 
 
 /**
- * Schedule transmitting the next queued message (if any) to a client.
+ * Schedule transmitting the next queued message (if any) to the inhabiting 
client of a session.
  *
- * @param cli the client to send the next message to
+ * @param session the consensus session
  */
 static void
 client_send_next (struct ConsensusSession *session)
@@ -1545,9 +1790,9 @@
  * @return 1 if h1 > h2, -1 if h1 < h2 and 0 if h1 == h2.
  */
 static int
-hash_cmp (const void *a, const void *b)
+hash_cmp (const void *h1, const void *h2)
 {
-  return GNUNET_CRYPTO_hash_cmp ((struct GNUNET_HashCode *) a, (struct 
GNUNET_HashCode *) b);
+  return GNUNET_CRYPTO_hash_cmp ((struct GNUNET_HashCode *) h1, (struct 
GNUNET_HashCode *) h2);
 }
 
 
@@ -1607,6 +1852,7 @@
   cpi->mst = GNUNET_SERVER_mst_create (mst_session_callback, cpi);
   cpi->wh =
       GNUNET_STREAM_write (socket, hello, sizeof *hello, 
GNUNET_TIME_UNIT_FOREVER_REL, hello_cont, cpi);
+  GNUNET_free (hello);
   cpi->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL,
                                 &session_stream_data_processor, cpi);
 }
@@ -1821,12 +2067,52 @@
 }
 
 
+static void
+insert_element (struct ConsensusSession *session, struct 
GNUNET_CONSENSUS_Element *element)
+{
+  struct GNUNET_HashCode hash;
+  struct ElementList *head;
+
+  hash_for_ibf (element->data, element->size, &hash);
+
+  head = GNUNET_CONTAINER_multihashmap_get (session->values, &hash);
+
+  if (NULL == head)
+  {
+    int i;
+
+    head = GNUNET_malloc (sizeof *head);
+    head->element = element;
+    head->next = NULL;
+    head->element_hash = GNUNET_memdup (&hash, sizeof hash);
+    GNUNET_CONTAINER_multihashmap_put (session->values, &hash, head,
+                                       
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
+    strata_estimator_insert (session->se, &hash);
+
+    for (i = 0; i <= MAX_IBF_ORDER; i++)
+      if (NULL != session->ibfs[i])
+        ibf_insert (session->ibfs[i], ibf_key_from_hashcode (&hash));
+  }
+  else
+  {
+    struct ElementList *el;
+    el = GNUNET_malloc (sizeof *el);
+    head->element = element;
+    head->next = NULL;
+    head->element_hash = GNUNET_memdup (&hash, sizeof hash);
+    while (NULL != head->next)
+      head = head->next;
+    head->next = el;
+  }
+}
+
+
 /**
  * Called when a client performs an insert operation.
  *
  * @param cls (unused)
  * @param client client handle
- * @param message message sent by the client
+ * @param m message sent by the client
  */
 void
 client_insert (void *cls,
@@ -1836,7 +2122,6 @@
   struct ConsensusSession *session;
   struct GNUNET_CONSENSUS_ElementMessage *msg;
   struct GNUNET_CONSENSUS_Element *element;
-  struct GNUNET_HashCode hash;
   int element_size;
 
   session = sessions_head;
@@ -1865,13 +2150,8 @@
 
   GNUNET_assert (NULL != element->data);
 
-  hash_for_ibf (element->data, element_size, &hash);
+  insert_element (session, element);
 
-  GNUNET_CONTAINER_multihashmap_put (session->values, &hash, element,
-                                     
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-
-  strata_estimator_insert (session->se, &hash);
-
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 
   client_send_next (session);
@@ -1951,18 +2231,41 @@
     {
       GNUNET_assert (NULL == session->partner_outgoing);
       session->partner_outgoing = &session->info[session->shuffle[arc]];
+      session->partner_outgoing->exp_subround_finished = GNUNET_NO;
     }
     if (arc == session->local_peer_idx)
     {
       GNUNET_assert (NULL == session->partner_incoming);
       session->partner_incoming = &session->info[session->shuffle[i]];
+      session->partner_incoming->exp_subround_finished = GNUNET_NO;
     }
   }
 }
 
 
+static void
+replay_premature_message (struct ConsensusPeerInformation *cpi)
+{
+  if (NULL != cpi->premature_strata_message)
+  {
+    struct StrataMessage *sm;
+
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "replaying premature SE\n");
+    sm = cpi->premature_strata_message;
+    cpi->premature_strata_message = NULL;
+
+    cpi->replaying_strata_message = GNUNET_YES;
+    handle_p2p_strata (cpi, sm);
+    cpi->replaying_strata_message = GNUNET_NO;
+
+    GNUNET_free (sm);
+  }
+}
+
+
 /**
  * Do the next subround in the exp-scheme.
+ * This function can be invoked as a timeout task, or called manually (tc will 
be NULL then).
  *
  * @param cls the session
  * @param tc task context, for when this task is invoked by the scheduler,
@@ -1977,35 +2280,29 @@
   /* don't kick off next subround if we're shutting down */
   if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
     return;
-
   session = cls;
-
-
+  /* don't send any messages from the last round */
+  /*
+  clear_peer_messages (session->partner_outgoing);
+  clear_peer_messages (session->partner_incoming);
   for (i = 0; i < session->num_peers; i++)
     clear_peer_messages (&session->info[i]);
-
+  */
+  /* cancel timeout */
   if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK))
-  {
     GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
-  }
   session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
-
-  if ((session->num_peers == 2) &&  (session->exp_round == 1))
+  /* check if we are done with the log phase, 2-peer consensus only does one 
log round */
+  if ( (session->exp_round == NUM_EXP_ROUNDS) ||
+       ((session->num_peers == 2) && (session->exp_round == 1)))
   {
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exp-round over (2-peer)\n");
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: exp-round over\n", 
session->local_peer_idx);
     round_over (session, NULL);
     return;
   }
-
-  if (session->exp_round == NUM_EXP_ROUNDS)
-  {
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exp-round over (2-peer)\n");
-    round_over (session, NULL);
-    return;
-  }
-
   if (session->exp_round == 0)
   {
+    /* initialize everything for the log-rounds */
     session->exp_round = 1;
     session->exp_subround = 0;
     if (NULL == session->shuffle)
@@ -2015,6 +2312,7 @@
   }
   else if (session->exp_subround + 1 >= (int) ceil (log2 (session->num_peers)))
   {
+    /* subrounds done, start new log-round */
     session->exp_round++;
     session->exp_subround = 0;
     shuffle (session);
@@ -2026,6 +2324,7 @@
 
   find_partners (session);
 
+#ifdef GNUNET_EXTRA_LOGGING
   {
     int in;
     int out;
@@ -2040,15 +2339,8 @@
     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: doing exp-round, r=%d, sub=%d, 
in: %d, out: %d\n", session->local_peer_idx,
                 session->exp_round, session->exp_subround, in, out);
   }
+#endif /* GNUNET_EXTRA_LOGGING */
 
-
-  if (NULL != session->partner_outgoing)
-  {
-    session->partner_outgoing->ibf_state = IBF_STATE_NONE;
-    session->partner_outgoing->ibf_bucket_counter = 0;
-    session->partner_outgoing->exp_subround_finished = GNUNET_NO;
-  }
-
   if (NULL != session->partner_incoming)
   {
     session->partner_incoming->ibf_state = IBF_STATE_NONE;
@@ -2056,24 +2348,15 @@
     session->partner_incoming->ibf_bucket_counter = 0;
 
     /* maybe there's an early strata estimator? */
-    if (NULL != session->partner_incoming->premature_strata_message)
-    {
-      struct StrataMessage *sm;
-
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "replaying premature SE\n");
-      sm = session->partner_incoming->premature_strata_message;
-      session->partner_incoming->premature_strata_message = NULL;
-
-      session->partner_incoming->replaying_strata_message = GNUNET_YES;
-      handle_p2p_strata (session->partner_incoming, sm);
-      session->partner_incoming->replaying_strata_message = GNUNET_NO;
-
-      GNUNET_free (sm);
-    }
+    replay_premature_message (session->partner_incoming);
   }
 
   if (NULL != session->partner_outgoing)
   {
+    session->partner_outgoing->ibf_state = IBF_STATE_NONE;
+    session->partner_outgoing->ibf_bucket_counter = 0;
+    session->partner_outgoing->exp_subround_finished = GNUNET_NO;
+
     if (NULL == session->partner_outgoing->socket)
     {
       session->partner_outgoing->socket =
@@ -2092,7 +2375,6 @@
   session->round_timeout_tid = GNUNET_SCHEDULER_add_delayed 
(GNUNET_TIME_relative_divide (session->conclude_timeout, 3 * NUM_EXP_ROUNDS),
                                                                    
subround_over, session);
   */
-
 }
 
 static void
@@ -2110,31 +2392,63 @@
   }
 }
 
+/**
+ * Start the inventory round, contact all peers we are supposed to contact.
+ *
+ * @param session the current session
+ */
 static void
 start_inventory (struct ConsensusSession *session)
 {
   int i;
   int last;
 
+  for (i = 0; i < session->num_peers; i++)
+  {
+    session->info[i].ibf_bucket_counter = 0;
+    session->info[i].ibf_state = IBF_STATE_NONE;
+    session->info[i].is_outgoing = GNUNET_NO;
+  }
+
   last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % 
session->num_peers;
   i = (session->local_peer_idx + 1) % session->num_peers;
   while (i != last)
   {
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all\n", 
session->local_peer_idx, i);
     contact_peer_a2a (&session->info[i]);
+    session->info[i].is_outgoing = GNUNET_YES;
     i = (i + 1) % session->num_peers;
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d contacts peer %d\n", 
session->local_peer_idx, i);
   }
   // tie-breaker for even number of peers
   if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last))
   {
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all 
(tie-breaker)\n", session->local_peer_idx, i);
+    session->info[last].is_outgoing = GNUNET_YES;
     contact_peer_a2a (&session->info[last]);
-    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d contacts peer %d 
(tiebreaker)\n", session->local_peer_idx, last);
   }
+
+  for (i = 0; i < session->num_peers; i++)
+  {
+    if (GNUNET_NO == session->info[i].is_outgoing)
+      replay_premature_message (&session->info[i]);
+  }
 }
 
+static void
+send_client_conclude_done (struct ConsensusSession *session)
+{
+  struct GNUNET_MessageHeader *msg;
+  session->current_round = CONSENSUS_ROUND_FINISH;
+  msg = GNUNET_malloc (sizeof *msg);
+  msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
+  msg->size = htons (sizeof *msg);
+  queue_client_message (session, msg);
+  client_send_next (session);
+}
 
 /**
- * Select and kick off the next round, based on the current round.
+ * Start the next round.
+ * This function can be invoked as a timeout task, or called manually (tc will 
be NULL then).
  *
  * @param cls the session
  * @param tc task context, for when this task is invoked by the scheduler,
@@ -2144,17 +2458,18 @@
 round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   struct ConsensusSession *session;
-  int i;
 
   /* don't kick off next round if we're shutting down */
   if ((NULL != tc) && (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
     return;
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "round over\n");
   session = cls;
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: round over\n", 
session->local_peer_idx);
 
+  /*
   for (i = 0; i < session->num_peers; i++)
     clear_peer_messages (&session->info[i]);
+  */
 
   if ((NULL == tc) && (session->round_timeout_tid != GNUNET_SCHEDULER_NO_TASK))
   {
@@ -2162,8 +2477,6 @@
     session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
   }
 
-  /* FIXME: cancel current round */
-
   switch (session->current_round)
   {
     case CONSENSUS_ROUND_BEGIN:
@@ -2172,31 +2485,24 @@
       subround_over (session, NULL);
       break;
     case CONSENSUS_ROUND_EXCHANGE:
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: done\n", 
session->local_peer_idx);
-
-      if (0)
+      /* handle two peers specially */
+      if (session->num_peers <= 2)
       {
-        struct GNUNET_MessageHeader *msg;
-        msg = GNUNET_malloc (sizeof *msg);
-        msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE);
-        msg->size = htons (sizeof *msg);
-        queue_client_message (session, msg);
-        client_send_next (session);
+        GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: done\n", 
session->local_peer_idx);
+        send_client_conclude_done (session);
+        return;
       }
-
-      if (0)
-      {
-        session->current_round = CONSENSUS_ROUND_INVENTORY;
-        start_inventory (session);
-      }
+      session->current_round = CONSENSUS_ROUND_INVENTORY;
+      start_inventory (session);
       break;
     case CONSENSUS_ROUND_INVENTORY:
       session->current_round = CONSENSUS_ROUND_STOCK;
-      /* FIXME: exchange stock */
+      session->exp_round = 0;
+      subround_over (session, NULL);
       break;
     case CONSENSUS_ROUND_STOCK:
       session->current_round = CONSENSUS_ROUND_FINISH;
-      /* FIXME: send elements to client */
+      send_client_conclude_done (session);
       break;
     default:
       GNUNET_assert (0);
@@ -2241,11 +2547,17 @@
     return;
   }
 
-  session->conclude_timeout = GNUNET_TIME_relative_ntoh (cmsg->timeout);
+  if (session->num_peers <= 1)
+  {
+    send_client_conclude_done (session);
+  }
+  else
+  {
+    session->conclude_timeout = GNUNET_TIME_relative_ntoh (cmsg->timeout);
+    /* the 'begin' round is over, start with the next, real round */
+    round_over (session, NULL);
+  }
 
-  /* the 'begin' round is over, start with the next, real round */
-  round_over (session, NULL);
-
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
   client_send_next (session);
 }
@@ -2267,7 +2579,6 @@
   struct GNUNET_CONSENSUS_AckMessage *msg;
   struct PendingElement *pending;
   struct GNUNET_CONSENSUS_Element *element;
-  struct GNUNET_HashCode key;
 
   session = sessions_head;
   while (NULL != session)
@@ -2291,25 +2602,17 @@
 
   if (msg->keep)
   {
-    int i;
     element = pending->element;
-    hash_for_ibf (element->data, element->size, &key);
-
-    GNUNET_CONTAINER_multihashmap_put (session->values, &key, element,
-                                       
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+    insert_element (session, element);
     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got client ack\n");
-    strata_estimator_insert (session->se, &key);
-    
-    for (i = 0; i <= MAX_IBF_ORDER; i++)
-    {
-      if (NULL != session->ibfs[i])
-        ibf_insert (session->ibfs[i], ibf_key_from_hashcode (&key));
-    }
   }
 
+  GNUNET_free (pending);
+
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 }
 
+
 /**
  * Task that disconnects from core.
  *
@@ -2366,7 +2669,18 @@
     socket = incoming_sockets_head;
     if (NULL == socket->cpi)
     {
+      if (NULL != socket->rh)
+      {
+        GNUNET_STREAM_read_cancel (socket->rh);
+        socket->rh = NULL;
+      } 
       GNUNET_STREAM_close (socket->socket);
+      socket->socket = NULL;
+      if (NULL != socket->mst)
+      {
+        GNUNET_SERVER_mst_destroy (socket->mst);
+        socket->mst = NULL;
+      }
     }
     incoming_sockets_head = incoming_sockets_head->next;
     GNUNET_free (socket);
@@ -2375,26 +2689,9 @@
   while (NULL != sessions_head)
   {
     struct ConsensusSession *session;
-    int i;
-
-    session = sessions_head;
-
-    if (NULL != session->info)
-      for (i = 0; i < session->num_peers; i++)
-      {
-        struct ConsensusPeerInformation *cpi;
-        cpi = &session->info[i];
-        if ((NULL != cpi) && (NULL != cpi->socket))
-        {
-          GNUNET_STREAM_close (cpi->socket);
-        }
-      }
-
-    if (NULL != session->client)
-      GNUNET_SERVER_client_disconnect (session->client);
-
-    sessions_head = sessions_head->next;
-    GNUNET_free (session);
+    session = sessions_head->next;
+    destroy_session (sessions_head);
+    sessions_head = session;
   }
 
   if (NULL != core)

Modified: gnunet/src/consensus/ibf.c
===================================================================
--- gnunet/src/consensus/ibf.c  2013-03-20 19:15:55 UTC (rev 26525)
+++ gnunet/src/consensus/ibf.c  2013-03-21 01:06:40 UTC (rev 26526)
@@ -136,7 +136,7 @@
  * Insert an element into an IBF.
  *
  * @param ibf the IBF
- * @param id the element's hash code
+ * @param key the element's hash code
  */
 void
 ibf_insert (struct InvertibleBloomFilter *ibf, struct IBF_Key key)
@@ -171,10 +171,10 @@
  * Decode and remove an element from the IBF, if possible.
  *
  * @param ibf the invertible bloom filter to decode
- * @param side sign of the cell's count where the decoded element came from.
- *             A negative sign indicates that the element was recovered
- *             resides in an IBF that was previously subtracted from.
- * @param ret_id the hash code of the decoded element, if successful
+ * @param ret_side sign of the cell's count where the decoded element came 
from.
+ *                 A negative sign indicates that the element was recovered
+ *                 resides in an IBF that was previously subtracted from.
+ * @param ret_key receives the hash code of the decoded element, if successful
  * @return GNUNET_YES if decoding an element was successful,
  *         GNUNET_NO if the IBF is empty,
  *         GNUNET_SYSERR if the decoding has failed
@@ -284,7 +284,7 @@
  * @param size size of the buffer, will be updated
  * @param start which bucket to start at
  * @param count how many buckets to read
- * @param dst ibf to write buckets to
+ * @param ibf the ibf to read from
  * @return GNUNET_OK on success
  */
 int
@@ -325,8 +325,6 @@
  * Write an ibf.
  * 
  * @param ibf the ibf to write
- * @param start with which bucket to start
- * @param count how many buckets to write
  * @param buf buffer to write the data to, will be updated to point to the
  *            first byte after the written data
  * @param size pointer to the size of the buffer, will be updated, can be NULL
@@ -344,8 +342,6 @@
  * @param buf pointer to the buffer to write to, will point to first
  *            byte after the written data
  * @param size size of the buffer, will be updated
- * @param start which bucket to start at
- * @param count how many buckets to read
  * @param dst ibf to write buckets to
  * @return GNUNET_OK on success
  */

Modified: gnunet/src/consensus/ibf.h
===================================================================
--- gnunet/src/consensus/ibf.h  2013-03-20 19:15:55 UTC (rev 26525)
+++ gnunet/src/consensus/ibf.h  2013-03-21 01:06:40 UTC (rev 26526)
@@ -128,7 +128,7 @@
  * @param size size of the buffer, will be updated
  * @param start which bucket to start at
  * @param count how many buckets to read
- * @param dst ibf to write buckets to
+ * @param ibf the ibf to read from
  * @return GNUNET_OK on success
  */
 int
@@ -139,8 +139,6 @@
  * Write an ibf.
  * 
  * @param ibf the ibf to write
- * @param start with which bucket to start
- * @param count how many buckets to write
  * @param buf buffer to write the data to, will be updated to point to the
  *            first byte after the written data
  * @param size pointer to the size of the buffer, will be updated, can be NULL
@@ -149,14 +147,13 @@
 ibf_write (const struct InvertibleBloomFilter *ibf, void **buf, size_t *size);
 
 
+
 /**
  * Read an ibf.
  *
  * @param buf pointer to the buffer to write to, will point to first
  *            byte after the written data
  * @param size size of the buffer, will be updated
- * @param start which bucket to start at
- * @param count how many buckets to read
  * @param dst ibf to write buckets to
  * @return GNUNET_OK on success
  */
@@ -223,15 +220,16 @@
  * Decode and remove an element from the IBF, if possible.
  *
  * @param ibf the invertible bloom filter to decode
- * @param side sign of the cell's count where the decoded element came from.
- *             A negative sign indicates that the element was recovered 
resides in an IBF
- *             that was previously subtracted from.
- * @param ret_id the hash code of the decoded element, if successful
- * @return GNUNET_YES if decoding an element was successful, GNUNET_NO if the 
IBF is empty,
- *         GNUNET_SYSERR if the decoding has faile
+ * @param ret_side sign of the cell's count where the decoded element came 
from.
+ *                 A negative sign indicates that the element was recovered
+ *                 resides in an IBF that was previously subtracted from.
+ * @param ret_key receives the hash code of the decoded element, if successful
+ * @return GNUNET_YES if decoding an element was successful,
+ *         GNUNET_NO if the IBF is empty,
+ *         GNUNET_SYSERR if the decoding has failed
  */
 int
-ibf_decode (struct InvertibleBloomFilter *ibf, int *side, struct IBF_Key 
*ret_key);
+ibf_decode (struct InvertibleBloomFilter *ibf, int *ret_side, struct IBF_Key 
*ret_key);
 
 
 /**

Modified: gnunet/src/consensus/test_consensus.conf
===================================================================
--- gnunet/src/consensus/test_consensus.conf    2013-03-20 19:15:55 UTC (rev 
26525)
+++ gnunet/src/consensus/test_consensus.conf    2013-03-21 01:06:40 UTC (rev 
26526)
@@ -5,7 +5,7 @@
 HOME = $SERVICEHOME
 BINARY = gnunet-service-consensus
 #PREFIX = gdbserver :12345
-#PREFIX = valgrind
+PREFIX = valgrind --leak-check=full
 ACCEPT_FROM = 127.0.0.1;
 ACCEPT_FROM6 = ::1;
 UNIXPATH = /tmp/gnunet-service-consensus.sock

Modified: gnunet/src/include/gnunet_container_lib.h
===================================================================
--- gnunet/src/include/gnunet_container_lib.h   2013-03-20 19:15:55 UTC (rev 
26525)
+++ gnunet/src/include/gnunet_container_lib.h   2013-03-21 01:06:40 UTC (rev 
26526)
@@ -1355,7 +1355,7 @@
  */
 void *
 GNUNET_CONTAINER_slist_get (const struct GNUNET_CONTAINER_SList_Iterator *i,
-                            size_t * len);
+                            size_t *len);
 
 
 /**




reply via email to

[Prev in Thread] Current Thread [Next in Thread]