gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r28537 - gnunet/src/consensus


From: gnunet
Subject: [GNUnet-SVN] r28537 - gnunet/src/consensus
Date: Mon, 12 Aug 2013 16:35:51 +0200

Author: dold
Date: 2013-08-12 16:35:51 +0200 (Mon, 12 Aug 2013)
New Revision: 28537

Modified:
   gnunet/src/consensus/consensus_api.c
   gnunet/src/consensus/gnunet-consensus.c
   gnunet/src/consensus/gnunet-service-consensus.c
   gnunet/src/consensus/test_consensus.conf
Log:
- fixed consensus for >2 peers


Modified: gnunet/src/consensus/consensus_api.c
===================================================================
--- gnunet/src/consensus/consensus_api.c        2013-08-12 14:35:24 UTC (rev 
28536)
+++ gnunet/src/consensus/consensus_api.c        2013-08-12 14:35:51 UTC (rev 
28537)
@@ -266,6 +266,7 @@
     i->cls = idc_cls;
     GNUNET_MQ_notify_sent (ev, idc_adapter, i);
   }
+  GNUNET_MQ_send (consensus->mq, ev);
 }
 
 

Modified: gnunet/src/consensus/gnunet-consensus.c
===================================================================
--- gnunet/src/consensus/gnunet-consensus.c     2013-08-12 14:35:24 UTC (rev 
28536)
+++ gnunet/src/consensus/gnunet-consensus.c     2013-08-12 14:35:51 UTC (rev 
28537)
@@ -95,6 +95,7 @@
 static void
 conclude_cb (void *cls)
 {
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus done\n");
   GNUNET_SCHEDULER_add_now (destroy, cls);
 }
 
@@ -274,6 +275,8 @@
   {
     GNUNET_assert (0);
   }
+
+  GNUNET_TESTBED_operation_done (op);
 }
 
 
@@ -302,7 +305,6 @@
 
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "test master\n");
 
-
   peers = started_peers;
 
   peer_ids = GNUNET_malloc (num_peers * sizeof (struct GNUNET_PeerIdentity));

Modified: gnunet/src/consensus/gnunet-service-consensus.c
===================================================================
--- gnunet/src/consensus/gnunet-service-consensus.c     2013-08-12 14:35:24 UTC 
(rev 28536)
+++ gnunet/src/consensus/gnunet-service-consensus.c     2013-08-12 14:35:51 UTC 
(rev 28537)
@@ -87,7 +87,31 @@
   CONSENSUS_ROUND_FINISH
 };
 
+
 /**
+ * Complete information about the current round and all
+ * subrounds.
+ */
+struct RoundInfo
+{
+  /**
+   * The current main round.
+   */
+  enum ConsensusRound round;
+  /**
+   * The current exp round, valid if
+   * the main round is an exp round.
+   */
+  uint32_t exp_round;
+  /**
+   * The current exp subround, valid if
+   * the main round is an exp round.
+   */
+  uint32_t exp_subround;
+};
+
+
+/**
  * A consensus session consists of one local client and the remote authorities.
  */
 struct ConsensusSession
@@ -217,9 +241,14 @@
   struct GNUNET_SET_OperationHandle *set_op;
 
   /**
-   * Has commit been called on the set_op?
+   * Set operation we are planning on executing with this peer.
    */
-  int set_op_commited;
+  struct GNUNET_SET_OperationHandle *delayed_set_op;
+
+  /**
+   * Info about the round of the delayed set operation.
+   */
+  struct RoundInfo delayed_round_info;
 };
 
 
@@ -277,11 +306,26 @@
   int i;
 
   GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session);
+  if (NULL != session->element_set)
+  {
+    GNUNET_SET_destroy (session->element_set);
+    session->element_set = NULL;
+  }
+  if (NULL != session->set_listener)
+  {
+    GNUNET_SET_listen_cancel (session->set_listener);
+    session->set_listener = NULL;
+  }
   if (NULL != session->client_mq)
   {
     GNUNET_MQ_destroy (session->client_mq);
     session->client_mq = NULL;
   }
+  if (NULL != session->client)
+  {
+    GNUNET_SERVER_client_disconnect (session->client);
+    session->client = NULL;
+  }
   if (NULL != session->shuffle)
   {
     GNUNET_free (session->shuffle);
@@ -328,7 +372,7 @@
     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got element for client\n",
                 session->local_peer_idx);
 
-    ev = GNUNET_MQ_msg (m, 
GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
+    ev = GNUNET_MQ_msg_extra (m, element->size, 
GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
     m->element_type = htons (element->type);
     memcpy (&m[1], element->data, element->size);
     GNUNET_MQ_send (session->client_mq, ev);
@@ -435,17 +479,16 @@
 static void
 find_partners (struct ConsensusSession *session)
 {
-  int arc;
+  unsigned int arc;
+  unsigned int num_ghosts;
+  unsigned int largest_arc;
   int partner_idx;
-  int largest_arc;
-  int num_ghosts;
 
   /* shuffled local index */
   int my_idx = session->shuffle[session->local_peer_idx];
 
   /* distance to neighboring peer in current subround */
   arc = 1 << session->exp_subround;
-  partner_idx = (my_idx + arc) % session->num_peers;
   largest_arc = 1;
   while (largest_arc < session->num_peers)
     largest_arc <<= 1;
@@ -456,7 +499,9 @@
   if (0 == (my_idx & arc))
   {
     /* we are outgoing */
+    partner_idx = (my_idx + arc) % session->num_peers;
     session->partner_outgoing = &session->info[session->shuffle[partner_idx]];
+    session->partner_outgoing->exp_subround_finished = GNUNET_NO;
     /* are we a 'ghost' of a peer that would exist if
      * the number of peers was a power of two, and thus have to partner
      * with an additional peer?
@@ -464,22 +509,26 @@
     if (my_idx < num_ghosts)
     {
       int ghost_partner_idx;
-      ghost_partner_idx = (my_idx - arc) % session->num_peers;
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "my index %d, arc %d, peers %u\n", 
my_idx, arc, session->num_peers);
+      ghost_partner_idx = (my_idx - (int) arc) % (int) session->num_peers;
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "ghost partner is before %d\n", 
ghost_partner_idx);
       /* platform dependent; modulo sometimes returns negative values */
       if (ghost_partner_idx < 0)
-        ghost_partner_idx += arc;
+        ghost_partner_idx += session->num_peers;
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "ghost partner is after %d\n", 
ghost_partner_idx);
       session->partner_incoming = 
&session->info[session->shuffle[ghost_partner_idx]];
+      session->partner_incoming->exp_subround_finished = GNUNET_NO;
+      return;
     }
-    else
-    {
-      session->partner_incoming = NULL;
-    }
+    session->partner_incoming = NULL;
+    return;
   }
-  else
-  {
-    session->partner_outgoing = NULL;
-    session->partner_incoming = &session->info[session->shuffle[partner_idx]];
-  }
+  partner_idx = (my_idx - (int) arc) % (int) session->num_peers;
+  if (partner_idx < 0)
+    partner_idx += session->num_peers;
+  session->partner_outgoing = NULL;
+  session->partner_incoming = &session->info[session->shuffle[partner_idx]];
+  session->partner_incoming->exp_subround_finished = GNUNET_NO;
 }
 
 
@@ -497,23 +546,40 @@
                enum GNUNET_SET_Status status)
 {
   struct ConsensusPeerInformation *cpi = cls;
+  unsigned int remote_idx = cpi - cpi->session->info;
+  unsigned int local_idx = cpi->session->local_peer_idx;
 
+  GNUNET_assert ((cpi == cpi->session->partner_outgoing) ||
+                 (cpi == cpi->session->partner_incoming));
+
   switch (status)
   {
     case GNUNET_SET_STATUS_OK:
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set result: element\n");
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: set result from P%u: 
element\n",
+                  local_idx, remote_idx);
       break;
     case GNUNET_SET_STATUS_FAILURE:
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set result: failure\n");
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: set result from P%u: 
failure\n",
+                  local_idx, remote_idx);
       cpi->set_op = NULL;
       return;
     case GNUNET_SET_STATUS_HALF_DONE:
     case GNUNET_SET_STATUS_DONE:
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set result: done\n");
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: set result from P%u: done\n",
+                  local_idx, remote_idx);
       cpi->exp_subround_finished = GNUNET_YES;
       cpi->set_op = NULL;
       if (have_exp_subround_finished (cpi->session) == GNUNET_YES)
+      {
+        GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: all reconciliations of 
subround done\n",
+                    local_idx);
         subround_over (cpi->session, NULL);
+      }
+      else
+      {
+        GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: waiting for further set 
results\n",
+                    local_idx);
+      }
       return;
     default:
       GNUNET_break (0);
@@ -533,6 +599,39 @@
 
 
 /**
+ * Compare the round the session is in with the round of the given context 
message.
+ *
+ * @param session a consensus session
+ * @param round a round context message
+ * @return 0 if it's the same round, -1 if the session is in an earlier round,
+ *         1 if the session is in a later round
+ */
+static int
+rounds_compare (struct ConsensusSession *session,
+                struct RoundInfo* ri)
+{
+  if (session->current_round < ri->round)
+    return -1;
+  if (session->current_round > ri->round)
+    return 1;
+  if (session->current_round == CONSENSUS_ROUND_EXCHANGE)
+  {
+    if (session->exp_round < ri->exp_round)
+      return -1;
+    if (session->exp_round > ri->exp_round)
+      return 1;
+    if (session->exp_subround < ri->exp_subround)
+      return -1;
+    if (session->exp_subround < ri->exp_subround)
+      return 1;
+    return 0;
+  }
+  /* comparing rounds when we are not in a exp round */
+  GNUNET_assert (0);
+}
+
+
+/**
  * Do the next subround in the exp-scheme.
  * This function can be invoked as a timeout task, or called manually (tc will 
be NULL then).
  *
@@ -557,7 +656,7 @@
     session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
   }
   
-  if (session->exp_round > NUM_EXP_ROUNDS)
+  if (session->exp_round >= NUM_EXP_ROUNDS)
   {
     round_over (session, NULL);
     return;
@@ -578,7 +677,7 @@
     /* subrounds done, start new log-round */
     session->exp_round++;
     session->exp_subround = 0;
-    shuffle (session);
+    //shuffle (session);
   }
   else 
   {
@@ -588,6 +687,10 @@
   /* determine the incoming and outgoing partner */
   find_partners (session);
 
+  GNUNET_assert (session->partner_outgoing != 
&session->info[session->local_peer_idx]);
+  GNUNET_assert (session->partner_incoming != 
&session->info[session->local_peer_idx]);
+
+  /* initiate set operation with the outgoing partner */
   if (NULL != session->partner_outgoing)
   {
     struct GNUNET_CONSENSUS_RoundContextMessage *msg;
@@ -606,13 +709,38 @@
         GNUNET_SET_prepare (&session->partner_outgoing->peer_id,
                             &session->global_id,
                             (struct GNUNET_MessageHeader *) msg,
-                            0, /* FIXME */
+                            0, /* FIXME: salt */
                             GNUNET_SET_RESULT_ADDED,
                             set_result_cb, session->partner_outgoing);
+    GNUNET_free (msg);
     GNUNET_SET_commit (session->partner_outgoing->set_op, 
session->element_set);
-    session->partner_outgoing->set_op_commited = GNUNET_YES;
   }
 
+  /* commit to the delayed set operation */
+  if ((NULL != session->partner_incoming) && (NULL != 
session->partner_incoming->delayed_set_op))
+  {
+    int cmp = rounds_compare (session, 
&session->partner_incoming->delayed_round_info);
+
+    if (NULL != session->partner_incoming->set_op)
+    {
+      GNUNET_SET_operation_cancel (session->partner_incoming->set_op);
+      session->partner_incoming->set_op = NULL;
+    }
+    if (cmp == 0)
+    {
+      GNUNET_SET_commit (session->partner_incoming->delayed_set_op, 
session->element_set);
+      session->partner_incoming->set_op = 
session->partner_incoming->delayed_set_op;
+      session->partner_incoming->delayed_set_op = NULL;
+      GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d resumed delayed round with 
P%d\n",
+                  session->local_peer_idx, (int) (session->partner_incoming - 
session->info));
+    }
+    else
+    {
+      /* this should not happen -- a round has been skipped! */
+      GNUNET_break_op (0);
+    }
+  }
+
 #ifdef GNUNET_EXTRA_LOGGING
   {
     int in;
@@ -777,14 +905,11 @@
   struct ConsensusSession *session = cls;
   struct GNUNET_CONSENSUS_RoundContextMessage *msg = (struct 
GNUNET_CONSENSUS_RoundContextMessage *) context_msg;
   struct ConsensusPeerInformation *cpi;
+  struct GNUNET_SET_OperationHandle *set_op;
+  struct RoundInfo round_info;
   int index;
+  int cmp;
 
-  /* FIXME: should this even happen? */
-  /*
-  if (NULL == request)
-    return;
-  */
-
   if (NULL == context_msg)
   {
     GNUNET_break_op (0);
@@ -793,48 +918,53 @@
 
   index = get_peer_idx (other_peer, session);
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "result from %s\n", GNUNET_h2s 
(&other_peer->hashPubKey));
-
   if (index < 0)
   {
     GNUNET_break_op (0);
     return;
   }
 
+  round_info.round = ntohl (msg->round);
+  round_info.exp_round = ntohl (msg->exp_round);
+  round_info.exp_subround = ntohl (msg->exp_subround);
+
   cpi = &session->info[index];
 
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d got result from P%d\n", 
session->local_peer_idx, index);
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d got set request from P%d\n", 
session->local_peer_idx, index);
 
   switch (session->current_round)
   {
     case CONSENSUS_ROUND_EXCHANGE:
-      if (ntohl (msg->round) != CONSENSUS_ROUND_EXCHANGE)
+      cmp = rounds_compare (session, &round_info);
+      if (cmp > 0)
       {
+        /* the other peer is too late */
         GNUNET_break_op (0);
         return;
       }
-      if (ntohl (msg->exp_round) < session->exp_round)
+      /* kill old request, if any. this is legal,
+       * as the other peer would not make a new request if it would want to
+       * complete the old one! */
+      if (NULL != cpi->set_op)
       {
-        GNUNET_break_op (0);
-        return;
+        GNUNET_SET_operation_cancel (cpi->set_op);
+        cpi->set_op = NULL;
       }
-      if (ntohl (msg->exp_subround) < session->exp_subround)
-      {
-        GNUNET_break_op (0);
-        return;
-      }
-      if (NULL != cpi->set_op)
-        GNUNET_SET_operation_cancel (cpi->set_op);
-      cpi->set_op = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED,
+      set_op = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED,
                                        set_result_cb, &session->info[index]);
-      if (ntohl (msg->exp_subround) == session->exp_subround)
+      if (cmp == 0)
       {
-        cpi->set_op_commited = GNUNET_YES;
-        GNUNET_SET_commit (cpi->set_op, session->element_set);
+        cpi->set_op = set_op;
+        GNUNET_SET_commit (set_op, session->element_set);
+        GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d commited to set request from 
P%d\n", session->local_peer_idx, index);
       }
       else
       {
-        cpi->set_op_commited = GNUNET_NO;
+        /* if there's a exp subround running, mark it as finished, as the set 
op has been canceled! */
+        cpi->delayed_set_op = set_op;
+        cpi->delayed_round_info = round_info;
+        cpi->exp_subround_finished = GNUNET_YES;
+        GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d delaying set request from 
P%d\n", session->local_peer_idx, index);
       }
       break;
     default:
@@ -934,7 +1064,6 @@
   session = GNUNET_new (struct ConsensusSession);
   session->client = client;
   session->client_mq = GNUNET_MQ_queue_for_server_client (client);
-  GNUNET_SERVER_client_keep (client);
   GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
   initialize_session (session, (struct GNUNET_CONSENSUS_JoinMessage *) m);
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
@@ -990,6 +1119,7 @@
   memcpy (&element[1], &msg[1], element_size);
   element->data = &element[1];
   GNUNET_SET_add_element (session->element_set, element, NULL, NULL);
+  GNUNET_free (element);
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
 
   GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: element added\n", 
session->local_peer_idx);
@@ -1030,6 +1160,7 @@
   }
   if (session->num_peers <= 1)
   {
+    /* FIXME: what to do here? */
     //send_client_conclude_done (session);
   }
   else
@@ -1080,7 +1211,7 @@
       (CONSENSUS_ROUND_FINISH == session->current_round))
     destroy_session (session);
   else
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client disconnected, but waiting for 
consensus to finish\n");
+    GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client disconnected, but waiting for 
consensus to finish\n");
 }
 
 
@@ -1131,7 +1262,7 @@
 {
   int ret;
   ret = GNUNET_SERVICE_run (argc, argv, "consensus", 
GNUNET_SERVICE_OPTION_NONE, &run, NULL);
-  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit\n");
+  GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit (%d)\n", GNUNET_OK != ret);
   return (GNUNET_OK == ret) ? 0 : 1;
 }
 

Modified: gnunet/src/consensus/test_consensus.conf
===================================================================
--- gnunet/src/consensus/test_consensus.conf    2013-08-12 14:35:24 UTC (rev 
28536)
+++ gnunet/src/consensus/test_consensus.conf    2013-08-12 14:35:51 UTC (rev 
28537)
@@ -5,7 +5,7 @@
 HOME = $SERVICEHOME
 BINARY = gnunet-service-consensus
 #PREFIX = gdbserver :12345
-PREFIX = valgrind
+PREFIX = valgrind --leak-check=full 
 ACCEPT_FROM = 127.0.0.1;
 ACCEPT_FROM6 = ::1;
 UNIXPATH = /tmp/gnunet-service-consensus.sock
@@ -23,7 +23,7 @@
 
 [set]
 OPTIONS = -L INFO
-PREFIX = valgrind
+PREFIX = valgrind --leak-check=full
 
 
 [testbed]




reply via email to

[Prev in Thread] Current Thread [Next in Thread]