gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r28189 - in gnunet-planetlab/gplmt: . gplmt


From: gnunet
Subject: [GNUnet-SVN] r28189 - in gnunet-planetlab/gplmt: . gplmt
Date: Fri, 19 Jul 2013 13:11:39 +0200

Author: wachs
Date: 2013-07-19 13:11:38 +0200 (Fri, 19 Jul 2013)
New Revision: 28189

Modified:
   gnunet-planetlab/gplmt/gplmt.conf
   gnunet-planetlab/gplmt/gplmt.py
   gnunet-planetlab/gplmt/gplmt/Configuration.py
   gnunet-planetlab/gplmt/gplmt/Nodes.py
   gnunet-planetlab/gplmt/gplmt/Worker.py
Log:
fixed test worker
added parallelism control


Modified: gnunet-planetlab/gplmt/gplmt/Configuration.py
===================================================================
--- gnunet-planetlab/gplmt/gplmt/Configuration.py       2013-07-19 09:21:49 UTC (rev 28188)
+++ gnunet-planetlab/gplmt/gplmt/Configuration.py       2013-07-19 11:11:38 UTC (rev 28189)
@@ -39,15 +39,16 @@
 class Configuration:
     def __init__(self, filename, logger):
         assert (None != logger)
-        self.filename = filename
-        self.logger = logger
-        self.notifications = ""
+        self.gplmt_filename = filename
+        self.gplmt_logger = logger
+        self.gplmt_parallelism = 0
+        self.gplmt_notifications = ""
+        self.gplmt_taskfile = None
+        self.gplmt_nodesfile = None
         self.pl_slicename = ""
         self.pl_api_url = ""
         self.pl_username = None
         self.pl_password = None
-        self.taskfile = None
-        self.nodesfile = None
         self.ssh_username = ""
         self.ssh_add_unkown_hostkeys = False
         self.ssh_keyfile = None
@@ -65,21 +66,21 @@
         self.bb_webport = 0
         self.bb_slaveport = 0
     def load (self):
-        if (None == self.filename):            
+        if (None == self.gplmt_filename):            
             default_cfg = os.path.expanduser("~") + os.sep + ".gplmt" + 
os.sep+ "gplmt.conf"
             if (False == os.path.exists (default_cfg)):
                 print "Default configuration " +default_cfg+ " not found"
                 return False
             else:
-                self.logger.log ("Default configuration " +default_cfg+ " 
found")
-                self.filename = default_cfg
-        self.logger.log ("Loading configuration file '" + self.filename + "'")
-        if (False == os.path.exists (self.filename)):
-            print "File does not exist: '" + self.filename + "'"
+                self.gplmt_logger.log ("Default configuration " +default_cfg+ 
" found")
+                self.gplmt_filename = default_cfg
+        self.gplmt_logger.log ("Loading configuration file '" + 
self.gplmt_filename + "'")
+        if (False == os.path.exists (self.gplmt_filename)):
+            print "File does not exist: '" + self.gplmt_filename + "'"
             return False
         config = ConfigParser.RawConfigParser()
         try:
-            config.read(self.filename)
+            config.read(self.gplmt_filename)
         except ConfigParser.Error as e:
             print "Error parsing configuration: " + str (e)
             return False
@@ -109,19 +110,25 @@
         try:          
             try: 
                 # gplmt options
-                self.taskfile = config.get("gplmt", "tasks")
+                self.gplmt_taskfile = config.get("gplmt", "tasks")
             except ConfigParser.NoOptionError as e:
                 pass
             
             try: 
-                self.nodesfile = config.get("gplmt", "nodes")
+                self.gplmt_nodesfile = config.get("gplmt", "nodes")
             except ConfigParser.NoOptionError as e:
                 pass
-            
+
             try: 
-                self.notifications = config.get("gplmt", "notification")
+                # gplmt options
+                self.gplmt_parallelism = config.get("gplmt", "max_parallelism")
+                self.gplmt_parallelism = int(self.gplmt_parallelism)
             except ConfigParser.NoOptionError as e:
                 pass
+            try: 
+                self.gplmt_notifications = config.get("gplmt", "notification")
+            except ConfigParser.NoOptionError as e:
+                pass
         except ConfigParser.NoSectionError:
             pass                
 

Modified: gnunet-planetlab/gplmt/gplmt/Nodes.py
===================================================================
--- gnunet-planetlab/gplmt/gplmt/Nodes.py       2013-07-19 09:21:49 UTC (rev 28188)
+++ gnunet-planetlab/gplmt/gplmt/Nodes.py       2013-07-19 11:11:38 UTC (rev 28189)
@@ -82,45 +82,45 @@
 class Nodes:
     def __init__(self, filename, logger):
         assert (None != logger)
-        self.logger = logger
-        self.filename = filename
+        self.gplmt_logger = logger
+        self.gplmt_filename = filename
         self.node_results = list ()
     def load (self):        
-        self.logger.log ("Loading node_results file '" + self.filename + "'")
+        self.gplmt_logger.log ("Loading node_results file '" + 
self.gplmt_filename + "'")
         try:
-            fobj = open (self.filename, "r") 
+            fobj = open (self.gplmt_filename, "r") 
             for line in fobj: 
                 line = line.strip() 
                 node = NodeResult.parse(line)                
                 if (None != node):
                     self.node_results.append(node)
-                    self.logger.log ("Found node '" + 
Util.print_ssh_connection (node) + "'")
+                    self.gplmt_logger.log ("Found node '" + 
Util.print_ssh_connection (node) + "'")
             fobj.close()
         except IOError:
-            print "File " + self.filename + " not found"
+            print "File " + self.gplmt_filename + " not found"
             return False
-        self.logger.log ("Loaded " + str(len(self.node_results)) + " 
node_results")
+        self.gplmt_logger.log ("Loaded " + str(len(self.node_results)) + " 
node_results")
         return True
 
 class StringNodes:
     def __init__(self, str, logger):
         assert (None != logger)
         self.str = str
-        self.logger = logger
+        self.gplmt_logger = logger
         self.node_results = list ()
     def load (self):        
-        self.logger.log ("Loading node_results '" + self.str + "'")
+        self.gplmt_logger.log ("Loading node_results '" + self.str + "'")
         node = NodeResult.parse(self.str)
         if (None == node):
             return False  
         self.node_results.append(node)
-        self.logger.log ("Loaded node '" +Util.print_ssh_connection (node)+ 
"'")
+        self.gplmt_logger.log ("Loaded node '" +Util.print_ssh_connection 
(node)+ "'")
         return True    
 
 class PlanetLabNodes:
     def __init__(self, configuration, logger):
         assert (None != logger)
-        self.logger = logger
+        self.gplmt_logger = logger
         self.configuration = configuration
         self.node_results = list ()
     def load (self):        
@@ -134,7 +134,7 @@
         if (self.configuration.pl_api_url == ""):            
             print "No PlanetLab API url given in configuration, fail!"
             return False
-        self.logger.log ("Retrieving node_results assigned to slice '" + 
self.configuration.pl_slicename + "' for user " +self.configuration.pl_username)
+        self.gplmt_logger.log ("Retrieving node_results assigned to slice '" + 
self.configuration.pl_slicename + "' for user " +self.configuration.pl_username)
         try:
             server = xmlrpclib.ServerProxy(self.configuration.pl_api_url)
         except:
@@ -158,7 +158,7 @@
         
         for node in node_hostnames:
             n = NodeResult(node, 22, self.configuration.pl_slicename, None)
-            self.logger.log ("Planetlab API returned: " + n.hostname)          
  
+            self.gplmt_logger.log ("Planetlab API returned: " + n.hostname)    
        
             self.node_results.append(n)
-        self.logger.log ("Planetlab API returned " + 
str(len(self.node_results)) + " node_results")     
+        self.gplmt_logger.log ("Planetlab API returned " + 
str(len(self.node_results)) + " node_results")     
         return True

Modified: gnunet-planetlab/gplmt/gplmt/Worker.py
===================================================================
--- gnunet-planetlab/gplmt/gplmt/Worker.py      2013-07-19 09:21:49 UTC (rev 28188)
+++ gnunet-planetlab/gplmt/gplmt/Worker.py      2013-07-19 11:11:38 UTC (rev 28189)
@@ -194,15 +194,16 @@
             g_notifications.node_disconnected (self.node, False, "Failed to 
disconnect")
         else:
             g_notifications.node_disconnected (self.node, True, "Disconnected 
successfully")
-        g_logger.log (self.node.hostname + " : All tasks done for " + 
self.node.hostname)            
+        g_logger.log (self.node.hostname + " : All tasks done for " + 
self.node.hostname)
+        return            
              
             
 
 class TestWorker (AbstractWorker):
     def connect (self):
-        return True   
+        return TaskExecutionResult(Tasks.Taskresult.success, "", "")    
     def disconnect (self):       
-        return True        
+        return TaskExecutionResult(Tasks.Taskresult.success, "", "")     
     def exec_run_per_host (self, task):
         print "TestWorker executes per host  '" + task.name + "'"        
         return TaskExecutionResult(Tasks.Taskresult.success, 
"exec_run_per_host successful", "")
@@ -576,26 +577,43 @@
         assert (hasattr(notifications, 'task_completed'))
         self.target = target
         self.node_results = nodes
+        self.workers_waiting = list ()
+        self.workers_running = list ()
+        self.workers_finished = list ()
         self.tasks = tasks
+        self.parallelism = configuration.gplmt_parallelism
+        self.current_workers = 0
+        self.total_workers = 0
         g_configuration = configuration
         g_notifications = notifications
-        g_logger = logger;
+        g_logger = logger;        
     def start (self):
-        g_logger.log ("Starting execution on target '" + str (self.target) + 
"'")
-
+        if (0 != self.parallelism):
+            g_logger.log ("Starting execution on target '" + str (self.target))
+        else:
+            g_logger.log ("Starting execution on target '" + str (self.target) 
+ "' with a parallelism of " + str (self.parallelism))   
+            
+        self.total_workers = 0
         for node in self.node_results.node_results:
             nw = NodeWorker (self.target, node, self.tasks.copy())
-            workersList.append(nw)
-            nw.start()
-                    
-        # block main thread until all worker threads are finished to print 
summary
-        threads_done = False
-        while(not threads_done):
-            threads_done = True
-            for nw in workersList:
-                if(nw.thread.isAlive()):
-                    threads_done = False
-                    break
+            self.workers_waiting.append(nw)
+            self.total_workers += 1
+            
+        self.current_workers = 0
+        while (self.total_workers > len(self.workers_finished)):            
+            while ((0 != self.parallelism) and (len(self.workers_waiting) > 0) 
and (self.current_workers <= self.parallelism)):
+                nw = self.workers_waiting.pop(0)
+                nw.start()
+                nw = self.workers_running.append(nw)
+                self.current_workers += 1
+            for worker in self.workers_running:
+                if not worker.thread.isAlive():
+                    print worker.node.hostname + " finished"                   
 
+                    self.workers_running.remove(worker)
+                    self.workers_finished.append(worker)
+                    self.current_workers -= 1
             time.sleep(0.5)
+            g_logger.log(str(len(self.workers_finished)) +  " of " + str 
(self.total_workers) + " nodes finished")
+            
         g_notifications.summarize()
         

Modified: gnunet-planetlab/gplmt/gplmt.conf
===================================================================
--- gnunet-planetlab/gplmt/gplmt.conf   2013-07-19 09:21:49 UTC (rev 28188)
+++ gnunet-planetlab/gplmt/gplmt.conf   2013-07-19 11:11:38 UTC (rev 28189)
@@ -5,6 +5,8 @@
 #notification = simple
 notification = result
 tasks = contrib/tasklists/check_node.xml
+# Number of parallel workers, use 0 for unlimited
+#max_parallelism = 100
 
 [planetlab]
 slice = tumple_gnunet

Modified: gnunet-planetlab/gplmt/gplmt.py
===================================================================
--- gnunet-planetlab/gplmt/gplmt.py     2013-07-19 09:21:49 UTC (rev 28188)
+++ gnunet-planetlab/gplmt/gplmt.py     2013-07-19 11:11:38 UTC (rev 28189)
@@ -117,19 +117,19 @@
                 assert False, "unhandled option"
     
         if (verbose == True):
-            main.logger = Util.Logger (True)
+            main.gplmt_logger = Util.Logger (True)
         else:
-            main.logger = Util.Logger (False)
+            main.gplmt_logger = Util.Logger (False)
             
         if (config_file != None):                            
             # Load configuration file
-            configuration = Configuration.Configuration (config_file, 
main.logger);
+            configuration = Configuration.Configuration (config_file, 
main.gplmt_logger);
             if (configuration.load() == False):
                 print "Failed to load configuration..."
                 sys.exit(2)
         else:
             # Load default configuration file
-            configuration = Configuration.Configuration (None, main.logger);
+            configuration = Configuration.Configuration (None, 
main.gplmt_logger);
             if (configuration.load() == False):
                 print "Failed to load default configuration..."
                 sys.exit(2)
@@ -138,19 +138,19 @@
         if (None != pl_password):
             configuration.pl_password = pl_password
         if (None != tasks_file):
-            configuration.taskfile = tasks_file
+            configuration.gplmt_taskfile = tasks_file
         if (None != nodes_file):
-            configuration.nodesfile = nodes_file                        
+            configuration.gplmt_nodesfile = nodes_file                        
         
     
-        # Load taskfile file
+        # Load gplmt_taskfile file
         if (None != command):
             print "Loading single command : " + str (command)
-            tasklist = Tasklist.Tasklist (configuration.taskfile, main.logger, 
startid);
-            tasklist.load_singletask(command, main.logger)
-        elif (configuration.taskfile):      
-            print "Loading task file : " + configuration.taskfile          
-            tasklist = Tasklist.Tasklist (configuration.taskfile, main.logger, 
startid);
+            tasklist = Tasklist.Tasklist (configuration.gplmt_taskfile, 
main.gplmt_logger, startid);
+            tasklist.load_singletask(command, main.gplmt_logger)
+        elif (configuration.gplmt_taskfile):      
+            print "Loading task file : " + configuration.gplmt_taskfile        
  
+            tasklist = Tasklist.Tasklist (configuration.gplmt_taskfile, 
main.gplmt_logger, startid);
             if (tasklist.load() == False):
                 sys.exit(2)  
         else:
@@ -177,7 +177,7 @@
     
         # Load hosts files: single host, nodes files, planetlab
         # command line beats configuration
-        if ((None == single_host) and (None == configuration.nodesfile) and
+        if ((None == single_host) and (None == configuration.gplmt_nodesfile) 
and
             (target != Targets.Target (Targets.Target.planetlab))):
             print "No nodes to use given!\n"
             usage()
@@ -185,20 +185,20 @@
             
         # Use a single host
         if (single_host != None):
-            main.logger.log ("Using single node '" + single_host + "'")
-            nodes = Nodes.StringNodes (single_host, main.logger)
+            main.gplmt_logger.log ("Using single node '" + single_host + "'")
+            nodes = Nodes.StringNodes (single_host, main.gplmt_logger)
             if (nodes.load() == False):
                 sys.exit(5) 
         # Use a nodes file
-        elif (None != configuration.nodesfile):
-            main.logger.log ("Using node file '" + configuration.nodesfile + 
"'")
-            nodes = Nodes.Nodes (configuration.nodesfile, main.logger);
+        elif (None != configuration.gplmt_nodesfile):
+            main.gplmt_logger.log ("Using node file '" + 
configuration.gplmt_nodesfile + "'")
+            nodes = Nodes.Nodes (configuration.gplmt_nodesfile, 
main.gplmt_logger);
             if (nodes.load() == False):
                 sys.exit(7)      
         elif (target == Targets.Target (Targets.Target.planetlab)):
             # Load PlanetLab nodes                
-            main.logger.log ("Using PlanetLab nodes")
-            nodes = Nodes.PlanetLabNodes (configuration, main.logger)
+            main.gplmt_logger.log ("Using PlanetLab nodes")
+            nodes = Nodes.PlanetLabNodes (configuration, main.gplmt_logger)
             if (nodes.load() == False):
                 sys.exit(6)        
         else:
@@ -206,17 +206,17 @@
             sys.exit(8)      
     
         # Set up notification
-        if (configuration.notifications == "simple"):
-            notifications = Notifications.SimpleNotification (main.logger)
-        elif (configuration.notifications == "result"):
-            notifications = Notifications.TaskListResultNotification 
(main.logger)
+        if (configuration.gplmt_notifications == "simple"):
+            notifications = Notifications.SimpleNotification 
(main.gplmt_logger)
+        elif (configuration.gplmt_notifications == "result"):
+            notifications = Notifications.TaskListResultNotification 
(main.gplmt_logger)
         else:
-            notifications = Notifications.TaskListResultNotification 
(main.logger)
+            notifications = Notifications.TaskListResultNotification 
(main.gplmt_logger)
     except KeyboardInterrupt:
         sys.exit(0)
 # Start execution
     try:
-        worker = Worker.Worker (main.logger, configuration, target, nodes, 
tasklist, notifications)
+        worker = Worker.Worker (main.gplmt_logger, configuration, target, 
nodes, tasklist, notifications)
         worker.start()
     except KeyboardInterrupt:
         print "Interrupt during execution, FIXME!"
@@ -234,7 +234,7 @@
   -n, --nodes=FILENAME       use node file FILENAME\n\
   -l, --tasks=FILENAME       use tasks file FILENAME\n\
   -t, --target=TARGET        TARGET={local|remote_ssh|planetlab}\n\
-  -C, --command=             run single command instead of taskfile, print 
output using -v\n\
+  -C, --command=             run single commandgplmt_taskfile of taskfile, 
print output using -v\n\
   -a, --all                  use all nodes assigned to PlanetLab slice instead 
of nodes file\n\
   -p, --password             password to access PlanetLab API\n\
   -H, --host                 run tasklist on given host\n\
@@ -246,7 +246,7 @@
 General help using GNU software: http://www.gnu.org/gethelp/";
 
 class Main:
-    logger = None;
+    gplmt_logger = None;
     def __init__(self):
         self.verbose = False;
         




reply via email to

[Prev in Thread] Current Thread [Next in Thread]