[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r28537 - gnunet/src/consensus
From: |
gnunet |
Subject: |
[GNUnet-SVN] r28537 - gnunet/src/consensus |
Date: |
Mon, 12 Aug 2013 16:35:51 +0200 |
Author: dold
Date: 2013-08-12 16:35:51 +0200 (Mon, 12 Aug 2013)
New Revision: 28537
Modified:
gnunet/src/consensus/consensus_api.c
gnunet/src/consensus/gnunet-consensus.c
gnunet/src/consensus/gnunet-service-consensus.c
gnunet/src/consensus/test_consensus.conf
Log:
- fixed consensus for >2 peers
Modified: gnunet/src/consensus/consensus_api.c
===================================================================
--- gnunet/src/consensus/consensus_api.c 2013-08-12 14:35:24 UTC (rev
28536)
+++ gnunet/src/consensus/consensus_api.c 2013-08-12 14:35:51 UTC (rev
28537)
@@ -266,6 +266,7 @@
i->cls = idc_cls;
GNUNET_MQ_notify_sent (ev, idc_adapter, i);
}
+ GNUNET_MQ_send (consensus->mq, ev);
}
Modified: gnunet/src/consensus/gnunet-consensus.c
===================================================================
--- gnunet/src/consensus/gnunet-consensus.c 2013-08-12 14:35:24 UTC (rev
28536)
+++ gnunet/src/consensus/gnunet-consensus.c 2013-08-12 14:35:51 UTC (rev
28537)
@@ -95,6 +95,7 @@
static void
conclude_cb (void *cls)
{
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus done\n");
GNUNET_SCHEDULER_add_now (destroy, cls);
}
@@ -274,6 +275,8 @@
{
GNUNET_assert (0);
}
+
+ GNUNET_TESTBED_operation_done (op);
}
@@ -302,7 +305,6 @@
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "test master\n");
-
peers = started_peers;
peer_ids = GNUNET_malloc (num_peers * sizeof (struct GNUNET_PeerIdentity));
Modified: gnunet/src/consensus/gnunet-service-consensus.c
===================================================================
--- gnunet/src/consensus/gnunet-service-consensus.c 2013-08-12 14:35:24 UTC
(rev 28536)
+++ gnunet/src/consensus/gnunet-service-consensus.c 2013-08-12 14:35:51 UTC
(rev 28537)
@@ -87,7 +87,31 @@
CONSENSUS_ROUND_FINISH
};
+
/**
+ * Complete information about the current round and all
+ * subrounds.
+ */
+struct RoundInfo
+{
+ /**
+ * The current main round.
+ */
+ enum ConsensusRound round;
+ /**
+ * The current exp round, valid if
+ * the main round is an exp round.
+ */
+ uint32_t exp_round;
+ /**
+ * The current exp subround, valid if
+ * the main round is an exp round.
+ */
+ uint32_t exp_subround;
+};
+
+
+/**
* A consensus session consists of one local client and the remote authorities.
*/
struct ConsensusSession
@@ -217,9 +241,14 @@
struct GNUNET_SET_OperationHandle *set_op;
/**
- * Has commit been called on the set_op?
+ * Set operation we are planning on executing with this peer.
*/
- int set_op_commited;
+ struct GNUNET_SET_OperationHandle *delayed_set_op;
+
+ /**
+ * Info about the round of the delayed set operation.
+ */
+ struct RoundInfo delayed_round_info;
};
@@ -277,11 +306,26 @@
int i;
GNUNET_CONTAINER_DLL_remove (sessions_head, sessions_tail, session);
+ if (NULL != session->element_set)
+ {
+ GNUNET_SET_destroy (session->element_set);
+ session->element_set = NULL;
+ }
+ if (NULL != session->set_listener)
+ {
+ GNUNET_SET_listen_cancel (session->set_listener);
+ session->set_listener = NULL;
+ }
if (NULL != session->client_mq)
{
GNUNET_MQ_destroy (session->client_mq);
session->client_mq = NULL;
}
+ if (NULL != session->client)
+ {
+ GNUNET_SERVER_client_disconnect (session->client);
+ session->client = NULL;
+ }
if (NULL != session->shuffle)
{
GNUNET_free (session->shuffle);
@@ -328,7 +372,7 @@
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: got element for client\n",
session->local_peer_idx);
- ev = GNUNET_MQ_msg (m,
GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
+ ev = GNUNET_MQ_msg_extra (m, element->size,
GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
m->element_type = htons (element->type);
memcpy (&m[1], element->data, element->size);
GNUNET_MQ_send (session->client_mq, ev);
@@ -435,17 +479,16 @@
static void
find_partners (struct ConsensusSession *session)
{
- int arc;
+ unsigned int arc;
+ unsigned int num_ghosts;
+ unsigned int largest_arc;
int partner_idx;
- int largest_arc;
- int num_ghosts;
/* shuffled local index */
int my_idx = session->shuffle[session->local_peer_idx];
/* distance to neighboring peer in current subround */
arc = 1 << session->exp_subround;
- partner_idx = (my_idx + arc) % session->num_peers;
largest_arc = 1;
while (largest_arc < session->num_peers)
largest_arc <<= 1;
@@ -456,7 +499,9 @@
if (0 == (my_idx & arc))
{
/* we are outgoing */
+ partner_idx = (my_idx + arc) % session->num_peers;
session->partner_outgoing = &session->info[session->shuffle[partner_idx]];
+ session->partner_outgoing->exp_subround_finished = GNUNET_NO;
/* are we a 'ghost' of a peer that would exist if
* the number of peers was a power of two, and thus have to partner
* with an additional peer?
@@ -464,22 +509,26 @@
if (my_idx < num_ghosts)
{
int ghost_partner_idx;
- ghost_partner_idx = (my_idx - arc) % session->num_peers;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "my index %d, arc %d, peers %u\n",
my_idx, arc, session->num_peers);
+ ghost_partner_idx = (my_idx - (int) arc) % (int) session->num_peers;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "ghost partner is before %d\n",
ghost_partner_idx);
/* platform dependent; modulo sometimes returns negative values */
if (ghost_partner_idx < 0)
- ghost_partner_idx += arc;
+ ghost_partner_idx += session->num_peers;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "ghost partner is after %d\n",
ghost_partner_idx);
session->partner_incoming =
&session->info[session->shuffle[ghost_partner_idx]];
+ session->partner_incoming->exp_subround_finished = GNUNET_NO;
+ return;
}
- else
- {
- session->partner_incoming = NULL;
- }
+ session->partner_incoming = NULL;
+ return;
}
- else
- {
- session->partner_outgoing = NULL;
- session->partner_incoming = &session->info[session->shuffle[partner_idx]];
- }
+ partner_idx = (my_idx - (int) arc) % (int) session->num_peers;
+ if (partner_idx < 0)
+ partner_idx += session->num_peers;
+ session->partner_outgoing = NULL;
+ session->partner_incoming = &session->info[session->shuffle[partner_idx]];
+ session->partner_incoming->exp_subround_finished = GNUNET_NO;
}
@@ -497,23 +546,40 @@
enum GNUNET_SET_Status status)
{
struct ConsensusPeerInformation *cpi = cls;
+ unsigned int remote_idx = cpi - cpi->session->info;
+ unsigned int local_idx = cpi->session->local_peer_idx;
+ GNUNET_assert ((cpi == cpi->session->partner_outgoing) ||
+ (cpi == cpi->session->partner_incoming));
+
switch (status)
{
case GNUNET_SET_STATUS_OK:
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set result: element\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: set result from P%u:
element\n",
+ local_idx, remote_idx);
break;
case GNUNET_SET_STATUS_FAILURE:
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set result: failure\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: set result from P%u:
failure\n",
+ local_idx, remote_idx);
cpi->set_op = NULL;
return;
case GNUNET_SET_STATUS_HALF_DONE:
case GNUNET_SET_STATUS_DONE:
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set result: done\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: set result from P%u: done\n",
+ local_idx, remote_idx);
cpi->exp_subround_finished = GNUNET_YES;
cpi->set_op = NULL;
if (have_exp_subround_finished (cpi->session) == GNUNET_YES)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: all reconciliations of
subround done\n",
+ local_idx);
subround_over (cpi->session, NULL);
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: waiting for further set
results\n",
+ local_idx);
+ }
return;
default:
GNUNET_break (0);
@@ -533,6 +599,39 @@
/**
+ * Compare the round the session is in with the round of the given context
message.
+ *
+ * @param session a consensus session
+ * @param round a round context message
+ * @return 0 if it's the same round, -1 if the session is in an earlier round,
+ * 1 if the session is in a later round
+ */
+static int
+rounds_compare (struct ConsensusSession *session,
+ struct RoundInfo* ri)
+{
+ if (session->current_round < ri->round)
+ return -1;
+ if (session->current_round > ri->round)
+ return 1;
+ if (session->current_round == CONSENSUS_ROUND_EXCHANGE)
+ {
+ if (session->exp_round < ri->exp_round)
+ return -1;
+ if (session->exp_round > ri->exp_round)
+ return 1;
+ if (session->exp_subround < ri->exp_subround)
+ return -1;
+ if (session->exp_subround < ri->exp_subround)
+ return 1;
+ return 0;
+ }
+ /* comparing rounds when we are not in a exp round */
+ GNUNET_assert (0);
+}
+
+
+/**
* Do the next subround in the exp-scheme.
* This function can be invoked as a timeout task, or called manually (tc will
be NULL then).
*
@@ -557,7 +656,7 @@
session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
}
- if (session->exp_round > NUM_EXP_ROUNDS)
+ if (session->exp_round >= NUM_EXP_ROUNDS)
{
round_over (session, NULL);
return;
@@ -578,7 +677,7 @@
/* subrounds done, start new log-round */
session->exp_round++;
session->exp_subround = 0;
- shuffle (session);
+ //shuffle (session);
}
else
{
@@ -588,6 +687,10 @@
/* determine the incoming and outgoing partner */
find_partners (session);
+ GNUNET_assert (session->partner_outgoing !=
&session->info[session->local_peer_idx]);
+ GNUNET_assert (session->partner_incoming !=
&session->info[session->local_peer_idx]);
+
+ /* initiate set operation with the outgoing partner */
if (NULL != session->partner_outgoing)
{
struct GNUNET_CONSENSUS_RoundContextMessage *msg;
@@ -606,13 +709,38 @@
GNUNET_SET_prepare (&session->partner_outgoing->peer_id,
&session->global_id,
(struct GNUNET_MessageHeader *) msg,
- 0, /* FIXME */
+ 0, /* FIXME: salt */
GNUNET_SET_RESULT_ADDED,
set_result_cb, session->partner_outgoing);
+ GNUNET_free (msg);
GNUNET_SET_commit (session->partner_outgoing->set_op,
session->element_set);
- session->partner_outgoing->set_op_commited = GNUNET_YES;
}
+ /* commit to the delayed set operation */
+ if ((NULL != session->partner_incoming) && (NULL !=
session->partner_incoming->delayed_set_op))
+ {
+ int cmp = rounds_compare (session,
&session->partner_incoming->delayed_round_info);
+
+ if (NULL != session->partner_incoming->set_op)
+ {
+ GNUNET_SET_operation_cancel (session->partner_incoming->set_op);
+ session->partner_incoming->set_op = NULL;
+ }
+ if (cmp == 0)
+ {
+ GNUNET_SET_commit (session->partner_incoming->delayed_set_op,
session->element_set);
+ session->partner_incoming->set_op =
session->partner_incoming->delayed_set_op;
+ session->partner_incoming->delayed_set_op = NULL;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d resumed delayed round with
P%d\n",
+ session->local_peer_idx, (int) (session->partner_incoming -
session->info));
+ }
+ else
+ {
+ /* this should not happen -- a round has been skipped! */
+ GNUNET_break_op (0);
+ }
+ }
+
#ifdef GNUNET_EXTRA_LOGGING
{
int in;
@@ -777,14 +905,11 @@
struct ConsensusSession *session = cls;
struct GNUNET_CONSENSUS_RoundContextMessage *msg = (struct
GNUNET_CONSENSUS_RoundContextMessage *) context_msg;
struct ConsensusPeerInformation *cpi;
+ struct GNUNET_SET_OperationHandle *set_op;
+ struct RoundInfo round_info;
int index;
+ int cmp;
- /* FIXME: should this even happen? */
- /*
- if (NULL == request)
- return;
- */
-
if (NULL == context_msg)
{
GNUNET_break_op (0);
@@ -793,48 +918,53 @@
index = get_peer_idx (other_peer, session);
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "result from %s\n", GNUNET_h2s
(&other_peer->hashPubKey));
-
if (index < 0)
{
GNUNET_break_op (0);
return;
}
+ round_info.round = ntohl (msg->round);
+ round_info.exp_round = ntohl (msg->exp_round);
+ round_info.exp_subround = ntohl (msg->exp_subround);
+
cpi = &session->info[index];
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d got result from P%d\n",
session->local_peer_idx, index);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d got set request from P%d\n",
session->local_peer_idx, index);
switch (session->current_round)
{
case CONSENSUS_ROUND_EXCHANGE:
- if (ntohl (msg->round) != CONSENSUS_ROUND_EXCHANGE)
+ cmp = rounds_compare (session, &round_info);
+ if (cmp > 0)
{
+ /* the other peer is too late */
GNUNET_break_op (0);
return;
}
- if (ntohl (msg->exp_round) < session->exp_round)
+ /* kill old request, if any. this is legal,
+ * as the other peer would not make a new request if it would want to
+ * complete the old one! */
+ if (NULL != cpi->set_op)
{
- GNUNET_break_op (0);
- return;
+ GNUNET_SET_operation_cancel (cpi->set_op);
+ cpi->set_op = NULL;
}
- if (ntohl (msg->exp_subround) < session->exp_subround)
- {
- GNUNET_break_op (0);
- return;
- }
- if (NULL != cpi->set_op)
- GNUNET_SET_operation_cancel (cpi->set_op);
- cpi->set_op = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED,
+ set_op = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED,
set_result_cb, &session->info[index]);
- if (ntohl (msg->exp_subround) == session->exp_subround)
+ if (cmp == 0)
{
- cpi->set_op_commited = GNUNET_YES;
- GNUNET_SET_commit (cpi->set_op, session->element_set);
+ cpi->set_op = set_op;
+ GNUNET_SET_commit (set_op, session->element_set);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d commited to set request from
P%d\n", session->local_peer_idx, index);
}
else
{
- cpi->set_op_commited = GNUNET_NO;
+ /* if there's a exp subround running, mark it as finished, as the set
op has been canceled! */
+ cpi->delayed_set_op = set_op;
+ cpi->delayed_round_info = round_info;
+ cpi->exp_subround_finished = GNUNET_YES;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d delaying set request from
P%d\n", session->local_peer_idx, index);
}
break;
default:
@@ -934,7 +1064,6 @@
session = GNUNET_new (struct ConsensusSession);
session->client = client;
session->client_mq = GNUNET_MQ_queue_for_server_client (client);
- GNUNET_SERVER_client_keep (client);
GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
initialize_session (session, (struct GNUNET_CONSENSUS_JoinMessage *) m);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
@@ -990,6 +1119,7 @@
memcpy (&element[1], &msg[1], element_size);
element->data = &element[1];
GNUNET_SET_add_element (session->element_set, element, NULL, NULL);
+ GNUNET_free (element);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: element added\n",
session->local_peer_idx);
@@ -1030,6 +1160,7 @@
}
if (session->num_peers <= 1)
{
+ /* FIXME: what to do here? */
//send_client_conclude_done (session);
}
else
@@ -1080,7 +1211,7 @@
(CONSENSUS_ROUND_FINISH == session->current_round))
destroy_session (session);
else
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client disconnected, but waiting for
consensus to finish\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client disconnected, but waiting for
consensus to finish\n");
}
@@ -1131,7 +1262,7 @@
{
int ret;
ret = GNUNET_SERVICE_run (argc, argv, "consensus",
GNUNET_SERVICE_OPTION_NONE, &run, NULL);
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "exit (%d)\n", GNUNET_OK != ret);
return (GNUNET_OK == ret) ? 0 : 1;
}
Modified: gnunet/src/consensus/test_consensus.conf
===================================================================
--- gnunet/src/consensus/test_consensus.conf 2013-08-12 14:35:24 UTC (rev
28536)
+++ gnunet/src/consensus/test_consensus.conf 2013-08-12 14:35:51 UTC (rev
28537)
@@ -5,7 +5,7 @@
HOME = $SERVICEHOME
BINARY = gnunet-service-consensus
#PREFIX = gdbserver :12345
-PREFIX = valgrind
+PREFIX = valgrind --leak-check=full
ACCEPT_FROM = 127.0.0.1;
ACCEPT_FROM6 = ::1;
UNIXPATH = /tmp/gnunet-service-consensus.sock
@@ -23,7 +23,7 @@
[set]
OPTIONS = -L INFO
-PREFIX = valgrind
+PREFIX = valgrind --leak-check=full
[testbed]
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r28537 - gnunet/src/consensus,
gnunet <=