gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r36451 - gnunet/src/consensus


From: gnunet
Subject: [GNUnet-SVN] r36451 - gnunet/src/consensus
Date: Tue, 6 Oct 2015 17:22:02 +0200

Author: dold
Date: 2015-10-06 17:22:02 +0200 (Tue, 06 Oct 2015)
New Revision: 36451

Modified:
   gnunet/src/consensus/gnunet-service-consensus.c
Log:
towards handling byz. faults correctly

- blacklisting of peers in consensus
- restructuring


Modified: gnunet/src/consensus/gnunet-service-consensus.c
===================================================================
--- gnunet/src/consensus/gnunet-service-consensus.c     2015-10-06 14:14:15 UTC 
(rev 36450)
+++ gnunet/src/consensus/gnunet-service-consensus.c     2015-10-06 15:22:02 UTC 
(rev 36451)
@@ -34,9 +34,34 @@
 #include "consensus.h"
 
 
+#define ELEMENT_TYPE_CONTESTED_MARKER (GNUNET_CONSENSUS_ELEMENT_TYPE_USER_MAX 
+ 1)
 
+
+enum ReferendumVote
+{
+  /**
+   * Vote that nothing should change.
+   * This option is never voted explicitly.
+   */
+  VOTE_STAY = 0,
+  /**
+   * Vote that an element should be added.
+   */
+  VOTE_ADD = 1,
+  /**
+   * Vote that an element should be removed.
+   */
+  VOTE_REMOVE = 2,
+};
+
+
 GNUNET_NETWORK_STRUCT_BEGIN
 
+
+struct ContestedPayload
+{
+};
+
 /**
  * Tuple of integers that together
  * identify a task uniquely.
@@ -72,24 +97,7 @@
 };
 
 
-enum ReferendumVote
-{
-  /**
-   * Vote that nothing should change.
-   * This option is never voted explicitly.
-   */
-  VOTE_STAY = 0,
-  /**
-   * Vote that an element should be added.
-   */
-  VOTE_ADD = 1,
-  /**
-   * Vote that an element should be removed.
-   */
-  VOTE_REMOVE = 2,
-};
 
-
 struct SetKey
 {
   int set_kind GNUNET_PACKED;
@@ -136,7 +144,6 @@
   PHASE_KIND_GRADECAST_ECHO_GRADE,
   PHASE_KIND_GRADECAST_CONFIRM,
   PHASE_KIND_GRADECAST_CONFIRM_GRADE,
-  PHASE_KIND_GRADECAST_APPLY_RESULT,
   /**
    * Apply a repetition of the all-to-all
    * gradecast to the current set.
@@ -484,9 +491,6 @@
 finish_task (struct TaskEntry *task);
 
 static void
-task_start_reconcile (struct TaskEntry *task);
-
-static void
 run_ready_steps (struct ConsensusSession *session);
 
 static const char *
@@ -501,7 +505,6 @@
     case PHASE_KIND_GRADECAST_ECHO_GRADE: return "GRADECAST_ECHO_GRADE";
     case PHASE_KIND_GRADECAST_CONFIRM: return "GRADECAST_CONFIRM";
     case PHASE_KIND_GRADECAST_CONFIRM_GRADE: return "GRADECAST_CONFIRM_GRADE";
-    case PHASE_KIND_GRADECAST_APPLY_RESULT: return "GRADECAST_APPLY_RESULT";
     case PHASE_KIND_APPLY_REP: return "APPLY_REP";
     default: return "(unknown)";
   }
@@ -772,9 +775,27 @@
 
 
 static void
+rfn_commit (struct ReferendumEntry *rfn,
+            uint16_t commit_peer)
+{
+  GNUNET_assert (commit_peer < rfn->num_peers);
+
+  rfn->peer_commited[commit_peer] = GNUNET_YES;
+}
+
+
+static void
+rfn_contest (struct ReferendumEntry *rfn,
+             uint16_t contested_peer)
+{
+  GNUNET_assert (contested_peer < rfn->num_peers);
+
+  rfn->peer_contested[contested_peer] = GNUNET_YES;
+}
+
+static void
 rfn_vote (struct ReferendumEntry *rfn,
           uint16_t voting_peer,
-          uint16_t num_peers,
           enum ReferendumVote vote,
           const struct GNUNET_SET_Element *element)
 {
@@ -781,15 +802,12 @@
   struct RfnElementInfo *ri;
   struct GNUNET_HashCode hash;
 
-  GNUNET_assert (voting_peer < num_peers);
+  GNUNET_assert (voting_peer < rfn->num_peers);
 
   /* Explicit voting only makes sense with VOTE_ADD or VOTE_REMOTE,
      since VOTE_KEEP is implicit in not voting. */
   GNUNET_assert ( (VOTE_ADD == vote) || (VOTE_REMOVE == vote) );
 
-  // XXX: should happen in another place!
-  rfn->peer_commited[voting_peer] = GNUNET_YES;
-
   GNUNET_SET_element_hash (element, &hash);
   ri = GNUNET_CONTAINER_multihashmap_get (rfn->rfn_elements, &hash);
 
@@ -797,7 +815,7 @@
   {
     ri = GNUNET_new (struct RfnElementInfo);
     ri->element = GNUNET_SET_element_dup (element);
-    ri->votes = GNUNET_new_array (num_peers, int);
+    ri->votes = GNUNET_new_array (rfn->num_peers, int);
     GNUNET_assert (GNUNET_OK ==
                    GNUNET_CONTAINER_multihashmap_put (rfn->rfn_elements,
                                                       &hash, ri,
@@ -897,6 +915,16 @@
     return;
   }
 
+  if ( (GNUNET_SET_STATUS_ADD_LOCAL == status) || 
(GNUNET_SET_STATUS_ADD_REMOTE == status) )
+  {
+    if ( (GNUNET_YES == setop->transceive_contested) && 
(ELEMENT_TYPE_CONTESTED_MARKER == element->element_type) )
+    {
+      GNUNET_assert (NULL != output_rfn);
+      rfn_contest (output_rfn, task_other_peer (task));
+      return;
+    }
+  }
+
   switch (status)
   {
     case GNUNET_SET_STATUS_ADD_LOCAL:
@@ -933,7 +961,7 @@
       }
       if (NULL != output_rfn)
       {
-        rfn_vote (output_rfn, task_other_peer (task), session->num_peers, 
VOTE_ADD, element);
+        rfn_vote (output_rfn, task_other_peer (task), VOTE_ADD, element);
 #ifdef GNUNET_EXTRA_LOGGING
         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                     "P%u: adding element %s into rfn {%s} of task {%s}\n",
@@ -948,6 +976,8 @@
     case GNUNET_SET_STATUS_ADD_REMOTE:
       if (GNUNET_YES == setop->do_not_remove)
         break;
+      if (ELEMENT_TYPE_CONTESTED_MARKER == element->element_type)
+        break;
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                   "Removing element in Task {%s}\n",
                   debug_str_task_key (&task->key));
@@ -981,7 +1011,7 @@
       }
       if (NULL != output_rfn)
       {
-        rfn_vote (output_rfn, task_other_peer (task), session->num_peers, 
VOTE_REMOVE, element);
+        rfn_vote (output_rfn, task_other_peer (task), VOTE_REMOVE, element);
 #ifdef GNUNET_EXTRA_LOGGING
         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                     "P%u: removing element %s from rfn {%s} of task {%s}\n",
@@ -995,10 +1025,13 @@
     case GNUNET_SET_STATUS_DONE:
       // XXX: check first if any changes to the underlying
       // set are still pending
-      // XXX: commit other peer in referendum
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                   "Finishing setop in Task {%s}\n",
                   debug_str_task_key (&task->key));
+      if (NULL != output_rfn)
+      {
+        rfn_commit (output_rfn, task_other_peer (task));
+      }
       finish_task (task);
       break;
     case GNUNET_SET_STATUS_FAILURE:
@@ -1159,6 +1192,15 @@
     }
   }
 #else
+  if ( (GNUNET_YES == setop->transceive_contested) && (GNUNET_YES == 
set->is_contested) )
+  {
+    struct GNUNET_SET_Element element;
+    struct ContestedPayload payload;
+    element.data = &payload;
+    element.size = sizeof (struct ContestedPayload);
+    element.element_type = ELEMENT_TYPE_CONTESTED_MARKER;
+    GNUNET_SET_add_element (set->h, &element, NULL, NULL);
+  }
   GNUNET_SET_commit (setop->op, set->h);
 #endif
 }
@@ -1212,25 +1254,6 @@
 
 
 static void
-output_cloned_cb (void *cls, struct GNUNET_SET_Handle *copy)
-{
-  struct TaskEntry *task = (struct TaskEntry *) cls;
-  struct SetOpCls *setop = &task->cls.setop;
-  struct ConsensusSession *session = task->step->session;
-  struct SetEntry *set = GNUNET_new (struct SetEntry);
-
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-              "P%u: Received lazy copy, storing output set %s\n",
-              session->local_peer_idx, debug_str_set_key (&setop->output_set));
-
-  set->key = setop->output_set;
-  set->h = copy;
-  put_set (task->step->session, set);
-  task_start_reconcile (task);
-}
-
-
-static void
 task_cancel_reconcile (struct TaskEntry *task)
 {
   /* not implemented yet */
@@ -1249,15 +1272,18 @@
 
   iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff->changes);
 
-  while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, 
NULL, (const void **) &di))
+  while (GNUNET_YES ==
+         GNUNET_CONTAINER_multihashmap_iterator_next (iter,
+                                                      NULL,
+                                                      (const void **) &di))
   {
     if (di->weight > 0)
     {
-      rfn_vote (rfn, voting_peer, num_peers, VOTE_ADD, di->element);
+      rfn_vote (rfn, voting_peer, VOTE_ADD, di->element);
     }
     if (di->weight < 0)
     {
-      rfn_vote (rfn, voting_peer, num_peers, VOTE_REMOVE, di->element);
+      rfn_vote (rfn, voting_peer, VOTE_REMOVE, di->element);
     }
   }
 
@@ -1401,6 +1427,12 @@
   struct SetEntry *src_set;
   struct SetCopyCls *scc = GNUNET_new (struct SetCopyCls);
 
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Copying set {%s} to {%s} for task {%s}\n",
+              debug_str_set_key (src_set_key),
+              debug_str_set_key (dst_set_key),
+              debug_str_task_key (&task->key));
+
   scc->task = task;
   scc->dst_set_key = *dst_set_key;
   src_set = lookup_set (task->step->session, src_set_key);
@@ -1410,7 +1442,35 @@
                         scc);
 }
 
+
+struct SetMutationProgressCls
+{
+  int num_pending;
+  /**
+   * Task to finish once all changes are through.
+   */
+  struct TaskEntry *task;
+};
+
+
 static void
+set_mutation_done (void *cls)
+{
+  struct SetMutationProgressCls *pc = cls;
+
+  GNUNET_assert (pc->num_pending > 0);
+
+  pc->num_pending--;
+
+  if (0 == pc->num_pending)
+  {
+    struct TaskEntry *task = pc->task;
+    GNUNET_free (pc);
+    finish_task (task);
+  }
+}
+
+static void
 task_start_apply_round (struct TaskEntry *task)
 {
   struct ConsensusSession *session = task->step->session;
@@ -1421,6 +1481,7 @@
   struct ReferendumEntry *rfn_in;
   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
   struct RfnElementInfo *ri;
+  struct SetMutationProgressCls *progress_cls;
 
   sk_in = (struct SetKey) { SET_KIND_CURRENT, task->key.repetition };
   rk_in = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition };
@@ -1436,6 +1497,9 @@
   rfn_in = lookup_rfn (session, &rk_in);
   GNUNET_assert (NULL != rfn_in);
 
+  progress_cls = GNUNET_new (struct SetMutationProgressCls);
+  progress_cls->task = task;
+
   iter = GNUNET_CONTAINER_multihashmap_iterator_create (rfn_in->rfn_elements);
 
   while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, 
NULL, (const void **) &ri))
@@ -1448,10 +1512,20 @@
     switch (majority_vote)
     {
       case VOTE_ADD:
-        // XXX: add to set
+        progress_cls->num_pending++;
+        GNUNET_assert (GNUNET_OK ==
+                       GNUNET_SET_add_element (set_out->h,
+                                               ri->element,
+                                               set_mutation_done,
+                                               progress_cls));
         break;
       case VOTE_REMOVE:
-        // XXX: remove from set
+        progress_cls->num_pending++;
+        GNUNET_assert (GNUNET_OK ==
+                       GNUNET_SET_remove_element (set_out->h,
+                                                  ri->element,
+                                                  set_mutation_done,
+                                                  progress_cls));
         break;
       case VOTE_STAY:
         // do nothing
@@ -1461,9 +1535,19 @@
         break;
     }
   }
+
+  if (progress_cls->num_pending == 0)
+  {
+    // call closure right now, no pending ops
+    GNUNET_free (progress_cls);
+    finish_task (task);
+  }
 }
 
 
+#define THRESH(s) (((s)->num_peers / 3))
+
+
 static void
 task_start_grade (struct TaskEntry *task)
 {
@@ -1475,6 +1559,7 @@
   struct DiffKey diff_key;
   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
   struct RfnElementInfo *ri;
+  unsigned int gradecast_confidence = 2;
 
   rfn_key = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, task->key.repetition 
};
   output_rfn = lookup_rfn (session, &rfn_key);
@@ -1481,6 +1566,7 @@
   if (NULL == output_rfn)
   {
     output_rfn = rfn_create (session->num_peers);
+    output_rfn->key = rfn_key;
     put_rfn (session, output_rfn);
   }
 
@@ -1492,26 +1578,50 @@
   input_rfn = lookup_rfn (session, &rfn_key);
   GNUNET_assert (NULL != input_rfn);
 
+  iter = GNUNET_CONTAINER_multihashmap_iterator_create 
(input_rfn->rfn_elements);
+
   apply_diff_to_rfn (input_diff, output_rfn, task->key.leader, 
session->num_peers);
 
-  iter = GNUNET_CONTAINER_multihashmap_iterator_create 
(input_rfn->rfn_elements);
-
   while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, 
NULL, (const void **) &ri))
   {
     uint16_t majority_num;
     enum ReferendumVote majority_vote;
 
+    // XXX: we need contested votes and non-contested votes here
     rfn_majority (input_rfn, ri, &majority_num, &majority_vote);
 
-    
+    if (majority_num < (session->num_peers / 3) * 2)
+    {
+      gradecast_confidence = GNUNET_MIN(1, gradecast_confidence);
+    }
+    if (majority_num < (session->num_peers / 3) + 1)
+    {
+      gradecast_confidence = 0;
+    }
 
     switch (majority_vote)
     {
+      case VOTE_STAY:
+        break;
+      case VOTE_ADD:
+        rfn_vote (output_rfn, task->key.leader, VOTE_ADD, ri->element);
+        break;
+      case VOTE_REMOVE:
+        rfn_vote (output_rfn, task->key.leader, VOTE_REMOVE, ri->element);
+        break;
       default:
         GNUNET_assert (0);
         break;
     }
   }
+
+  if (gradecast_confidence >= 1)
+    rfn_commit (output_rfn, task->key.leader);
+
+  if (gradecast_confidence <= 1)
+    session->peers_blacklisted[task->key.leader] = GNUNET_YES;
+
+  finish_task (task);
 }
 
 
@@ -1537,12 +1647,7 @@
        we clone the input set. */
     if (NULL == lookup_set (session, &setop->output_set))
     {
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                  "Output set missing, copying from input set\n");
-      /* Since the cloning is asynchronous,
-         we'll retry the current function once the copy
-         has been provided by the SET service. */
-      GNUNET_SET_copy_lazy (input->h, output_cloned_cb, task);
+      create_set_copy_for_task (task, &setop->input_set, &setop->output_set);
       return;
     }
   }
@@ -1640,38 +1745,7 @@
 }
 
 
-
-
-struct SetMutationProgressCls
-{
-  int num_pending;
-  /**
-   * Task to finish once all changes are through.
-   */
-  struct TaskEntry *task;
-};
-
-
 static void
-set_mutation_done (void *cls)
-{
-  struct SetMutationProgressCls *pc = cls;
-
-  GNUNET_assert (pc->num_pending > 0);
-
-  pc->num_pending--;
-
-  if (0 == pc->num_pending)
-  {
-    struct TaskEntry *task = pc->task;
-    GNUNET_free (pc);
-    finish_task (task);
-  }
-}
-
-
-
-static void
 task_start_eval_echo (struct TaskEntry *task)
 {
   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
@@ -1708,7 +1782,10 @@
   iter = GNUNET_CONTAINER_multihashmap_iterator_create 
(input_rfn->rfn_elements);
   GNUNET_assert (NULL != iter);
 
-  while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, 
NULL, (const void **) &ri))
+  while (GNUNET_YES ==
+         GNUNET_CONTAINER_multihashmap_iterator_next (iter,
+                                                      NULL,
+                                                      (const void **) &ri))
   {
     enum ReferendumVote majority_vote;
     uint16_t majority_num;
@@ -1717,7 +1794,12 @@
 
     if (majority_num < session->num_peers / 3)
     {
-      majority_vote = VOTE_REMOVE;
+      /* It is not the case that all nonfaulty peers
+         echoed the same value.  Since we're doing a set reconciliation, we
+         can't simply send "nothing" for the value.  Thus we mark our 'confirm'
+         reconciliation as contested.  Other peers might not know that the
+         leader is faulty, thus we still re-distribute in the confirmation
+         round. */
       output_set->is_contested = GNUNET_YES;
     }
 
@@ -2509,8 +2591,6 @@
     for (lead = 0; lead < n; lead++)
       construct_task_graph_gradecast (session, i, lead, step_rep_start, 
step_rep_end);
 
-    // TODO: add peers to blacklisted list, either here or
-    // already in the gradecast.
     task = ((struct TaskEntry) {
       .step = step_rep_end,
       .key = (struct TaskKey) { PHASE_KIND_APPLY_REP, -1, -1, i, -1},




reply via email to

[Prev in Thread] Current Thread [Next in Thread]