gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r28350 - gnunet/src/experimentation


From: gnunet
Subject: [GNUnet-SVN] r28350 - gnunet/src/experimentation
Date: Thu, 1 Aug 2013 14:25:15 +0200

Author: wachs
Date: 2013-08-01 14:25:15 +0200 (Thu, 01 Aug 2013)
New Revision: 28350

Modified:
   gnunet/src/experimentation/gnunet-daemon-experimentation.h
   gnunet/src/experimentation/gnunet-daemon-experimentation_nodes.c
   gnunet/src/experimentation/gnunet-daemon-experimentation_scheduler.c
Log:
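changes to scheduler: keep separate waiting/running lists for inbound and outbound experiments, and add an `outbound' parameter to GED_scheduler_add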


Modified: gnunet/src/experimentation/gnunet-daemon-experimentation.h
===================================================================
--- gnunet/src/experimentation/gnunet-daemon-experimentation.h  2013-08-01 
11:30:57 UTC (rev 28349)
+++ gnunet/src/experimentation/gnunet-daemon-experimentation.h  2013-08-01 
12:25:15 UTC (rev 28350)
@@ -399,10 +399,14 @@
 
 
 /**
- * Start the scheduler component
+ * Add a new experiment for a node
+ *
+ * @param n the node
+ * @param e the experiment
+ * @param outbound are we initiator (GNUNET_YES) or client (GNUNET_NO)?
  */
 void
-GED_scheduler_add (struct Node *n, struct Experiment *e);
+GED_scheduler_add (struct Node *n, struct Experiment *e, int outbound);
 
 /**
  * Start the scheduler component

Modified: gnunet/src/experimentation/gnunet-daemon-experimentation_nodes.c
===================================================================
--- gnunet/src/experimentation/gnunet-daemon-experimentation_nodes.c    
2013-08-01 11:30:57 UTC (rev 28349)
+++ gnunet/src/experimentation/gnunet-daemon-experimentation_nodes.c    
2013-08-01 12:25:15 UTC (rev 28350)
@@ -317,7 +317,7 @@
                        GNUNET_i2s (&n->id));
 
        /* Tell the scheduler to add a node with an experiment */
-       GED_scheduler_add (n, e);
+       GED_scheduler_add (n, e, GNUNET_YES);
        counter ++;
 }
 
@@ -786,9 +786,6 @@
                GNUNET_break (0);
                return;
        }
-
-       GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message from peer 
%s for experiment `%s'\n"),
-                       "STOP", GNUNET_i2s (peer), name);
        GED_scheduler_handle_stop (n, e);
 }
 

Modified: gnunet/src/experimentation/gnunet-daemon-experimentation_scheduler.c
===================================================================
--- gnunet/src/experimentation/gnunet-daemon-experimentation_scheduler.c        
2013-08-01 11:30:57 UTC (rev 28349)
+++ gnunet/src/experimentation/gnunet-daemon-experimentation_scheduler.c        
2013-08-01 12:25:15 UTC (rev 28350)
@@ -64,19 +64,41 @@
        struct Experiment *e;
        struct Node *n;
        int state;
+       int outbound;
        GNUNET_SCHEDULER_TaskIdentifier task;
 };
 
-struct ScheduledExperiment *waiting_head;
-struct ScheduledExperiment *waiting_tail;
+struct ScheduledExperiment *waiting_in_head;
+struct ScheduledExperiment *waiting_in_tail;
 
-struct ScheduledExperiment *running_head;
-struct ScheduledExperiment *running_tail;
+struct ScheduledExperiment *running_in_head;
+struct ScheduledExperiment *running_in_tail;
 
+struct ScheduledExperiment *waiting_out_head;
+struct ScheduledExperiment *waiting_out_tail;
+
+struct ScheduledExperiment *running_out_head;
+struct ScheduledExperiment *running_out_tail;
+
+
 static unsigned int experiments_scheduled;
 static unsigned int experiments_running;
 static unsigned int experiments_requested;
 
+
+static struct ScheduledExperiment *
+find_experiment (struct ScheduledExperiment *head, struct ScheduledExperiment 
*tail,
+                                                                struct Node 
*n, struct Experiment *e, int outbound)
+{
+       struct ScheduledExperiment *cur;
+       for (cur = head; NULL != cur; cur = cur->next)
+       {
+               if ((cur->n == n) && (cur->e == e) && (cur->outbound == 
outbound)) /* Node and experiment are equal */
+                       break;
+       }
+       return cur;
+}
+
 static void
 request_timeout (void *cls,const struct GNUNET_SCHEDULER_TaskContext* tc)
 {
@@ -86,19 +108,19 @@
        GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Peer `%s' did not respond to 
request for experiment `%s'\n",
                        GNUNET_i2s (&se->n->id), se->e->name);
 
-       GNUNET_CONTAINER_DLL_remove (waiting_head, waiting_tail, se);
+       GNUNET_CONTAINER_DLL_remove (waiting_out_head, waiting_out_tail, se);
        GNUNET_free (se);
 
        /* Remove experiment */
-
        GNUNET_assert (experiments_requested > 0);
        experiments_requested --;
        GNUNET_STATISTICS_set (GED_stats, "# experiments requested", 
experiments_requested, GNUNET_NO);
 }
 
-static void start_experiment (void *cls,const struct 
GNUNET_SCHEDULER_TaskContext* tc)
+static void run_experiment_inbound (void *cls,const struct 
GNUNET_SCHEDULER_TaskContext* tc)
 {
        struct ScheduledExperiment *se = cls;
+       struct GNUNET_TIME_Relative start;
        struct GNUNET_TIME_Relative end;
        struct GNUNET_TIME_Relative backoff;
 
@@ -111,15 +133,78 @@
                backoff.rel_value += GNUNET_CRYPTO_random_u32 
(GNUNET_CRYPTO_QUALITY_WEAK, 1000);
                GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Delaying start request to 
peer `%s' for `%s' for %llu ms\n",
                                GNUNET_i2s (&se->n->id), se->e->name, (unsigned 
long long) backoff.rel_value);
-               se->task = GNUNET_SCHEDULER_add_delayed (backoff, 
&start_experiment, se);
+               se->task = GNUNET_SCHEDULER_add_delayed (backoff, 
&run_experiment_inbound, se);
                return;
        }
        else if (BUSY == se->state)
                se->state = NOT_RUNNING;
 
-       if (NOT_RUNNING == se->state)
+       switch (se->state) {
+               case NOT_RUNNING:
+                       /* Send START_ACK message */
+                       //GED_nodes_request_start (se->n, se->e);
+                       se->state = REQUESTED;
+                       /* Schedule to run */
+                       start = 
GNUNET_TIME_absolute_get_remaining(se->e->start);
+                       if (0 == start.rel_value)
+                                       se->task = GNUNET_SCHEDULER_add_now 
(&run_experiment_inbound, se);
+                       else
+                                       se->task = GNUNET_SCHEDULER_add_delayed 
(start, &run_experiment_inbound, se);
+                       break;
+               case REQUESTED:
+                       /* Already requested */
+                       se->state = STARTED;
+               case STARTED:
+                       /* Experiment is running */
+                       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running experiment 
`%s' peer for `%s'\n",
+                                       GNUNET_i2s (&se->n->id), se->e->name);
+
+                       /* do work here */
+
+                       /* Reschedule */
+                       end = 
GNUNET_TIME_absolute_get_remaining(GNUNET_TIME_absolute_add (se->e->stop, 
se->e->frequency));
+                       if (0 == end.rel_value)
+                       {
+                               se->state = STOPPED;
+                               return; /* End of experiment is reached */
+                       }
+                       /* Reschedule */
+                       se->task = GNUNET_SCHEDULER_add_delayed 
(se->e->frequency, &run_experiment_inbound, se);
+                       break;
+               case STOPPED:
+                       /* Experiment expired */
+                       break;
+               default:
+                       break;
+       }
+
+}
+
+static void run_experiment_outbound (void *cls,const struct 
GNUNET_SCHEDULER_TaskContext* tc)
+{
+       struct ScheduledExperiment *se = cls;
+       struct GNUNET_TIME_Relative end;
+       struct GNUNET_TIME_Relative backoff;
+
+       se->task = GNUNET_SCHEDULER_NO_TASK;
+
+       if (GNUNET_NO == GED_nodes_rts (se->n))
        {
-                       /* Send start message */
+               /* Cannot send to peer, core is busy */
+               se->state = BUSY;
+               backoff = GNUNET_TIME_UNIT_SECONDS;
+               backoff.rel_value += GNUNET_CRYPTO_random_u32 
(GNUNET_CRYPTO_QUALITY_WEAK, 1000);
+               GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Delaying start request to 
peer `%s' for `%s' for %llu ms\n",
+                               GNUNET_i2s (&se->n->id), se->e->name, (unsigned 
long long) backoff.rel_value);
+               se->task = GNUNET_SCHEDULER_add_delayed (backoff, 
&run_experiment_outbound, se);
+               return;
+       }
+       else if (BUSY == se->state)
+                       se->state = NOT_RUNNING; /* Not busy anymore, can send 
*/
+
+       switch (se->state) {
+               case NOT_RUNNING:
+                       /* Send START message */
                        GED_nodes_request_start (se->n, se->e);
                        se->state = REQUESTED;
                        se->task = GNUNET_SCHEDULER_add_delayed 
(EXP_RESPONSE_TIMEOUT, &request_timeout, se);
@@ -128,15 +213,12 @@
                                        GNUNET_i2s (&se->n->id), se->e->name);
                        experiments_requested ++;
                        GNUNET_STATISTICS_set (GED_stats, "# experiments 
requested", experiments_requested, GNUNET_NO);
-                       return;
-       }
-       else if (REQUESTED == se->state)
-       {
-                       /* Already requested */
-                       return;
-       }
-       else if (STARTED == se->state)
-       {
+                       break;
+               case REQUESTED:
+                       /* Expecting START_ACK */
+                       GNUNET_break (0);
+                       break;
+               case STARTED:
                        /* Experiment is running */
                        GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running experiment 
`%s' peer for `%s'\n",
                                        GNUNET_i2s (&se->n->id), se->e->name);
@@ -151,15 +233,17 @@
                                return; /* End of experiment is reached */
                        }
                        /* Reschedule */
-               se->task = GNUNET_SCHEDULER_add_delayed (se->e->frequency, 
&start_experiment, se);
-       }
-
-       else if (STOPPED == se->state)
-       {
+               se->task = GNUNET_SCHEDULER_add_delayed (se->e->frequency, 
&run_experiment_outbound, se);
+                       break;
+               case STOPPED:
                        /* Experiment expired */
+                       break;
+               default:
+                       break;
        }
 }
 
+
 /**
  * Handle a START message from a remote node
  *
@@ -169,7 +253,21 @@
 void
 GED_scheduler_handle_start (struct Node *n, struct Experiment *e)
 {
+       struct ScheduledExperiment *se;
 
+       if ((NULL != (se = find_experiment (waiting_in_head, waiting_in_tail, 
n, e, GNUNET_NO))) ||
+                (NULL != (se = find_experiment (running_in_head, 
running_in_tail, n, e, GNUNET_NO))))
+       {
+               GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received duplicate %s 
message from peer %s for experiment `%s'\n"),
+                               "START", GNUNET_i2s (&n->id), e->name);
+               GNUNET_break_op (0);
+               return;
+       }
+
+       GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message from peer 
%s for experiment `%s'\n"),
+                       "START", GNUNET_i2s (&n->id), e->name);
+
+       GED_scheduler_add (n, e, GNUNET_NO);
 }
 
 /**
@@ -181,7 +279,20 @@
 void
 GED_scheduler_handle_start_ack (struct Node *n, struct Experiment *e)
 {
+       struct ScheduledExperiment *se;
 
+       if (NULL == (se = find_experiment (waiting_in_head, waiting_in_tail, n, 
e, GNUNET_NO)))
+       {
+               GNUNET_break (0);
+               return;
+       }
+
+       GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message from peer 
%s for requested experiment `%s'\n"),
+                       "START_ACK", GNUNET_i2s (&n->id), e->name);
+
+       if (GNUNET_SCHEDULER_NO_TASK != se->task)
+               GNUNET_SCHEDULER_cancel (se->task);
+       se->task = GNUNET_SCHEDULER_add_now (&run_experiment_outbound, se);
 }
 
 
@@ -194,7 +305,23 @@
 void
 GED_scheduler_handle_stop (struct Node *n, struct Experiment *e)
 {
+       struct ScheduledExperiment *se;
 
+       GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message from peer 
%s for experiment `%s'\n"),
+                       "STOP", GNUNET_i2s (&n->id), e->name);
+
+       if (NULL != (se = find_experiment (waiting_in_head, waiting_in_tail, n, 
e, GNUNET_NO)))
+       {
+               GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message 
from peer %s for waiting experiment `%s'\n"),
+                               "STOP", GNUNET_i2s (&n->id), e->name);
+       }
+
+       if (NULL != (se = find_experiment (running_in_head, running_in_tail, n, 
e, GNUNET_NO)))
+       {
+               GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _("Received %s message 
from peer %s for running experiment `%s'\n"),
+                               "STOP", GNUNET_i2s (&n->id), e->name);
+       }
+
 }
 
 /**
@@ -202,35 +329,51 @@
  *
  * @param n the node
  * @param e the experiment
+ * @param outbound are we initiator (GNUNET_YES) or client (GNUNET_NO)?
  */
 void
-GED_scheduler_add (struct Node *n, struct Experiment *e)
+GED_scheduler_add (struct Node *n, struct Experiment *e, int outbound)
 {
        struct ScheduledExperiment *se;
        struct GNUNET_TIME_Relative start;
        struct GNUNET_TIME_Relative end;
 
+       GNUNET_assert ((GNUNET_YES == outbound) || (GNUNET_NO == outbound));
+
        start = GNUNET_TIME_absolute_get_remaining(e->start);
        end = GNUNET_TIME_absolute_get_remaining(e->stop);
        if (0 == end.rel_value)
                        return; /* End of experiment is reached */
 
        /* Add additional checks here if required */
-
        se = GNUNET_malloc (sizeof (struct ScheduledExperiment));
        se->state = NOT_RUNNING;
+       se->outbound = outbound;
        se->e = e;
        se->n = n;
-       if (0 == start.rel_value)
-                       se->task = GNUNET_SCHEDULER_add_now (&start_experiment, 
se);
+
+       if (GNUNET_YES == outbound)
+       {
+               if (0 == start.rel_value)
+                               se->task = GNUNET_SCHEDULER_add_now 
(&run_experiment_outbound, se);
+               else
+                               se->task = GNUNET_SCHEDULER_add_delayed (start, 
&run_experiment_outbound, se);
+               GNUNET_CONTAINER_DLL_insert (waiting_out_head, 
waiting_out_tail, se);
+       }
        else
-                       se->task = GNUNET_SCHEDULER_add_delayed (start, 
&start_experiment, se);
+       {
+               if (0 == start.rel_value)
+                               se->task = GNUNET_SCHEDULER_add_now 
(&run_experiment_inbound, se);
+               else
+                               se->task = GNUNET_SCHEDULER_add_delayed (start, 
&run_experiment_inbound, se);
+               GNUNET_CONTAINER_DLL_insert (waiting_in_head, waiting_in_tail, 
se);
+       }
 
-       GNUNET_CONTAINER_DLL_insert (waiting_head, waiting_tail, se);
-       GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Added experiment `%s' for node to 
be scheduled\n",
-                       e->name, GNUNET_i2s(&se->n->id));
+       GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Added %s experiment `%s' for node 
to be scheduled\n",
+                       (GNUNET_YES == outbound) ? "outbound" : "inbound", 
e->name, GNUNET_i2s(&se->n->id));
        experiments_scheduled ++;
        GNUNET_STATISTICS_set (GED_stats, "# experiments scheduled", 
experiments_scheduled, GNUNET_NO);
+
 }
 
 /**
@@ -253,11 +396,11 @@
        struct ScheduledExperiment *cur;
        struct ScheduledExperiment *next;
 
-       next = waiting_head;
+       next = waiting_in_head;
        while (NULL != (cur = next))
        {
                        next = cur->next;
-                       GNUNET_CONTAINER_DLL_remove (waiting_head, 
waiting_tail, cur);
+                       GNUNET_CONTAINER_DLL_remove (waiting_in_head, 
waiting_in_tail, cur);
                        if (GNUNET_SCHEDULER_NO_TASK != cur->task)
                        {
                                        GNUNET_SCHEDULER_cancel (cur->task);
@@ -269,11 +412,11 @@
                        GNUNET_STATISTICS_set (GED_stats, "# experiments 
scheduled", experiments_scheduled, GNUNET_NO);
        }
 
-       next = running_head;
+       next = running_in_head;
        while (NULL != (cur = next))
        {
                        next = cur->next;
-                       GNUNET_CONTAINER_DLL_remove (running_head, 
running_tail, cur);
+                       GNUNET_CONTAINER_DLL_remove (running_in_head, 
running_in_tail, cur);
                        if (GNUNET_SCHEDULER_NO_TASK != cur->task)
                        {
                                        GNUNET_SCHEDULER_cancel (cur->task);




reply via email to

[Prev in Thread] Current Thread [Next in Thread]