gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r36443 - in gnunet/src: consensus include set


From: gnunet
Subject: [GNUnet-SVN] r36443 - in gnunet/src: consensus include set
Date: Mon, 5 Oct 2015 23:26:56 +0200

Author: dold
Date: 2015-10-05 23:26:56 +0200 (Mon, 05 Oct 2015)
New Revision: 36443

Modified:
   gnunet/src/consensus/gnunet-consensus-profiler.c
   gnunet/src/consensus/gnunet-service-consensus.c
   gnunet/src/consensus/test_consensus.conf
   gnunet/src/include/gnunet_set_service.h
   gnunet/src/set/gnunet-service-set.c
   gnunet/src/set/set_api.c
Log:
work on consensus and set

- evil peers for consensus
- various fixes for consensus and set


Modified: gnunet/src/consensus/gnunet-consensus-profiler.c
===================================================================
--- gnunet/src/consensus/gnunet-consensus-profiler.c    2015-10-05 16:35:44 UTC (rev 36442)
+++ gnunet/src/consensus/gnunet-consensus-profiler.c    2015-10-05 21:26:56 UTC (rev 36443)
@@ -437,7 +437,7 @@
         gettext_noop ("number of peers in consensus"),
         GNUNET_YES, &GNUNET_GETOPT_set_uint, &num_peers },
       { 'k', "value-replication", NULL,
-        gettext_noop ("how many peers receive one value?"),
+        gettext_noop ("how many peers (random selection without replacement) receive one value?"),
         GNUNET_YES, &GNUNET_GETOPT_set_uint, &replication },
       { 'x', "num-values", NULL,
         gettext_noop ("number of values"),

Modified: gnunet/src/consensus/gnunet-service-consensus.c
===================================================================
--- gnunet/src/consensus/gnunet-service-consensus.c     2015-10-05 16:35:44 UTC (rev 36442)
+++ gnunet/src/consensus/gnunet-service-consensus.c     2015-10-05 21:26:56 UTC (rev 36443)
@@ -128,26 +128,35 @@
   PHASE_KIND_GRADECAST_CONFIRM,
   PHASE_KIND_GRADECAST_CONFIRM_GRADE,
   PHASE_KIND_GRADECAST_APPLY_RESULT,
+  /**
+   * Apply a repetition of the all-to-all
+   * gradecast to the current set.
+   */
+  PHASE_KIND_APPLY_REP,
   PHASE_KIND_FINISH,
 };
 
 
-enum ActionType
+enum TaskKind
 {
   /**
    * Do a set reconciliation with another peer (or via looback).
    */
-  ACTION_RECONCILE,
+  TASK_RECONCILE,
   /**
+   * Same as reconciliation, but only care about added elements.
+   */
+  TASK_UNION,
+  /**
    * Apply a referendum with a threshold
    * to a set and/or a diff.
    */
-  ACTION_EVAL_RFN,
+  TASK_EVAL_RFN,
   /**
    * Apply a diff to a set.
    */
-  ACTION_APPLY_DIFF,
-  ACTION_FINISH,
+  TASK_APPLY_DIFF,
+  FASK_FINISH,
 };
 
 enum SetKind
@@ -154,7 +163,7 @@
 {
   SET_KIND_NONE = 0,
   SET_KIND_CURRENT,
-  SET_KIND_LEADER,
+  SET_KIND_LEADER_PROPOSAL,
   SET_KIND_ECHO_RESULT,
 };
 
@@ -161,7 +170,8 @@
 enum DiffKind
 {
   DIFF_KIND_NONE = 0,
-  DIFF_KIND_LEADER,
+  DIFF_KIND_LEADER_PROPOSAL,
+  DIFF_KIND_LEADER_CONSENSUS,
   DIFF_KIND_GRADECAST_RESULT,
 };
 
@@ -170,9 +180,74 @@
   RFN_KIND_NONE = 0,
   RFN_KIND_ECHO,
   RFN_KIND_CONFIRM,
+  RFN_KIND_GRADECAST_RESULT
 };
 
 
+struct SetOpCls
+{
+  struct SetKey input_set;
+
+  struct SetKey output_set;
+  struct RfnKey output_rfn;
+  struct DiffKey output_diff;
+
+  int do_not_remove;
+
+  struct GNUNET_SET_OperationHandle *op;
+};
+
+struct EvalRfnCls
+{
+  struct SetKey input_set;
+  struct RfnKey input_rfn;
+  
+  uint16_t threshold;
+
+  struct SetKey output_set;
+  struct DiffKey output_diff;
+};
+
+
+struct ApplyDiffCls
+{
+  struct SetKey input_set;
+  struct DiffKey input_diff;
+  struct SetKey output_set;
+};
+
+
+struct LeaderApplyCls
+{
+  struct DiffKey input_diff_1;
+  struct DiffKey input_diff_2;
+
+  struct RfnKey output_rfn;
+};
+
+
+struct FinishCls
+{
+  struct SetKey input_set;
+};
+
+/**
+ * Closure for both @a start_task
+ * and @a cancel_task.
+ */
+union TaskFuncCls
+{
+  struct SetOpCls setop;
+  struct EvalRfnCls eval_rfn;
+  struct ApplyDiffCls apply_diff;
+  struct LeaderApplyCls leader_apply;
+  struct FinishCls finish;
+};
+
+struct TaskEntry;
+
+typedef void (*TaskFunc) (struct TaskEntry *task);
+
 /*
  * Node in the consensus task graph.
  */
@@ -182,30 +257,16 @@
 
   struct Step *step;
 
-  int is_running;
+  int is_started;
 
   int is_finished;
 
-  enum ActionType action;
+  enum TaskKind kind;
 
-  struct SetKey input_set;
-  struct DiffKey input_diff;
-  struct RfnKey input_rfn;
-  struct SetKey output_set;
-  struct DiffKey output_diff;
-  struct RfnKey output_rfn;
+  TaskFunc start;
+  TaskFunc cancel;
 
-  /**
-   * Threshold when evaluating referendums.
-   */
-  uint16_t threshold;
-
-  /**
-   * Operation that is running for this task.
-   */
-  struct GNUNET_SET_OperationHandle *op;
-
-  struct GNUNET_SET_Handle *commited_set;
+  union TaskFuncCls cls;
 };
 
 
@@ -280,17 +341,10 @@
   char *debug_name;
 };
 
-struct RfnPeerInfo
-{
-  /* Peers can propose changes,
-   * but they are only accepted once
-   * the whole set operation is done. */
-  int is_commited;
-};
 
 struct RfnElementInfo
 {
-  struct GNUNET_SET_Element *element;
+  const struct GNUNET_SET_Element *element;
 
   /*
    * Vote (or VOTE_NONE) from every peer
@@ -323,12 +377,20 @@
    * not counted for majority votes or thresholds.
    */
   int *peer_commited;
+
+
+  /**
+   * Contestation state of the peer.  If a peer is contested, the values it
+   * contributed are still counted for applying changes, but the grading is
+   * affected.
+   */
+  int *peer_contested;
 };
 
 
 struct DiffElementInfo
 {
-  struct GNUNET_SET_Element *element;
+  const struct GNUNET_SET_Element *element;
 
   /**
    * Positive weight for 'add', negative
@@ -468,13 +530,13 @@
 finish_task (struct TaskEntry *task);
 
 static void
-run_task_remote_union (struct ConsensusSession *session, struct TaskEntry *task);
+task_start_reconcile (struct TaskEntry *task);
 
 static void
-run_task_eval_rfn (struct ConsensusSession *session, struct TaskEntry *task);
+task_start_eval_rfn (struct TaskEntry *task);
 
 static void
-run_task_apply_diff (struct ConsensusSession *session, struct TaskEntry *task);
+task_start_apply_diff (struct TaskEntry *task);
 
 static void
 run_ready_steps (struct ConsensusSession *session);
@@ -492,6 +554,7 @@
     case PHASE_KIND_GRADECAST_CONFIRM: return "GRADECAST_CONFIRM";
     case PHASE_KIND_GRADECAST_CONFIRM_GRADE: return "GRADECAST_CONFIRM_GRADE";
     case PHASE_KIND_GRADECAST_APPLY_RESULT: return "GRADECAST_APPLY_RESULT";
+    case PHASE_KIND_APPLY_REP: return "APPLY_REP";
     default: return "(unknown)";
   }
 }
@@ -503,7 +566,7 @@
   switch (kind)
   {
     case SET_KIND_CURRENT: return "CURRENT";
-    case SET_KIND_LEADER: return "LEADER";
+    case SET_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
     case SET_KIND_NONE: return "NONE";
     default: return "(unknown)";
   }
@@ -527,13 +590,27 @@
   switch (kind)
   {
     case DIFF_KIND_NONE: return "NONE";
-    case DIFF_KIND_LEADER: return "LEADER";
+    case DIFF_KIND_LEADER_CONSENSUS: return "LEADER_CONSENSUS";
     case DIFF_KIND_GRADECAST_RESULT: return "GRADECAST_RESULT";
+    case DIFF_KIND_LEADER_PROPOSAL: return "LEADER_PROPOSAL";
     default: return "(unknown)";
   }
 }
 
+#ifdef GNUNET_EXTRA_LOGGING
+
+
 static const char *
+debug_str_element (const struct GNUNET_SET_Element *el)
+{
+  struct GNUNET_HashCode hash;
+
+  GNUNET_SET_element_hash (el, &hash);
+
+  return GNUNET_h2s (&hash);
+}
+
+static const char *
 debug_str_task_key (struct TaskKey *tk)
 {
   static char buf[256];
@@ -583,7 +660,9 @@
   return buf;
 }
 
+#endif /* GNUNET_EXTRA_LOGGING */
 
+
 /**
  * Destroy a session, free all resources associated with it.
  *
@@ -602,6 +681,8 @@
   {
     GNUNET_MQ_destroy (session->client_mq);
     session->client_mq = NULL;
+    /* The MQ cleanup will also disconnect the underlying client. */
+    session->client = NULL;
   }
   if (NULL != session->client)
   {
@@ -634,8 +715,9 @@
     struct GNUNET_CONSENSUS_ElementMessage *m;
 
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                "P%d: got element for client\n",
-                session->local_peer_idx);
+                "P%d: sending element %s to client\n",
+                session->local_peer_idx,
+                debug_str_element (element));
 
     ev = GNUNET_MQ_msg_extra (m, element->size,
                               GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT);
@@ -736,7 +818,36 @@
              int weight,
              const struct GNUNET_SET_Element *element)
 {
-  GNUNET_assert (0);
+  struct DiffElementInfo *di;
+  struct GNUNET_HashCode hash;
+
+  GNUNET_assert ( (1 == weight) || (-1 == weight));
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "diff_insert with element size %u\n",
+              element->size);
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "hashing element\n");
+
+  GNUNET_SET_element_hash (element, &hash);
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "hashed element\n");
+
+  di = GNUNET_CONTAINER_multihashmap_get (diff->changes, &hash);
+
+  if (NULL == di)
+  {
+    di = GNUNET_new (struct DiffElementInfo);
+    di->element = GNUNET_SET_element_dup (element);
+    GNUNET_assert (GNUNET_OK ==
+                   GNUNET_CONTAINER_multihashmap_put (diff->changes,
+                                                      &hash, di,
+                                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
+  }
+
+  di->weight = weight;
 }
 
 
@@ -747,10 +858,39 @@
           int vote,
           const struct GNUNET_SET_Element *element)
 {
+  struct RfnElementInfo *ri;
+  struct GNUNET_HashCode hash;
+
   GNUNET_assert (voting_peer < num_peers);
-  GNUNET_assert (0);
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "voting for element of size %u\n",
+              element->size);
+
+  rfn->peer_commited[voting_peer] = GNUNET_YES;
+
+  GNUNET_SET_element_hash (element, &hash);
+  ri = GNUNET_CONTAINER_multihashmap_get (rfn->rfn_elements, &hash);
+
+
+  if (NULL == ri)
+  {
+    ri = GNUNET_new (struct RfnElementInfo);
+    ri->element = GNUNET_SET_element_dup (element);
+    ri->votes = GNUNET_new_array (num_peers, int);
+    GNUNET_assert (GNUNET_OK ==
+                   GNUNET_CONTAINER_multihashmap_put (rfn->rfn_elements,
+                                                      &hash, ri,
+                                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST));
+  }
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "rfn vote element %p\n",
+              ri->element);
+  ri->votes[voting_peer] = vote;
 }
 
+
 uint16_t
 task_other_peer (struct TaskEntry *task)
 {
@@ -760,6 +900,7 @@
   return task->key.peer1;
 }
 
+
 /**
  * Callback for set operation results. Called for each element
  * in the result set.
@@ -779,6 +920,10 @@
   struct DiffEntry *output_diff = NULL;
   struct ReferendumEntry *output_rfn = NULL;
   unsigned int other_idx;
+  struct SetOpCls *setop;
+
+  setop = &task->cls.setop;
+
   
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "P%u: got set result for {%s}, status %u\n",
@@ -786,7 +931,7 @@
               debug_str_task_key (&task->key),
               status);
 
-  if (GNUNET_NO == task->is_running)
+  if (GNUNET_NO == task->is_started)
   {
     GNUNET_break_op (0);
     return;
@@ -808,14 +953,23 @@
     GNUNET_assert (0);
   }
 
-  if (SET_KIND_NONE != task->output_set.set_kind)
-    output_set = lookup_set (session, &task->output_set);
+  if (SET_KIND_NONE != setop->output_set.set_kind)
+  {
+    output_set = lookup_set (session, &setop->output_set);
+    GNUNET_assert (NULL != output_set);
+  }
 
-  if (DIFF_KIND_NONE != task->output_diff.diff_kind)
-    output_diff = lookup_diff (session, &task->output_diff);
+  if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
+  {
+    output_diff = lookup_diff (session, &setop->output_diff);
+    GNUNET_assert (NULL != output_diff);
+  }
 
-  if (RFN_KIND_NONE != task->output_rfn.rfn_kind)
-    output_rfn = lookup_rfn (session, &task->output_rfn);
+  if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
+  {
+    output_rfn = lookup_rfn (session, &setop->output_rfn);
+    GNUNET_assert (NULL != output_rfn);
+  }
 
   if (GNUNET_YES == session->peers_ignored[other_idx])
   {
@@ -827,8 +981,10 @@
 
   switch (status)
   {
-    // case GNUNET_SET_STATUS_MISSING_LOCAL:
-    case GNUNET_SET_STATUS_OK:
+    case GNUNET_SET_STATUS_ADD_LOCAL:
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Adding element in Task {%s}\n",
+                  debug_str_task_key (&task->key));
       if (NULL != output_set)
       {
         // FIXME: record pending adds, use callback
@@ -836,25 +992,95 @@
                                 element,
                                 NULL,
                                 NULL);
-
+#ifdef GNUNET_EXTRA_LOGGING
+        GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                    "P%u: adding element %s into set {%s} of task {%s}\n",
+                    session->local_peer_idx,
+                    debug_str_element (element),
+                    debug_str_set_key (&setop->output_set),
+                    debug_str_task_key (&task->key));
+#endif
       }
       if (NULL != output_diff)
       {
         diff_insert (output_diff, 1, element);
+#ifdef GNUNET_EXTRA_LOGGING
+        GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                    "P%u: adding element %s into diff {%s} of task {%s}\n",
+                    session->local_peer_idx,
+                    debug_str_element (element),
+                    debug_str_diff_key (&setop->output_diff),
+                    debug_str_task_key (&task->key));
+#endif
       }
       if (NULL != output_rfn)
       {
         rfn_vote (output_rfn, task_other_peer (task), session->num_peers, VOTE_ADD, element);
+#ifdef GNUNET_EXTRA_LOGGING
+        GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                    "P%u: adding element %s into rfn {%s} of task {%s}\n",
+                    session->local_peer_idx,
+                    debug_str_element (element),
+                    debug_str_rfn_key (&setop->output_rfn),
+                    debug_str_task_key (&task->key));
+#endif
       }
       // XXX: add result to structures in task
       break;
-    //case GNUNET_SET_STATUS_MISSING_REMOTE:
-    //  // XXX: add result to structures in task
-    //  break;
+    case GNUNET_SET_STATUS_ADD_REMOTE:
+      if (GNUNET_YES == setop->do_not_remove)
+        break;
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Removing element in Task {%s}\n",
+                  debug_str_task_key (&task->key));
+      if (NULL != output_set)
+      {
+        // FIXME: record pending adds, use callback
+        GNUNET_SET_remove_element (output_set->h,
+                                   element,
+                                   NULL,
+                                   NULL);
+#ifdef GNUNET_EXTRA_LOGGING
+        GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                    "P%u: removing element %s from set {%s} of task {%s}\n",
+                    session->local_peer_idx,
+                    debug_str_element (element),
+                    debug_str_set_key (&setop->output_set),
+                    debug_str_task_key (&task->key));
+#endif
+      }
+      if (NULL != output_diff)
+      {
+        diff_insert (output_diff, -1, element);
+#ifdef GNUNET_EXTRA_LOGGING
+        GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                    "P%u: removing element %s from diff {%s} of task {%s}\n",
+                    session->local_peer_idx,
+                    debug_str_element (element),
+                    debug_str_diff_key (&setop->output_diff),
+                    debug_str_task_key (&task->key));
+#endif
+      }
+      if (NULL != output_rfn)
+      {
+        rfn_vote (output_rfn, task_other_peer (task), session->num_peers, VOTE_REMOVE, element);
+#ifdef GNUNET_EXTRA_LOGGING
+        GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                    "P%u: removing element %s from rfn {%s} of task {%s}\n",
+                    session->local_peer_idx,
+                    debug_str_element (element),
+                    debug_str_rfn_key (&setop->output_rfn),
+                    debug_str_task_key (&task->key));
+#endif
+      }
+      break;
     case GNUNET_SET_STATUS_DONE:
       // XXX: check first if any changes to the underlying
       // set are still pending
       // XXX: commit other peer in referendum
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                  "Finishing setop in Task {%s}\n",
+                  debug_str_task_key (&task->key));
       finish_task (task);
       break;
     case GNUNET_SET_STATUS_FAILURE:
@@ -867,8 +1093,86 @@
   }
 }
 
+#ifdef EVIL
 
+enum Evilness
+{
+  EVILNESS_NONE,
+  EVILNESS_CRAM,
+  EVILNESS_SLACK,
+};
 
+static void
+get_evilness (struct ConsensusSession *session, enum Evilness *ret_type, unsigned int *ret_num)
+{
+  char *evil_spec;
+  char *field;
+  char *evil_type_str = NULL;
+
+  if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, "consensus", "EVIL_SPEC", &evil_spec))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "P%u: no evilness\n",
+                session->local_peer_idx);
+    *ret_type = EVILNESS_NONE;
+    return;
+  }
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "P%u: got evilness spec\n",
+              session->local_peer_idx);
+
+  for (field = strtok (evil_spec, "/");
+       NULL != field;
+       field = strtok (NULL, "/"))
+  {
+    unsigned int peer_num;
+    unsigned int evil_num;
+    int ret;
+
+    evil_type_str = NULL;
+
+    ret = sscanf (field, "%u;%m[a-z];%u", &peer_num, &evil_type_str, &evil_num);
+
+    if (ret != 3)
+    {
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Malformed field '%s' in EVIL_SPEC, behaving like a good peer.\n",
+                  field); 
+      goto not_evil;
+    }
+
+    GNUNET_assert (NULL != evil_type_str);
+
+    if (peer_num == session->local_peer_idx)
+    {
+      if (0 == strcmp ("slack", evil_type_str))
+        *ret_type = EVILNESS_SLACK;
+      else if (0 == strcmp ("cram", evil_type_str))
+      {
+        *ret_type = EVILNESS_CRAM;
+        *ret_num = evil_num;
+      }
+      else
+      {
+        GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Malformed field '%s' in EVIL_SPEC (unknown type), behaving like a good peer.\n"); 
+        goto not_evil;
+      }
+      goto cleanup;
+    }
+    /* No GNUNET_free since memory was allocated by libc */
+    free (evil_type_str);
+    evil_type_str = NULL;
+  }
+not_evil:
+  *ret_type = EVILNESS_NONE;
+cleanup:
+  GNUNET_free (evil_spec);
+  if (NULL != evil_type_str)
+    free (evil_type_str);
+}
+
+#endif
+
+
 /**
  * Commit the appropriate set for a
  * task.
@@ -878,11 +1182,67 @@
             struct TaskEntry *task)
 {
   struct SetEntry *set;
+  struct SetOpCls *setop = &task->cls.setop;
 
-  GNUNET_assert (NULL != task->op);
-  set = lookup_set (session, &task->input_set);
+  GNUNET_assert (NULL != setop->op);
+  set = lookup_set (session, &setop->input_set);
   GNUNET_assert (NULL != set);
-  GNUNET_SET_commit (task->op, set->h);
+
+#ifdef EVIL
+  {
+    unsigned int i;
+    unsigned int evil_num;
+    enum Evilness evilness;
+
+    get_evilness (session, &evilness, &evil_num);
+    switch (evilness)
+    {
+      case EVILNESS_CRAM:
+        /* We're not cramming elements in the
+           all-to-all round, since that would just
+           add more elements to the result set, but
+           wouldn't test robustness. */
+        if (PHASE_KIND_ALL_TO_ALL == task->key.kind)
+        {
+          GNUNET_SET_commit (setop->op, set->h);
+          break;
+        }
+        for (i = 0; i < evil_num; i++)
+        {
+          struct GNUNET_HashCode hash;
+          struct GNUNET_SET_Element element;
+          element.data = &hash;
+          element.size = sizeof (struct GNUNET_HashCode);
+          element.element_type = 0;
+
+          GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_STRONG, &hash);
+          GNUNET_SET_add_element (set->h, &element, NULL, NULL);
+#ifdef GNUNET_EXTRA_LOGGING
+          GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                      "P%u: evil peer: cramming element %s into set {%s} of task {%s}\n",
+                      session->local_peer_idx,
+                      debug_str_element (&element),
+                      debug_str_set_key (&setop->input_set),
+                      debug_str_task_key (&task->key));
+#endif
+        }
+        GNUNET_SET_commit (setop->op, set->h);
+        break;
+      case EVILNESS_SLACK:
+        GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+                    "P%u: evil peer: slacking\n",
+                    session->local_peer_idx,
+                    evil_num);
+        /* Do nothing. */
+        break;
+      case EVILNESS_NONE:
+        GNUNET_SET_commit (setop->op, set->h);
+        break;
+    }
+  }
+#else
+  GNUNET_SET_commit (setop->op, set->h);
+#endif
 }
 
 
@@ -892,6 +1252,8 @@
 {
   struct GNUNET_HashCode hash;
 
+  GNUNET_assert (NULL != diff);
+
   GNUNET_CRYPTO_hash (&diff->key, sizeof (struct DiffKey), &hash);
   GNUNET_assert (GNUNET_OK ==
                  GNUNET_CONTAINER_multihashmap_put (session->diffmap, &hash, diff,
@@ -906,6 +1268,10 @@
 
   GNUNET_assert (NULL != set->h);
 
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Putting set %s\n",
+              debug_str_set_key (&set->key));
+
   GNUNET_CRYPTO_hash (&set->key, sizeof (struct SetKey), &hash);
   GNUNET_assert (GNUNET_OK ==
                  GNUNET_CONTAINER_multihashmap_put (session->setmap, &hash, set,
@@ -931,26 +1297,183 @@
 output_cloned_cb (void *cls, struct GNUNET_SET_Handle *copy)
 {
   struct TaskEntry *task = (struct TaskEntry *) cls;
+  struct SetOpCls *setop = &task->cls.setop;
   struct ConsensusSession *session = task->step->session;
   struct SetEntry *set = GNUNET_new (struct SetEntry);
 
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
               "P%u: Received lazy copy, storing output set %s\n",
-              session->local_peer_idx, debug_str_set_key (&task->output_set));
+              session->local_peer_idx, debug_str_set_key (&setop->output_set));
 
-  set->key = task->output_set;
+  set->key = setop->output_set;
   set->h = copy;
   put_set (task->step->session, set);
-  run_task_remote_union (task->step->session, task);
+  task_start_reconcile (task);
 }
 
 
 static void
-run_task_remote_union (struct ConsensusSession *session, struct TaskEntry *task)
+task_cancel_reconcile (struct TaskEntry *task)
 {
+  /* not implemented yet */
+  GNUNET_assert (0);
+}
+
+
+static void
+apply_diff_to_rfn (struct DiffEntry *diff,
+                   struct ReferendumEntry *rfn,
+                   uint16_t voting_peer,
+                   uint16_t num_peers)
+{
+  struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
+  struct DiffElementInfo *di;
+
+  iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff->changes);
+
+  while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
+  {
+    if (di->weight > 0)
+    {
+      rfn_vote (rfn, voting_peer, num_peers, VOTE_ADD, di->element);
+    }
+    if (di->weight < 0)
+    {
+      rfn_vote (rfn, voting_peer, num_peers, VOTE_REMOVE, di->element);
+    }
+  }
+
+  GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
+}
+
+
+struct DiffEntry *
+diff_create ()
+{
+  struct DiffEntry *d = GNUNET_new (struct DiffEntry);
+
+  d->changes = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
+  
+  return d;
+}
+
+
+struct DiffEntry *
+diff_compose (struct DiffEntry *diff_1,
+              struct DiffEntry *diff_2)
+{
+  struct DiffEntry *diff_new;
+  struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
+  struct DiffElementInfo *di;
+ 
+  diff_new = diff_create ();
+
+  iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_1->changes);
+  while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "iterating first diff\n");
+    diff_insert (diff_new, di->weight, di->element);
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "insert done\n");
+  }
+  GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "iterating first diff done\n");
+
+  iter = GNUNET_CONTAINER_multihashmap_iterator_create (diff_2->changes);
+  while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, NULL, (const void **) &di))
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                "iterating second diff\n");
+    diff_insert (diff_new, di->weight, di->element);
+  }
+  GNUNET_CONTAINER_multihashmap_iterator_destroy (iter);
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "iterating second diff done\n");
+
+  return diff_new;
+}
+
+
+struct ReferendumEntry *
+rfn_create (uint16_t size)
+{
+  struct ReferendumEntry *rfn;
+
+  rfn = GNUNET_new (struct ReferendumEntry);
+  rfn->rfn_elements = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
+  rfn->peer_commited = GNUNET_new_array (size, int);
+
+  return rfn;
+}
+
+
+static void
+diff_destroy (struct DiffEntry *diff)
+{
+  GNUNET_CONTAINER_multihashmap_destroy (diff->changes);
+  GNUNET_free (diff);
+}
+
+
+static void
+task_start_leader_apply (struct TaskEntry *task)
+{
+  struct LeaderApplyCls *lacls = &task->cls.leader_apply;
+  struct ConsensusSession *session = task->step->session;
+  struct DiffEntry *diff_1;
+  struct DiffEntry *diff_2;
+  struct DiffEntry *diff_combined;
+  struct ReferendumEntry *rfn;
+
+  diff_1 = lookup_diff (session, &lacls->input_diff_1);
+  GNUNET_assert (NULL != diff_1);
+
+  diff_2 = lookup_diff (session, &lacls->input_diff_2);
+  GNUNET_assert (NULL != diff_2);
+
+  rfn = lookup_rfn (session, &lacls->output_rfn);
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "looked up everything\n");
+  
+  if (NULL == rfn)
+  {
+    rfn = rfn_create (session->num_peers);
+    rfn->key = lacls->output_rfn;
+    put_rfn (session, rfn);
+  }
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "ensured rfn\n");
+
+  diff_combined = diff_compose (diff_1, diff_2);
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "composed diffs\n");
+
+  apply_diff_to_rfn (diff_combined, rfn, task->key.leader, session->num_peers);
+
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "applied diffs to rfns\n");
+
+  diff_destroy (diff_combined);
+
+  finish_task (task);
+}
+
+
+static void
+task_start_reconcile (struct TaskEntry *task)
+{
   struct SetEntry *input;
+  struct SetOpCls *setop = &task->cls.setop;
+  struct ConsensusSession *session = task->step->session;
 
-  input = lookup_set (session, &task->input_set);
+  input = lookup_set (session, &setop->input_set);
   GNUNET_assert (NULL != input);
   GNUNET_assert (NULL != input->h);
 
@@ -959,11 +1482,11 @@
      because we want something valid in there, even
      if the other peer doesn't talk to us */
 
-  if (SET_KIND_NONE != task->output_set.set_kind)
+  if (SET_KIND_NONE != setop->output_set.set_kind)
   {
     /* If we don't have an existing output set,
        we clone the input set. */
-    if (NULL == lookup_set (session, &task->output_set))
+    if (NULL == lookup_set (session, &setop->output_set))
     {
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                   "Output set missing, copying from input set\n");
@@ -975,9 +1498,9 @@
     }
   }
 
-  if (RFN_KIND_NONE != task->output_rfn.rfn_kind)
+  if (RFN_KIND_NONE != setop->output_rfn.rfn_kind)
   {
-    if (NULL == lookup_rfn (session, &task->output_rfn))
+    if (NULL == lookup_rfn (session, &setop->output_rfn))
     {
       struct ReferendumEntry *rfn;
 
@@ -984,16 +1507,26 @@
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                   "P%u: output rfn <%s> missing, creating.\n",
                   session->local_peer_idx,
-                  debug_str_rfn_key (&task->output_rfn));
+                  debug_str_rfn_key (&setop->output_rfn));
 
-      rfn = GNUNET_new (struct ReferendumEntry);
-      rfn->key = task->output_rfn;
-      rfn->rfn_elements = GNUNET_CONTAINER_multihashmap_create (8, GNUNET_NO);
-      rfn->peer_commited = GNUNET_new_array (session->num_peers, int);
+      rfn = rfn_create (session->num_peers);
+      rfn->key = setop->output_rfn;
       put_rfn (session, rfn);
     }
   }
 
+  if (DIFF_KIND_NONE != setop->output_diff.diff_kind)
+  {
+    if (NULL == lookup_diff (session, &setop->output_diff))
+    {
+      struct DiffEntry *diff;
+
+      diff = diff_create ();
+      diff->key = setop->output_diff;
+      put_diff (session, diff);
+    }
+  }
+
   if (task->key.peer1 == session->local_peer_idx)
   {
     struct GNUNET_CONSENSUS_RoundContextMessage rcm = { 0 };
@@ -1001,7 +1534,7 @@
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                 "P%u: Looking up set {%s} to run remote union\n",
                 session->local_peer_idx,
-                debug_str_set_key (&task->input_set));
+                debug_str_set_key (&setop->input_set));
 
     rcm.header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT);
     rcm.header.size = htons (sizeof (struct GNUNET_CONSENSUS_RoundContextMessage));
@@ -1012,23 +1545,20 @@
     rcm.leader = htons (task->key.leader);
     rcm.repetition = htons (task->key.repetition);
 
-    GNUNET_assert (NULL == task->op);
+    GNUNET_assert (NULL == setop->op);
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: initiating set op with P%u, our set is %s\n",
-                session->local_peer_idx, task->key.peer2, debug_str_set_key (&task->input_set));
+                session->local_peer_idx, task->key.peer2, debug_str_set_key (&setop->input_set));
 
     // XXX: maybe this should be done while
     // setting up tasks alreays?
-    task->op = GNUNET_SET_prepare (&session->peers[task->key.peer2],
-                                   &session->global_id,
-                                   &rcm.header,
-                                   GNUNET_SET_RESULT_ADDED, /* XXX: will be obsolete soon */
-                                   set_result_cb,
-                                   task);
+    setop->op = GNUNET_SET_prepare (&session->peers[task->key.peer2],
+                                    &session->global_id,
+                                    &rcm.header,
+                                    GNUNET_SET_RESULT_SYMMETRIC,
+                                    set_result_cb,
+                                    task);
 
-    /* Referendums must be materialized as a set before */
-    GNUNET_assert (RFN_KIND_NONE == task->input_rfn.rfn_kind);
-
-    if (GNUNET_OK != GNUNET_SET_commit (task->op, input->h))
+    if (GNUNET_OK != GNUNET_SET_commit (setop->op, input->h))
     {
       GNUNET_break (0);
       /* XXX: cleanup? */
@@ -1041,9 +1571,8 @@
     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: waiting set op with P%u\n",
                 session->local_peer_idx, task->key.peer1);
 
-    if (NULL != task->op)
+    if (NULL != setop->op)
     {
-      GNUNET_assert (NULL == task->commited_set);
       commit_set (session, task);
     }
   }
@@ -1159,11 +1688,11 @@
 
   set = GNUNET_new (struct SetEntry);
   set->h = copy;
-  set->key = task->output_set;
+  set->key = task->cls.eval_rfn.output_set;
 
   put_set (session, set);
 
-  run_task_eval_rfn (session, task);
+  task_start_eval_rfn (task);
 }
 
 
@@ -1173,7 +1702,7 @@
  * set and store the result in the output set and/or output diff.
  */
 static void
-run_task_eval_rfn (struct ConsensusSession *session, struct TaskEntry *task)
+task_start_eval_rfn (struct TaskEntry *task)
 {
   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
   struct ReferendumEntry *input_rfn;
@@ -1181,24 +1710,23 @@
   struct SetEntry *output_set = NULL;
   struct DiffEntry *output_diff = NULL;
   struct SetChangeProgressCls *progress_cls;
+  struct EvalRfnCls *rcls = &task->cls.eval_rfn;
+  struct ConsensusSession *session = task->step->session;
 
   /* Have at least one output */
-  GNUNET_assert ( (task->output_set.set_kind != SET_KIND_NONE) ||
-                  (task->output_diff.diff_kind != DIFF_KIND_NONE));
+  GNUNET_assert ( (rcls->output_set.set_kind != SET_KIND_NONE) ||
+                  (rcls->output_diff.diff_kind != DIFF_KIND_NONE));
 
-  /* Not allowed as output */
-  GNUNET_assert ( (task->output_rfn.rfn_kind == RFN_KIND_NONE));
-
-  if (SET_KIND_NONE != task->output_set.set_kind)
+  if (SET_KIND_NONE != rcls->output_set.set_kind)
   {
     /* We have a set output, thus the output set must
        exist or copy it from the input set */
-    output_set = lookup_set (session, &task->output_set);
+    output_set = lookup_set (session, &rcls->output_set);
     if (NULL == output_set)
     {
       struct SetEntry *input_set;
 
-      input_set = lookup_set (session, &task->input_set);
+      input_set = lookup_set (session, &rcls->input_set);
       GNUNET_assert (NULL != input_set);
       GNUNET_SET_copy_lazy (input_set->h,
                             eval_rfn_copy_cb,
@@ -1209,21 +1737,26 @@
     }
   }
 
-  if (DIFF_KIND_NONE != task->output_diff.diff_kind)
+  if (DIFF_KIND_NONE != rcls->output_diff.diff_kind)
   {
-    output_diff = lookup_diff (session, &task->output_diff);
+    output_diff = lookup_diff (session, &rcls->output_diff);
     if (NULL == output_diff)
     {
-      output_diff = GNUNET_new (struct DiffEntry);
-      output_diff->key = task->output_diff;
-      output_diff->changes = GNUNET_CONTAINER_multihashmap_create (8, 
GNUNET_NO);
+      output_diff = diff_create ();
+      output_diff->key = rcls->output_diff;
       put_diff (session, output_diff);
     }
   }
 
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+              "Evaluating referendum in Task {%s}\n",
+              debug_str_task_key (&task->key));
+
+
   progress_cls = GNUNET_new (struct SetChangeProgressCls);
+  progress_cls->task = task;
 
-  input_rfn = lookup_rfn (session, &task->input_rfn);
+  input_rfn = lookup_rfn (session, &rcls->input_rfn);
 
   GNUNET_assert (NULL != input_rfn);
 
@@ -1232,18 +1765,28 @@
 
   while (GNUNET_YES == GNUNET_CONTAINER_multihashmap_iterator_next (iter, 
NULL, (const void **) &ri))
   {
-    int majority_vote = rfn_majority (session->num_peers, input_rfn, ri, 
task->threshold);
+    int majority_vote = rfn_majority (session->num_peers, input_rfn, ri, 
rcls->threshold);
     switch (majority_vote)
     {
       case VOTE_ADD:
+#ifdef GNUNET_EXTRA_LOGGING
+          GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                      "P%u: referendum vote result: VOTE_ADD for element %s in 
task {%s} with"
+                      "output set {%s} and output diff {%s}\n",
+                      session->local_peer_idx,
+                      debug_str_element (ri->element),
+                      debug_str_task_key (&task->key),
+                      debug_str_set_key (&rcls->output_set),
+                      debug_str_diff_key (&rcls->output_diff));
+#endif
         if (NULL != output_set)
         {
           progress_cls->num_pending++;
           GNUNET_assert (GNUNET_OK ==
                          GNUNET_SET_add_element (output_set->h,
-                                     ri->element,
-                                     eval_rfn_progress,
-                                     progress_cls));
+                                                 ri->element,
+                                                 eval_rfn_progress,
+                                                 progress_cls));
         }
         if (NULL != output_diff)
         {
@@ -1253,16 +1796,37 @@
       case VOTE_CONTESTED:
         if (NULL != output_set)
           output_set->is_contested = GNUNET_YES;
+#ifdef GNUNET_EXTRA_LOGGING
+        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                    "P%u: referendum vote result: VOTE_CONTESTED for element 
%s in task {%s} with"
+                    "output set {%s} and output diff {%s}\n",
+                    session->local_peer_idx,
+                    debug_str_element (ri->element),
+                    debug_str_task_key (&task->key),
+                    debug_str_set_key (&rcls->output_set),
+                    debug_str_diff_key (&rcls->output_diff));
+#endif
         /* fallthrough */
       case VOTE_REMOVE:
+#ifdef GNUNET_EXTRA_LOGGING
+        if (VOTE_REMOVE == majority_vote)
+          GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                      "P%u: referendum vote result: VOTE_REMOVE for element %s 
in task {%s} with"
+                      "output set {%s} and output diff {%s}\n",
+                      session->local_peer_idx,
+                      debug_str_element (ri->element),
+                      debug_str_task_key (&task->key),
+                      debug_str_set_key (&rcls->output_set),
+                      debug_str_diff_key (&rcls->output_diff));
+#endif
         if (NULL != output_set)
         {
           progress_cls->num_pending++;
           GNUNET_assert (GNUNET_OK ==
                          GNUNET_SET_remove_element (output_set->h,
-                                     ri->element,
-                                     eval_rfn_progress,
-                                     progress_cls));
+                                                    ri->element,
+                                                    eval_rfn_progress,
+                                                    progress_cls));
         }
         if (NULL != output_diff)
         {
@@ -1270,6 +1834,8 @@
         }
         break;
       case VOTE_NONE:
+        GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                    "referendum vote result: VOTE_NONE\n");
         /* Nothing to do. */
         break;
       default:
@@ -1294,14 +1860,15 @@
   struct TaskEntry *task = (struct TaskEntry *) cls;
   struct ConsensusSession *session = task->step->session;
   struct SetEntry *set;
+  struct ApplyDiffCls *diffop = &task->cls.apply_diff;
 
   set = GNUNET_new (struct SetEntry);
   set->h = copy;
-  set->key = task->output_set;
+  set->key = diffop->output_set;
 
   put_set (session, set);
 
-  run_task_apply_diff (session, task);
+  task_start_apply_diff (task);
 }
 
 
@@ -1334,7 +1901,7 @@
 
 
 static void
-run_task_apply_diff (struct ConsensusSession *session, struct TaskEntry *task)
+task_start_apply_diff (struct TaskEntry *task)
 {
   struct SetEntry *output_set;
   struct DiffEntry *input_diff;
@@ -1341,21 +1908,23 @@
   struct GNUNET_CONTAINER_MultiHashMapIterator *iter;
   struct DiffElementInfo *di;
   struct SetChangeProgressCls *progress_cls;
+  struct ApplyDiffCls *diffop = &task->cls.apply_diff;
+  struct ConsensusSession *session = task->step->session;
 
-  GNUNET_assert (task->output_set.set_kind != SET_KIND_NONE);
-  GNUNET_assert (task->input_diff.diff_kind != DIFF_KIND_NONE);
+  GNUNET_assert (diffop->output_set.set_kind != SET_KIND_NONE);
+  GNUNET_assert (diffop->input_diff.diff_kind != DIFF_KIND_NONE);
 
-  input_diff = lookup_diff (session, &task->input_diff);
+  input_diff = lookup_diff (session, &diffop->input_diff);
 
   GNUNET_assert (NULL != input_diff);
 
-  output_set = lookup_set (session, &task->output_set);
+  output_set = lookup_set (session, &diffop->output_set);
 
   if (NULL == output_set)
   {
       struct SetEntry *input_set;
 
-      input_set = lookup_set (session, &task->input_set);
+      input_set = lookup_set (session, &diffop->input_set);
       GNUNET_assert (NULL != input_set);
       GNUNET_SET_copy_lazy (input_set->h,
                             apply_diff_copy_cb,
@@ -1403,11 +1972,12 @@
 
 
 static void
-run_task_finish (struct ConsensusSession *session, struct TaskEntry *task)
+task_start_finish (struct TaskEntry *task)
 {
   struct SetEntry *final_set;
+  struct ConsensusSession *session = task->step->session;
 
-  final_set = lookup_set (session, &task->input_set);
+  final_set = lookup_set (session, &task->cls.finish.input_set);
 
   GNUNET_assert (NULL != final_set);
 
@@ -1418,37 +1988,17 @@
 }
 
 static void
-run_task (struct ConsensusSession *session, struct TaskEntry *task)
+start_task (struct ConsensusSession *session, struct TaskEntry *task)
 {
-  GNUNET_assert (GNUNET_NO == task->is_running);
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: starting task {%s}\n", 
session->local_peer_idx, debug_str_task_key (&task->key));
+
+  GNUNET_assert (GNUNET_NO == task->is_started);
   GNUNET_assert (GNUNET_NO == task->is_finished);
+  GNUNET_assert (NULL != task->start);
 
-  
-    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: running task {%s}\n", 
session->local_peer_idx, debug_str_task_key (&task->key));
+  task->start (task);
 
-  switch (task->action)
-  {
-    case ACTION_RECONCILE:
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: running ACTION_RECONCILE 
task\n", session->local_peer_idx);
-      run_task_remote_union (session, task);
-      break;
-    case ACTION_EVAL_RFN:
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: running ACTION_EVAL_RFN 
task\n", session->local_peer_idx);
-      run_task_eval_rfn (session, task);
-      break;
-    case ACTION_APPLY_DIFF:
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: running ACTION_APPLY_DIFF 
task\n", session->local_peer_idx);
-      run_task_apply_diff (session, task);
-      break;
-    case ACTION_FINISH:
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: running ACTION_FINISH 
task\n", session->local_peer_idx);
-      run_task_finish (session, task);
-      break;
-    default:
-      /* not reached */
-      GNUNET_assert (0);
-  }
-  task->is_running = GNUNET_YES;
+  task->is_started = GNUNET_YES;
 }
 
 
@@ -1515,7 +2065,7 @@
 
       step->is_running = GNUNET_YES;
       for (i = 0; i < step->tasks_len; i++)
-        run_task (session, step->tasks[i]);
+        start_task (session, step->tasks[i]);
 
       /* Sometimes there is no task to trigger finishing the step, so we have 
to do it here. */
       if ( (step->finished_tasks == step->tasks_len) && (GNUNET_NO == 
step->is_finished))
@@ -1730,12 +2280,6 @@
     return;
   }
 
-  if (ACTION_RECONCILE != task->action)
-  {
-    GNUNET_break_op (0);
-    return;
-  }
-
   if (GNUNET_YES == task->is_finished)
   {
     GNUNET_break_op (0);
@@ -1754,8 +2298,8 @@
   else
     my_result_cb = set_result_cb;
 
-  task->op = GNUNET_SET_accept (request,
-                                GNUNET_SET_RESULT_ADDED, /* XXX: obsolete soon 
*/
+  task->cls.setop.op = GNUNET_SET_accept (request,
+                                GNUNET_SET_RESULT_SYMMETRIC,
                                 my_result_cb,
                                 task);
   
@@ -1762,7 +2306,7 @@
   /* If the task hasn't been started yet, 
      we wait for that until we commit. */
 
-  if (GNUNET_YES == task->is_running)
+  if (GNUNET_YES == task->is_started)
   {
     commit_set (session, task);
   }
@@ -1969,11 +2513,11 @@
       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: GC LEADER(1): %d %d %d %d\n", 
session->local_peer_idx, p1, p2, rep, lead);
       task = ((struct TaskEntry) {
         .step = step,
-        .action = ACTION_RECONCILE,
+        .start = task_start_reconcile,
+        .cancel = task_cancel_reconcile,
         .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, me 
},
-        .input_set = (struct SetKey) { SET_KIND_CURRENT, rep },
-        .output_set = (struct SetKey) { SET_KIND_NONE },
       });
+      task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
       put_task (session->taskmap, &task);
     }
     /* We run this task to make sure that the leader
@@ -1982,12 +2526,13 @@
        without the code having to handle any special cases. */
     task = ((struct TaskEntry) {
       .step = step,
-      .action = ACTION_RECONCILE,
       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, me, me, rep, me },
-      .input_set = (struct SetKey) { SET_KIND_CURRENT, rep },
-      .output_set = (struct SetKey) { SET_KIND_LEADER, rep, me },
-      .output_diff = (struct DiffKey) { DIFF_KIND_LEADER, rep, me },
+      .start = task_start_reconcile,
+      .cancel = task_cancel_reconcile,
     });
+    task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
+    task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, 
rep, me };
+    task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, 
rep, me };
     put_task (session->taskmap, &task);
   }
   else
@@ -1998,12 +2543,13 @@
     GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: GC LEADER(2): %d %d %d %d\n", 
session->local_peer_idx, p1, p2, rep, lead);
     task = ((struct TaskEntry) {
       .step = step,
-      .action = ACTION_RECONCILE,
       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_LEADER, p1, p2, rep, 
lead},
-      .input_set = (struct SetKey) { SET_KIND_CURRENT, rep },
-      .output_set = (struct SetKey) { SET_KIND_LEADER, rep, lead },
-      .output_diff = (struct DiffKey) { DIFF_KIND_LEADER, rep, lead },
+      .start = task_start_reconcile,
+      .cancel = task_cancel_reconcile,
     });
+    task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, rep };
+    task.cls.setop.output_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, 
rep, lead };
+    task.cls.setop.output_diff = (struct DiffKey) { DIFF_KIND_LEADER_PROPOSAL, 
rep, lead };
     put_task (session->taskmap, &task);
   }
 
@@ -2022,11 +2568,12 @@
     arrange_peers (&p1, &p2, n);
     task = ((struct TaskEntry) {
       .step = step,
-      .action = ACTION_RECONCILE,
       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO, p1, p2, rep, lead },
-      .input_set = (struct SetKey) { SET_KIND_LEADER, rep, lead },
-      .output_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead },
+      .start = task_start_reconcile,
+      .cancel = task_cancel_reconcile,
     });
+    task.cls.setop.input_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, 
rep, lead };
+    task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead };
     put_task (session->taskmap, &task);
   }
 
@@ -2041,12 +2588,12 @@
   task = ((struct TaskEntry) {
     .key = (struct TaskKey) { PHASE_KIND_GRADECAST_ECHO_GRADE, -1, -1, rep, 
lead },
     .step = step,
-    .action = ACTION_EVAL_RFN,
-    .input_set = (struct SetKey) { SET_KIND_LEADER, rep, lead },
-    .input_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead },
-    .output_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, lead },
-    .threshold = n - t,
+    .start = task_start_eval_rfn
   });
+  task.cls.eval_rfn.input_set = (struct SetKey) { SET_KIND_LEADER_PROPOSAL, 
rep, lead },
+  task.cls.eval_rfn.input_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead },
+  task.cls.eval_rfn.output_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, 
lead },
+  task.cls.eval_rfn.threshold = n - t,
   put_task (session->taskmap, &task);
 
   prev_step = step;
@@ -2064,11 +2611,12 @@
     arrange_peers (&p1, &p2, n);
     task = ((struct TaskEntry) {
       .step = step,
-      .action = ACTION_RECONCILE,
+      .start = task_start_reconcile,
+      .cancel = task_cancel_reconcile,
       .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM, p1, p2, rep, 
lead},
-      .input_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, lead },
-      .output_rfn = (struct RfnKey) { RFN_KIND_CONFIRM, rep, lead },
     });
+    task.cls.setop.input_set = (struct SetKey) { SET_KIND_ECHO_RESULT, rep, 
lead };
+    task.cls.setop.output_rfn = (struct RfnKey) { RFN_KIND_CONFIRM, rep, lead 
};
     put_task (session->taskmap, &task);
   }
 
@@ -2081,15 +2629,34 @@
 
   // evaluate ConfirmationReferendum and
   // apply it to the LeaderReferendum
+  // XXX: the diff should contain grading information
   task = ((struct TaskEntry) {
     .step = step,
     .key = (struct TaskKey) { PHASE_KIND_GRADECAST_CONFIRM_GRADE, -1, -1, rep, 
lead },
-    .action = ACTION_EVAL_RFN,
-    .input_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead },
-    .output_diff = (struct DiffKey) { DIFF_KIND_GRADECAST_RESULT, rep },
+    .start = task_start_eval_rfn,
   });
+  task.cls.eval_rfn.input_rfn = (struct RfnKey) { RFN_KIND_ECHO, rep, lead };
+  task.cls.eval_rfn.output_diff = (struct DiffKey) { 
DIFF_KIND_LEADER_CONSENSUS, rep, lead };
   put_task (session->taskmap, &task);
 
+
+  prev_step = step;
+  step = create_step (session, round, 1);
+#ifdef GNUNET_EXTRA_LOGGING
+  GNUNET_asprintf (&step->debug_name, "gc apply, lead %u rep %u", lead, rep);
+#endif
+  step_depend_on (step, prev_step);
+
+  task = ((struct TaskEntry) {
+    .step = step,
+    .key = (struct TaskKey) { PHASE_KIND_GRADECAST_APPLY_RESULT, -1, -1, rep, 
lead },
+    .start = task_start_leader_apply,
+  });
+  task.cls.leader_apply.input_diff_1 = (struct DiffKey) { 
DIFF_KIND_LEADER_PROPOSAL, rep, lead };
+  task.cls.leader_apply.input_diff_2 = (struct DiffKey) { 
DIFF_KIND_LEADER_CONSENSUS, rep, lead };
+  task.cls.leader_apply.output_rfn = (struct RfnKey) { 
RFN_KIND_GRADECAST_RESULT, rep };
+  put_task (session->taskmap, &task);
+
   step_depend_on (step_after, step);
 }
 
@@ -2142,10 +2709,12 @@
     task = ((struct TaskEntry) {
       .key = (struct TaskKey) { PHASE_KIND_ALL_TO_ALL, p1, p2, -1, -1 },
       .step = step,
-      .action = ACTION_RECONCILE,
-      .input_set = (struct SetKey) { SET_KIND_CURRENT, 0 },
-      .output_set = (struct SetKey) { SET_KIND_CURRENT, 0 },
+      .start = task_start_reconcile,
+      .cancel = task_cancel_reconcile,
     });
+    task.cls.setop.input_set = (struct SetKey) { SET_KIND_CURRENT, 0 };
+    task.cls.setop.output_set = task.cls.setop.input_set;
+    task.cls.setop.do_not_remove = GNUNET_YES;
     put_task (session->taskmap, &task);
   }
 
@@ -2164,7 +2733,7 @@
 
     step_rep_start = create_step (session, round, 1);
 #ifdef GNUNET_EXTRA_LOGGING
-      GNUNET_asprintf (&step_rep_start->debug_name, "gradecast start rep %u", 
i);
+    GNUNET_asprintf (&step_rep_start->debug_name, "gradecast start rep %u", i);
 #endif
 
     step_depend_on (step_rep_start, prev_step);
@@ -2171,7 +2740,7 @@
 
     step_rep_end = create_step (session, round, 1);
 #ifdef GNUNET_EXTRA_LOGGING
-      GNUNET_asprintf (&step_rep_end->debug_name, "gradecast end rep %u", i);
+    GNUNET_asprintf (&step_rep_end->debug_name, "gradecast end rep %u", i);
 #endif
 
     /* parallel gradecasts */
@@ -2178,18 +2747,16 @@
     for (lead = 0; lead < n; lead++)
       construct_task_graph_gradecast (session, i, lead, step_rep_start, 
step_rep_end);
 
-    // TODO: add peers to ignore list,
-    //
-    // evaluate ConfirmationReferendum and
-    // apply it to the LeaderReferendum
+    // TODO: add peers to ignore list, either here or
+    // already in the gradecast.
     task = ((struct TaskEntry) {
       .step = step_rep_end,
-      .key = (struct TaskKey) { PHASE_KIND_GRADECAST_APPLY_RESULT, -1, -1, i, 
-1},
-      .action = ACTION_APPLY_DIFF,
-      .input_set = (struct SetKey) { SET_KIND_CURRENT, i },
-      .input_diff = (struct DiffKey) { DIFF_KIND_GRADECAST_RESULT, i },
-      .output_set = (struct SetKey) { SET_KIND_CURRENT, i + 1 },
+      .key = (struct TaskKey) { PHASE_KIND_APPLY_REP, -1, -1, i, -1},
+      .start = task_start_eval_rfn,
     });
+    task.cls.eval_rfn.input_set = (struct SetKey) { SET_KIND_CURRENT, i };
+    task.cls.eval_rfn.input_rfn = (struct RfnKey) { RFN_KIND_GRADECAST_RESULT, 
i };
+    task.cls.eval_rfn.output_set = (struct SetKey) { SET_KIND_CURRENT, i + 1 };
     put_task (session->taskmap, &task);
 
     prev_step = step_rep_end;
@@ -2206,9 +2773,9 @@
   task = ((struct TaskEntry) {
     .step = step,
     .key = (struct TaskKey) { PHASE_KIND_FINISH, -1, -1, -1, -1 },
-    .input_set = (struct SetKey) { SET_KIND_CURRENT, t + 1 },
-    .action = ACTION_FINISH,
+    .start = task_start_finish,
   });
+  task.cls.finish.input_set = (struct SetKey) { SET_KIND_CURRENT, t + 1 };
 
   put_task (session->taskmap, &task);
 }
@@ -2399,10 +2966,21 @@
   }
   session->num_client_insert_pending++;
   GNUNET_SET_add_element (initial_set, element, client_insert_done, session);
+
+#ifdef GNUNET_EXTRA_LOGGING
+  {
+    struct GNUNET_HashCode hash;
+
+    GNUNET_SET_element_hash (element, &hash);
+
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: element %s added\n",
+                session->local_peer_idx,
+                GNUNET_h2s (&hash));
+  }
+#endif
+
   GNUNET_free (element);
   GNUNET_SERVER_receive_done (client, GNUNET_OK);
-
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "P%u: element added\n", 
session->local_peer_idx);
 }
 
 

Modified: gnunet/src/consensus/test_consensus.conf
===================================================================
--- gnunet/src/consensus/test_consensus.conf    2015-10-05 16:35:44 UTC (rev 
36442)
+++ gnunet/src/consensus/test_consensus.conf    2015-10-05 21:26:56 UTC (rev 
36443)
@@ -1,15 +1,14 @@
address@hidden@ ../../contrib/no_forcestart.conf
address@hidden@ ../../contrib/no_forcestart.conf
 
 [PATHS]
 GNUNET_TEST_HOME = /tmp/test-consensus/
 
 [consensus]
-# PREFIX = valgrind
+PREFIX = valgrind
 OPTIONS = -L INFO
 BINARY = gnunet-service-evil-consensus
 
-# Evil behavior: Peer 0 does not execute leader step
-#EVIL_SPEC = 0;pass;leader
+EVIL_SPEC = 0;cram;5
 
 # Evil behavior: Peer 0 adds 5 random elements when he is the gradecast leader
 # (every peer gets the same element).
@@ -19,6 +18,11 @@
 # (every peer gets different elements).
 #EVIL_SPEC = 0;stuff-different;leader;5
 
+
+
+[core]
+FORECESTART = YES
+
 [cadet]
 #PREFIX = valgrind
 

Modified: gnunet/src/include/gnunet_set_service.h
===================================================================
--- gnunet/src/include/gnunet_set_service.h     2015-10-05 16:35:44 UTC (rev 
36442)
+++ gnunet/src/include/gnunet_set_service.h     2015-10-05 21:26:56 UTC (rev 
36443)
@@ -470,7 +470,27 @@
 void
 GNUNET_SET_iterate_cancel (struct GNUNET_SET_Handle *set);
 
+/**
+ * Create a copy of an element.  The copy
+ * must be GNUNET_free-d by the caller.
+ *
+ * @param element the element to copy
+ * @return the copied element
+ */
+struct GNUNET_SET_Element *
+GNUNET_SET_element_dup (const struct GNUNET_SET_Element *element);
 
+/**
+ * Hash a set element.
+ *
+ * @param element the element that should be hashed
+ * @param ret_hash a pointer to where the hash of @a element
+ *        should be stored
+ */
+void
+GNUNET_SET_element_hash (const struct GNUNET_SET_Element *element, struct 
GNUNET_HashCode *ret_hash);
+
+
 #if 0                           /* keep Emacsens' auto-indent happy */
 {
 #endif

Modified: gnunet/src/set/gnunet-service-set.c
===================================================================
--- gnunet/src/set/gnunet-service-set.c 2015-10-05 16:35:44 UTC (rev 36442)
+++ gnunet/src/set/gnunet-service-set.c 2015-10-05 21:26:56 UTC (rev 36443)
@@ -844,6 +844,11 @@
     ee->mutations = NULL;
     ee->mutations_size = 0;
     ee->element_hash = hash;
+    GNUNET_break (GNUNET_YES ==
+                  GNUNET_CONTAINER_multihashmap_put (set->content->elements,
+                                                     &ee->element_hash,
+                                                     ee,
+                                                     
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
   }
   else if (GNUNET_YES == _GSS_is_element_of_set (ee, set))
   {
@@ -859,11 +864,6 @@
     GNUNET_array_append (ee->mutations, ee->mutations_size, mut);
   }
 
-  GNUNET_break (GNUNET_YES ==
-                GNUNET_CONTAINER_multihashmap_put (set->content->elements,
-                                                   &ee->element_hash,
-                                                   ee,
-                                                   
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
   set->vt->add (set->state, ee);
 }
 

Modified: gnunet/src/set/set_api.c
===================================================================
--- gnunet/src/set/set_api.c    2015-10-05 16:35:44 UTC (rev 36442)
+++ gnunet/src/set/set_api.c    2015-10-05 21:26:56 UTC (rev 36443)
@@ -1112,4 +1112,40 @@
 }
 
 
+/**
+ * Create a copy of an element.  The copy
+ * must be GNUNET_free-d by the caller.
+ *
+ * @param element the element to copy
+ * @return the copied element
+ */
+struct GNUNET_SET_Element *
+GNUNET_SET_element_dup (const struct GNUNET_SET_Element *element)
+{
+  struct GNUNET_SET_Element *copy;
+
+  copy = GNUNET_malloc (element->size + sizeof (struct GNUNET_SET_Element));
+  copy->size = element->size;
+  copy->element_type = element->element_type;
+  copy->data = &copy[1];
+  memcpy ((void *) copy->data, element->data, copy->size);
+
+  return copy;
+}
+
+
+/**
+ * Hash a set element.
+ *
+ * @param element the element that should be hashed
+ * @param ret_hash a pointer to where the hash of @a element
+ *        should be stored
+ */
+void
+GNUNET_SET_element_hash (const struct GNUNET_SET_Element *element, struct 
GNUNET_HashCode *ret_hash)
+{
+  /* FIXME: The element type should also be hashed. */
+  GNUNET_CRYPTO_hash (element->data, element->size, ret_hash);
+}
+
 /* end of set_api.c */




reply via email to

[Prev in Thread] Current Thread [Next in Thread]