[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r36451 - gnunet/src/consensus
From: gnunet
Subject: [GNUnet-SVN] r36451 - gnunet/src/consensus
Date: Tue, 6 Oct 2015 17:22:02 +0200
Author: dold
Date: 2015-10-06 17:22:02 +0200 (Tue, 06 Oct 2015)
New Revision: 36451
Modified:
gnunet/src/consensus/gnunet-service-consensus.c
Log:
towards handling byz. faults correctly
- blacklisting of peers in consensus
- restructuring
Modified: gnunet/src/consensus/gnunet-service-consensus.c
===================================================================
--- gnunet/src/consensus/gnunet-service-consensus.c 2015-10-06 14:14:15 UTC (rev 36450)
+++ gnunet/src/consensus/gnunet-service-consensus.c 2015-10-06 15:22:02 UTC (rev 36451)
@@ -34,9 +34,34 @@
#include "consensus.h"
+#define ELEMENT_TYPE_CONTESTED_MARKER (GNUNET_CONSENSUS_ELEMENT_TYPE_USER_MAX + 1)
+
+enum ReferendumVote
+{
+ /**
+ * Vote that nothing should change.
+ * This option is never voted explicitly.
+ */
+ VOTE_STAY = 0,
+ /**
+ * Vote that an element should be added.
+ */
+ VOTE_ADD = 1,
+ /**
+ * Vote that an element should be removed.
+ */
+ VOTE_REMOVE = 2,
+};
+
+
GNUNET_NETWORK_STRUCT_BEGIN
+
+struct ContestedPayload
+{
+};
+
/**
* Tuple of integers that together
* identify a task uniquely.
@@ -72,24 +97,7 @@
};
-enum ReferendumVote
-{
- /**
- * Vote that nothing should change.
- * This option is never voted explicitly.
- */
- VOTE_STAY = 0,
- /**
- * Vote that an element should be added.
- */
- VOTE_ADD = 1,
- /**
- * Vote that an element should be removed.
- */
- VOTE_REMOVE = 2,
-};
-
struct SetKey
{
int set_kind GNUNET_PACKED;
@@ -136,7 +144,6 @@
PHASE_KIND_GRADECAST_ECHO_GRADE,
PHASE_KIND_GRADECAST_CONFIRM,
PHASE_KIND_GRADECAST_CONFIRM_GRADE,
- PHASE_KIND_GRADECAST_APPLY_RESULT,
/**
* Apply a repetition of the all-to-all
* gradecast to the current set.
@@ -484,9 +491,6 @@
finish_task (struct TaskEntry *task);
static void
-task_start_reconcile (struct TaskEntry *task);
-
-static void
run_ready_steps (struct ConsensusSession *session);
static const char *
@@ -501,7 +505,6 @@
case PHASE_KIND_GRADECAST_ECHO_GRADE: return "GRADECAST_ECHO_GRADE";
case PHASE_KIND_GRADECAST_CONFIRM: return "GRADECAST_CONFIRM";
case PHASE_KIND_GRADECAST_CONFIRM_GRADE: return "GRADECAST_CONFIRM_GRADE";
- case PHASE_KIND_GRADECAST_APPLY_RESULT: return "GRADECAST_APPLY_RESULT";
case PHASE_KIND_APPLY_REP: return "APPLY_REP";
default: return "(unknown)";
}
@@ -772,9 +775,27 @@
static void
+rfn_commit (struct ReferendumEntry *rfn,
+ uint16_t commit_peer)
+{
+ GNUNET_assert (commit_peer < rfn->num_peers);
+
+ rfn->peer_commited[commit_peer] = GNUNET_YES;
+}
+
+
+static void
+rfn_contest (struct ReferendumEntry *rfn,
+ uint16_t contested_peer)
+{
+ GNUNET_assert (contested_peer < rfn->num_peers);
+
+ rfn->peer_contested[contested_peer] = GNUNET_YES;
+}
+
+static void
rfn_vote (struct ReferendumEntry *rfn,
uint16_t voting_peer,
- uint16_t num_peers,
enum ReferendumVote vote,
const struct GNUNET_SET_Element *element)
{
@@ -781,15 +802,12 @@
struct RfnElementInfo *ri;
struct GNUNET_HashCode hash;
- GNUNET_assert (voting_peer < num_peers);
+ GNUNET_assert (voting_peer < rfn->num_peers);
/* Explicit voting only makes sense with VOTE_ADD or VOTE_REMOVE,
since VOTE_STAY is implicit in not voting. */
GNUNET_assert ( (VOTE_ADD == vote) || (VOTE_REMOVE == vote) );
- // XXX: should happen in another place!
- rfn->peer_commited[voting_peer] = GNUNET_YES;
-
GNUNET_SET_element_hash (element, &hash);
ri = GNUNET_CONTAINER_multihashmap_get (rfn->rfn_elements, &hash);
@@ -797,7 +815,7 @@
{
ri = GNUNET_new (struct RfnElementInfo);
ri->element = GNUNET_SET_element_dup (element);
- ri->votes = GNUNET_new_array (num_peers, int);
+ ri->votes = GNUNET_new_array (rfn->num_peers, int);
GNUNET_assert (GNUNET_OK ==
GNUNET_CONTAINER_multihashmap_put (rfn->rfn_elements,
&hash, ri,
@@ -897,6 +915,16 @@
return;
}
+ if ( (GNUNET_SET_STATUS_ADD_LOCAL == status) || (GNUNET_SET_STATUS_ADD_REMOTE == status) )
+ {
+ if ( (GNUNET_YES == setop->transceive_contested) && (ELEMENT_TYPE_CONTESTED_MARKER == element->element_type) )
+ {
+ GNUNET_assert (NULL != output_rfn);
+ rfn_contest (output_rfn, task_other_peer (task));
+ return;
+ }
+ }
+
switch (status)
{
case GNUNET_SET_STATUS_ADD_LOCAL:
@@ -933,7 +961,7 @@
}
if (NULL != output_rfn)
{
- rfn_vote (output_rfn, task_other_peer (task), session->num_peers, VOTE_ADD, element);
+ rfn_vote (output_rfn, task_other_peer (task), VOTE_ADD, element);
#ifdef GNUNET_EXTRA_LOGGING
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"P%u: adding element %s into rfn {%s} of task {%s}\n",
@@ -948,6 +976,8 @@
case GNUNET_SET_STATUS_ADD_REMOTE:
if (GNUNET_YES == setop->do_not_remove)
break;
+ if (ELEMENT_TYPE_CONTESTED_MARKER == element->element_type)
+ break;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Removing element in Task {%s}\n",
debug_str_task_key (&task->key));
@@ -981,7 +1011,7 @@
}
if (NULL != output_rfn)
{
- rfn_vote (output_rfn, task_other_peer (task), session->num_peers, VOTE_REMOVE, element);
+ rfn_vote (output_rfn, task_other_peer (task), VOTE_REMOVE, element);
#ifdef GNUNET_EXTRA_LOGGING
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"P%u: removing element %s from rfn {%s} of task {%s}\n",
@@ -995,10 +1025,13 @@
case GNUNET_SET_STATUS_DONE:
// XXX: check first if any changes to the underlying
// set are still pending
- // XXX: commit other peer in referendum
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Finishing setop in Task {%s}\n",
debug_str_task_key (&task->key));
+ if (NULL != output_rfn)
+ {
+ rfn_commit (output_rfn, task_other_peer (task));
+ }
finish_task (task);
break;
case GNUNET_SET_STATUS_FAILURE:
@@ -1159,6 +1192,15 @@
}
}
#else
+ if ( (GNUNET_YES == setop->transceive_contested) && (GNUNET_YES == set->is_contested) )
+ {
+ struct GNUNET_SET_Element element;
+ struct ContestedPayload payload;
+ element.data = &payload;
+ element.size = sizeof (struct ContestedPayload);
+ element.element_type = ELEMENT_TYPE_CONTESTED_MARKER;
+ GNUNET_SET_add_element (set->h, &element, NULL, NULL);
+ }
GNUNET_SET_commit (setop->op, set->h);
#endif
}
@@ -1212,25 +1254,6 @@
static void
-output_cloned_cb (void *cls, struct GNUNET_SET_Handle *copy)
-{
- struct TaskEntry *task = (struct TaskEntry *) cls;
- struct SetOpCls *setop = &task->cls.setop;
- struct ConsensusSession *session = task->step->session;
- struct SetEntry *set = GNUNET_new (struct SetEntry);
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "P%u: Received lazy copy, storing output set %s\n",
- session->local_peer_idx, debug_str_set_key (&setop->output_set));
-
- set->key = setop->output_set;
- set->h = copy;
- put_set (task->step->session, set);
- task_start_reconcile (task);
-}
-
-
-static void
task_cancel_reconcile (struct TaskEntry *task)
{
/* not implemented yet */
@@ -1249,15 +1272,18 @@
iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff->changes);
- while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
+ while (GNUNET_YES ==
+ GNUNET_CONTAINER_multihashmap_iterator_next (iter,
+ NULL,
+ (const void **) &di))
{
if (di->weight > 0)
{
- rfn_vote (rfn, voting_peer, num_peers, VOTE_ADD, di->element);
+ rfn_vote (rfn, voting_peer, VOTE_ADD, di->element);
}
if (di->weight < 0)
{
- rfn_vote (rfn, voting_peer, num_peers, VOTE_REMOVE, di->element);
+ rfn_vote (rfn, voting_peer, VOTE_REMOVE, di->element);
}
}
@@ -1401,6 +1427,12 @@
struct SetEntry *src_set;
struct SetCopyCls *scc = GNUNET_new (struct SetCopyCls);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Copying set {%s} to {%s} for task {%s}\n",
+ debug_str_set_key (src_set_key),
+ debug_str_set_key (dst_set_key),
+ debug_str_task_key (&task->key));
+
scc->task = task;
scc->dst_set_key = *dst_set_key;
src_set = lookup_set (task->step->session, src_set_key);
@@ -1410,7 +1442,35 @@
scc);
}
+
+struct SetMutationProgressCls
+{
+ int num_pending;
+ /**
+ * Task to finish once all changes are through.
+ */
+ struct TaskEntry *task;
+};
+
+
static void
+set_mutation_done (void *cls)
+{
+ struct SetMutationProgressCls *pc = cls;
+
+ GNUNET_assert (pc->num_pending > 0);
+
+ pc->num_pending--;
+
+ if (0 == pc->num_pending)
+ {
+ struct TaskEntry *task = pc->task;
+ GNUNET_free (pc);
+ finish_task (task);
+ }
+}
+
+static void
task_start_apply_round (struct TaskEntry *task)
{
struct ConsensusSession *session = task->step->session;
@@ -1421,6 +1481,7 @@
struct ReferendumEntry *rfn_in;
struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
struct RfnElementInfo *ri;
+ struct SetMutationProgressCls *progress_cls;
sk_in = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition };
rk_in = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
@@ -1436,6 +1497,9 @@
rfn_in = lookup_rfn (session, &rk_in);
GNUNET_assert (NULL != rfn_in);
+ progress_cls = GNUNET_new (struct SetMutationProgressCls);
+ progress_cls->task = task;
+
iter = GNUNET_CONTAINER_multihashmap_iterator_create (rfn_in->rfn_elements);
while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ri))
@@ -1448,10 +1512,20 @@
switch (majority_vote)
{
case VOTE_ADD:
- // XXX: add to set
+ progress_cls->num_pending++;
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_SET_add_element (set_out->h,
+ ri->element,
+ set_mutation_done,
+ progress_cls));
break;
case VOTE_REMOVE:
- // XXX: remove from set
+ progress_cls->num_pending++;
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_SET_remove_element (set_out->h,
+ ri->element,
+ set_mutation_done,
+ progress_cls));
break;
case VOTE_STAY:
// do nothing
@@ -1461,9 +1535,19 @@
break;
}
}
+
+ if (progress_cls->num_pending == 0)
+ {
+ // call closure right now, no pending ops
+ GNUNET_free (progress_cls);
+ finish_task (task);
+ }
}
+#define THRESH(s) (((s)->num_peers / 3))
+
+
static void
task_start_grade (struct TaskEntry *task)
{
@@ -1475,6 +1559,7 @@
struct DiffKey diff_key;
struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
struct RfnElementInfo *ri;
+ unsigned int gradecast_confidence = 2;
rfn_key = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
output_rfn = lookup_rfn (session, &rfn_key);
@@ -1481,6 +1566,7 @@
if (NULL == output_rfn)
{
output_rfn = rfn_create (session->num_peers);
+ output_rfn->key = rfn_key;
put_rfn (session, output_rfn);
}
@@ -1492,26 +1578,50 @@
input_rfn = lookup_rfn (session, &rfn_key);
GNUNET_assert (NULL != input_rfn);
+ iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
+
apply_diff_to_rfn (input_diff, output_rfn, task->key.leader,
session->num_peers);
- iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
-
while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ri))
{
uint16_t majority_num;
enum ReferendumVote majority_vote;
+ // XXX: we need contested votes and non-contested votes here
rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
-
+ if (majority_num < (session->num_peers / 3) * 2)
+ {
+ gradecast_confidence = GNUNET_MIN(1, gradecast_confidence);
+ }
+ if (majority_num < (session->num_peers / 3) + 1)
+ {
+ gradecast_confidence = 0;
+ }
switch (majority_vote)
{
+ case VOTE_STAY:
+ break;
+ case VOTE_ADD:
+ rfn_vote (output_rfn, task->key.leader, VOTE_ADD, ri->element);
+ break;
+ case VOTE_REMOVE:
+ rfn_vote (output_rfn, task->key.leader, VOTE_REMOVE, ri->element);
+ break;
default:
GNUNET_assert (0);
break;
}
}
+
+ if (gradecast_confidence >= 1)
+ rfn_commit (output_rfn, task->key.leader);
+
+ if (gradecast_confidence <= 1)
+ session->peers_blacklisted[task->key.leader] = GNUNET_YES;
+
+ finish_task (task);
}
@@ -1537,12 +1647,7 @@
we clone the input set. */
if (NULL == lookup_set (session, &setop->output_set))
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Output set missing, copying from input set\n");
- /* Since the cloning is asynchronous,
- we'll retry the current function once the copy
- has been provided by the SET service. */
- GNUNET_SET_copy_lazy (input->h, output_cloned_cb, task);
+ create_set_copy_for_task (task, &setop->input_set, &setop->output_set);
return;
}
}
@@ -1640,38 +1745,7 @@
}
-
-
-struct SetMutationProgressCls
-{
- int num_pending;
- /**
- * Task to finish once all changes are through.
- */
- struct TaskEntry *task;
-};
-
-
static void
-set_mutation_done (void *cls)
-{
- struct SetMutationProgressCls *pc = cls;
-
- GNUNET_assert (pc->num_pending > 0);
-
- pc->num_pending--;
-
- if (0 == pc->num_pending)
- {
- struct TaskEntry *task = pc->task;
- GNUNET_free (pc);
- finish_task (task);
- }
-}
-
-
-
-static void
task_start_eval_echo (struct TaskEntry *task)
{
struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
@@ -1708,7 +1782,10 @@
iter = GNUNET_CONTAINER_multihashmap_iterator_create (input_rfn->rfn_elements);
GNUNET_assert (NULL != iter);
- while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &ri))
+ while (GNUNET_YES ==
+ GNUNET_CONTAINER_multihashmap_iterator_next (iter,
+ NULL,
+ (const void **) &ri))
{
enum ReferendumVote majority_vote;
uint16_t majority_num;
@@ -1717,7 +1794,12 @@
if (majority_num < session->num_peers / 3)
{
- majority_vote = VOTE_REMOVE;
+ /* It is not the case that all nonfaulty peers
+ echoed the same value. Since we're doing a set reconciliation, we
+ can't simply send "nothing" for the value. Thus we mark our 'confirm'
+ reconciliation as contested. Other peers might not know that the
+ leader is faulty, thus we still re-distribute in the confirmation
+ round. */
output_set->is_contested = GNUNET_YES;
}
@@ -2509,8 +2591,6 @@
for (lead = 0; lead < n; lead++)
construct_task_graph_gradecast (session, i, lead, step_rep_start,
step_rep_end);
- // TODO: add peers to blacklisted list, either here or
- // already in the gradecast.
task = ((struct TaskEntry) {
.step = step_rep_end,
.key = (struct TaskKey) { PHASE_KIND_APPLY_REP, -1, -1, i, -1},
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r36451 - gnunet/src/consensus,
gnunet <=