gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r28172 - gnunet-planetlab/gplmt/gplmt


From: gnunet
Subject: [GNUnet-SVN] r28172 - gnunet-planetlab/gplmt/gplmt
Date: Thu, 18 Jul 2013 16:48:06 +0200

Author: wachs
Date: 2013-07-18 16:48:06 +0200 (Thu, 18 Jul 2013)
New Revision: 28172

Modified:
   gnunet-planetlab/gplmt/gplmt/Nodes.py
   gnunet-planetlab/gplmt/gplmt/Notifications.py
   gnunet-planetlab/gplmt/gplmt/Worker.py
Log:
abstract worker loop, test worker loop and notifications are working


Modified: gnunet-planetlab/gplmt/gplmt/Nodes.py
===================================================================
--- gnunet-planetlab/gplmt/gplmt/Nodes.py       2013-07-18 14:37:43 UTC (rev 
28171)
+++ gnunet-planetlab/gplmt/gplmt/Nodes.py       2013-07-18 14:48:06 UTC (rev 
28172)
@@ -34,7 +34,7 @@
     print "That's a bug! please check README: " + str(e)
     sys.exit (1) 
 
-class Node:
+class NodeResult:
     def __init__(self, hostname, port = None, username = None, password = 
None):
         self.hostname = hostname
         self.port = port
@@ -76,7 +76,7 @@
         elif len(hostport) > 2:
             raise Exception("Invalid node definition: " + line)
             return None 
-        return Node(hostname, port, username, password)
+        return NodeResult(hostname, port, username, password)
 
 
 class Nodes:
@@ -84,22 +84,22 @@
         assert (None != logger)
         self.logger = logger
         self.filename = filename
-        self.nodes = list ()
+        self.node_results = list ()
     def load (self):        
-        self.logger.log ("Loading nodes file '" + self.filename + "'")
+        self.logger.log ("Loading node_results file '" + self.filename + "'")
         try:
             fobj = open (self.filename, "r") 
             for line in fobj: 
                 line = line.strip() 
-                node = Node.parse(line)                
+                node = NodeResult.parse(line)                
                 if (None != node):
-                    self.nodes.append(node)
+                    self.node_results.append(node)
                     self.logger.log ("Found node '" + 
Util.print_ssh_connection (node) + "'")
             fobj.close()
         except IOError:
             print "File " + self.filename + " not found"
             return False
-        self.logger.log ("Loaded " + str(len(self.nodes)) + " nodes")
+        self.logger.log ("Loaded " + str(len(self.node_results)) + " 
node_results")
         return True
 
 class StringNodes:
@@ -107,13 +107,13 @@
         assert (None != logger)
         self.str = str
         self.logger = logger
-        self.nodes = list ()
+        self.node_results = list ()
     def load (self):        
-        self.logger.log ("Loading nodes '" + self.str + "'")
-        node = Node.parse(self.str)
+        self.logger.log ("Loading node_results '" + self.str + "'")
+        node = NodeResult.parse(self.str)
         if (None == node):
             return False  
-        self.nodes.append(node)
+        self.node_results.append(node)
         self.logger.log ("Loaded node '" +Util.print_ssh_connection (node)+ 
"'")
         return True    
 
@@ -122,7 +122,7 @@
         assert (None != logger)
         self.logger = logger
         self.configuration = configuration
-        self.nodes = list ()
+        self.node_results = list ()
     def load (self):        
 
         if (self.configuration.pl_password == ""):
@@ -134,7 +134,7 @@
         if (self.configuration.pl_api_url == ""):            
             print "No PlanetLab API url given in configuration, fail!"
             return False
-        self.logger.log ("Retrieving nodes assigned to slice '" + 
self.configuration.pl_slicename + "' for user " +self.configuration.pl_username)
+        self.logger.log ("Retrieving node_results assigned to slice '" + 
self.configuration.pl_slicename + "' for user " +self.configuration.pl_username)
         try:
             server = xmlrpclib.ServerProxy(self.configuration.pl_api_url)
         except:
@@ -157,8 +157,8 @@
             return False            
         
         for node in node_hostnames:
-            n = Node(node, 22, self.configuration.pl_slicename, None)
+            n = NodeResult(node, 22, self.configuration.pl_slicename, None)
             self.logger.log ("Planetlab API returned: " + n.hostname)          
  
-            self.nodes.append(n)
-        self.logger.log ("Planetlab API returned " + str(len(self.nodes)) + " 
nodes")     
+            self.node_results.append(n)
+        self.logger.log ("Planetlab API returned " + 
str(len(self.node_results)) + " node_results")     
         return True

Modified: gnunet-planetlab/gplmt/gplmt/Notifications.py
===================================================================
--- gnunet-planetlab/gplmt/gplmt/Notifications.py       2013-07-18 14:37:43 UTC 
(rev 28171)
+++ gnunet-planetlab/gplmt/gplmt/Notifications.py       2013-07-18 14:48:06 UTC 
(rev 28172)
@@ -47,14 +47,14 @@
     def task_completed (self, node, tasks, success, message):
         assert (0)
 
-class NodeCollection:
+class NodeResultCollection:
     def __init__(self):
-        self.nodes = list ()
-    def add (self, node):
-        self.nodes.append (node)
-    def get (self, name):
-        for n in self.nodes:
-            if (n.name == name):
+        self.node_results = list ()
+    def add (self, node_res):
+        self.node_results.append (node_res)
+    def get (self, node):
+        for n in self.node_results:
+            if (n.node == node):
                 return n
         return None
         
@@ -64,7 +64,6 @@
         self.msg = ""
         self.output = ""
     def finished (self, result, fail, msg, output):
-        print self.task.name + " done with " + str (fail)
         self.result = result
         self.fail = fail
         self.msg = msg
@@ -88,9 +87,9 @@
                 return tl
         return None
 
-class Node:
-    def __init__(self, name):
-        self.name = name
+class NodeResult:
+    def __init__(self, node):
+        self.node = node
         self.start = 0
         self.end = 0
         self.tasks = list ()
@@ -101,7 +100,7 @@
     def __init__(self, logger):
         assert (None != logger)
         self.logger = logger
-        self.nodes = NodeCollection ()
+        self.node_results = NodeResultCollection ()
     def node_connected (self, node, success, message):
         return
     def node_disconnected (self, node, success, message):
@@ -109,7 +108,7 @@
     def tasklist_started (self, node, tasks, message):
         return
     def tasklist_completed (self, node, tasks, success, message):
-        self.nodes.add (Node(node))
+        self.node_results.add (NodeResult(node))
         if (success == True):
             print node + " : Tasklist '" +  tasks.name + "' completed 
successfully"
         else:
@@ -124,72 +123,72 @@
     def __init__(self, logger):
         assert (None != logger)
         self.logger = logger
-        self.nodes = NodeCollection ()
+        self.node_results = NodeResultCollection ()
     def summarize (self):
         maxNodeLen = 0
         maxTasklistLen = 0
         # Calculate max length of node names and tasklist names
-        for n in self.nodes.nodes:
-            nodeLen = len(n.name)
+        for nres in self.node_results.node_results:
+            nodeLen = len(nres.node.hostname)
             if (nodeLen > maxNodeLen):
                 maxNodeLen = nodeLen
-            for tl in n.tasklists.tasklists:
+            for tl in nres.tasklists.tasklists:
                 tlLen = len(tl.name)
                 if(tlLen > maxTasklistLen):
                     maxTasklistLen = tlLen
         # Sort output (success then fail)
-        self.nodes.nodes.sort(key=lambda x: not 
x.tasklists.tasklists[0].success)
+        self.node_results.node_results.sort(key=lambda x: not 
x.tasklists.tasklists[0].success)
         # Print organized output
-        for n in self.nodes.nodes:
-            sys.stdout.write(n.name)
-            diff = maxNodeLen - len(n.name)
+        for nres in self.node_results.node_results:            
+            sys.stdout.write(nres.node.hostname)
+            diff = maxNodeLen - len(nres.node.hostname)
             sys.stdout.write(' ' * diff + ' | ')
-            for tl in n.tasklists.tasklists:
+            for tl in nres.tasklists.tasklists:
                 sys.stdout.write(tl.name)
                 diff = maxTasklistLen - len(tl.name)
                 sys.stdout.write(' ' * diff + ' | ')
                 print 'success' if tl.success else 'failed in: '
-            for t in n.tasks:
-                if (t.result != Tasklist.Taskresult.success):
+            for t in nres.tasks:
+                if (t.fail == True):
                     print "\tFAIL: " + t.task.name + " with '" +t.msg+ "' and 
'" +t.output+ "'"
                 else:
                     print "\tSUC " + t.task.name + " with '" +t.msg+ "' and '" 
+t.output+ "'"                     
                  
                 
         #    tsk_str = ""
-        #    for t in n.tasks:
+        #    for t in nres.tasks:
         #        tsk_f = "[e]"
         #        if (t.fail == True):
         #            tsk_f = "[f]"
         #        else:
         #            tsk_f = "[s]"
         #        tsk_str += t.task.name + " " + tsk_f + " ->"
-        #    print n.name
+        #    print nres.name
     def node_connected (self, node, success, message):
         # Get node object
-        nodeObj = self.nodes.get(node)
+        node_resObj = self.node_results.get(node)
         # Create it if it doesn't exist
-        if(None == self.nodes.get(node)):
-            nodeObj = Node(node)
-            self.nodes.add(nodeObj)
+        if(None == self.node_results.get(node)):
+            node_resObj = NodeResult(node)
+            self.node_results.add(node_resObj)
         # Set node start time as of now
-        nodeObj.start = time.time()
-        nodeObj.connectSuccess = success
+        node_resObj.start = time.time()
+        node_resObj.connectSuccess = success
         if (False == success):
-            nodeObj.error_msg = message
+            node_resObj.error_msg = message
         return
     def node_disconnected (self, node, success, message):
         # Mainly need to set node end connection time
-        nodeObj = self.nodes.get(node)
+        nodeObj = self.node_results.get(node)
         nodeObj.end = time.time()
         return 
     def tasklist_started (self, node, tasks, message):
         # Get node object
-        nodeObj = self.nodes.get(node)
+        nodeObj = self.node_results.get(node)
         # Create it if it doesn't exist (shouldn't node_connected be called 
before this?)
-        if(None == self.nodes.get(node)):
-            nodeObj = Node(node)
-            self.nodes.add(nodeObj)
+        if(None == self.node_results.get(node)):
+            nodeObj = NodeResult(node)
+            self.node_results.add(nodeObj)
         # Create tasklist object
         tasklist = TaskList(tasks.name)
         # Add it to the collection of node tasklists
@@ -197,7 +196,7 @@
         return
     def tasklist_completed (self, node, tasks, success, message):
         # Mainly want to set tasklist end time and success status
-        nodeObj = self.nodes.get(node)
+        nodeObj = self.node_results.get(node)
         tasklist = nodeObj.tasklists.get(tasks.name)
         tasklist.end = time.time()
         tasklist.success = success            
@@ -206,29 +205,29 @@
         #else:
         #    print node + " : Tasklist '" +  tasks.name + "' completed with 
failure"
     def task_started (self, node, task, message):
-        nodeObj = self.nodes.get(node)
+        nodeObj = self.node_results.get(node)
         if (None == nodeObj):
-            print "Node not found!"
+            print "NodeResult not found!"
             return 
         nodeObj.tasks.append (Task (task))
         return
     def task_completed (self, node, task, result, message, output):       
-        nodeObj = self.nodes.get(node)
+        nodeObj = self.node_results.get(node)
         for t in nodeObj.tasks:
             if t.task is task:
                 if (result != Tasklist.Taskresult.success):
                     t.finished (result, True, message, output)
-                #else:
-                #    t.finished (result, False, message, output)
+                else:
+                    t.finished (result, False, message, output)
         return         
 
 class SimpleNotification (Notification):
     def __init__(self, logger):
         assert (None != logger)
         self.logger = logger
-        #self.nodes = NodeCollection ()
+        #self.node_results = NodeResultCollection ()
     def summarize (self):
-        #for n in self.nodes:
+        #for n in self.node_results:
         #    tsk_str = ""
         #    for t in n.tasks:
         #        tsk_f = "[e]"
@@ -251,9 +250,9 @@
             print node.hostname + " : disconnected with failure"    
     def tasklist_started (self, node, tasks, message):
         print node.hostname + " : Tasklist '" +  tasks.name + "' started"
-        #self.nodes.add (Node(node))
+        #self.node_results.add (NodeResult(node))
     def tasklist_completed (self, node, tasks, success, message):
-        if (success == True):
+        if (success == Tasklist.Taskresult.success):
             print node.hostname + " : Tasklist '" +  tasks.name + "' completed 
successfully"
         else:
             print node.hostname + " : Tasklist '" +  tasks.name + "' completed 
with failure"

Modified: gnunet-planetlab/gplmt/gplmt/Worker.py
===================================================================
--- gnunet-planetlab/gplmt/gplmt/Worker.py      2013-07-18 14:37:43 UTC (rev 
28171)
+++ gnunet-planetlab/gplmt/gplmt/Worker.py      2013-07-18 14:48:06 UTC (rev 
28172)
@@ -29,6 +29,7 @@
 import sys
 import select
 import signal
+import inspect
 
 try:
     import gplmt.Configuration as Configuration
@@ -93,22 +94,30 @@
         self.node = node
         self.tasks = tasks    
     def connect (self):
-        raise NotImplementedError   
+        raise NotImplementedError (inspect.stack()[0][3]) 
     def disconnect (self):       
-        raise NotImplementedError 
+        raise NotImplementedError (inspect.stack()[0][3]) 
     def exec_run_per_host (self, task):
-        raise NotImplementedError        
+        raise NotImplementedError (inspect.stack()[0][3]) 
     def exec_run (self, task):
-        raise NotImplementedError
+        raise NotImplementedError (inspect.stack()[0][3]) 
     def exec_put (self):
-        raise NotImplementedError
+        raise NotImplementedError (inspect.stack()[0][3]) 
     def exec_get (self):
-        raise NotImplementedError
+        raise NotImplementedError (inspect.stack()[0][3]) 
     def run(self):    
         global interrupt
         tasklist_success = True
         # Connecting
-        res = self.connect()
+        res = False
+        try:
+            res = self.connect()
+        except NotImplementedError as e:
+            print "Not implemented: " + str(self.__class__) + " function: " +  
str(e)
+            pass
+        except Exception as e:
+            print "Exception in Worker: " + str (e)
+            pass
         if (False == res):
             g_notifications.node_connected (self.node, False, "Failed to 
connect")
             return
@@ -120,46 +129,71 @@
         g_notifications.tasklist_started (self.node, self.tasks, "")           
 
         task = self.tasks.get()
         
-        #if (interrupt):
-        #    return Tasklist.Taskresult.user_interrupt
+        if (interrupt):
+            g_notifications.tasklist_completed (self.node, self.tasks, 
Tasks.Taskresult.user_interrupt, "")         
                
         # Executing Tasks 
         while (None != task and not interrupt):            
             g_logger.log (self.node.hostname + " : Running task id " 
+str(task.id)+" '" + task.name + "'")                        
             g_notifications.task_started (self.node, task, "")
-            if (task.type == Tasks.Operation.run):
-                task_result = self.exec_run (task)
-                assert (None != task_result)
-                g_notifications.task_completed (self.node, task, 
task_result.result, task_result.message, task_result.output)
-            elif (task.type == Tasks.Operation.put):
-                task_result = self.exec_put (task)
-                assert (None != task_result)
-                g_notifications.task_completed (self.node,  task, 
task_result.result, task_result.message, task_result.output)
-            elif (task.type == Tasks.Operation.get):
-                task_result = self.exec_get (task)
-                assert (None != task_result)                
-                g_notifications.task_completed (self.node,  task, 
task_result.result, task_result.message, task_result.output)
-            elif (task.type == Tasks.Operation.run_per_host):
-                task_result = self.exec_run_per_host (task)
-                assert (None != task_result)
-                g_notifications.task_completed (self.node,  task, 
task_result.result, task_result.message, task_result.output)                   
-            else:
-                print "UNSUPPORTED OPERATION!"
+            try:
+                if (task.type == Tasks.Operation.run):
+                    task_result = self.exec_run (task)
+                    assert (None != task_result)
+                    g_notifications.task_completed (self.node, task, 
task_result.result, task_result.message, task_result.output)
+                elif (task.type == Tasks.Operation.put):
+                    task_result = self.exec_put (task)
+                    assert (None != task_result)
+                    g_notifications.task_completed (self.node,  task, 
task_result.result, task_result.message, task_result.output)
+                elif (task.type == Tasks.Operation.get):
+                    task_result = self.exec_get (task)
+                    assert (None != task_result)                
+                    g_notifications.task_completed (self.node,  task, 
task_result.result, task_result.message, task_result.output)
+                elif (task.type == Tasks.Operation.run_per_host):
+                    task_result = self.exec_run_per_host (task)
+                    assert (None != task_result)
+                    g_notifications.task_completed (self.node,  task, 
task_result.result, task_result.message, task_result.output)                   
+                else:
+                    print "UNSUPPORTED OPERATION!"
+            except NotImplementedError as e:
+                print "Not implemented" + str (e)
+                pass
+            except Exception as e:
+                print "Exception in Worker:" + str (e)
+                pass
+            
             if ((task_result.result != Tasks.Taskresult.success) and 
(task.stop_on_fail == True)):
                 g_logger.log (self.node.hostname + " : Task failed and 
therefore execution is stopped")
-                g_notifications.task_completed (self.node.hostname, task, 
task_result.result, task_result.message, task_result.output)         
+                g_notifications.task_completed (self.node, task, 
task_result.result, task_result.message, task_result.output)         
                 self.disconnect()
                 tasklist_success = False
                 break                      
             task = self.tasks.get()
+
+        if (interrupt):            
+            g_notifications.tasklist_completed (self.node, self.tasks, 
Tasks.Taskresult.user_interrupt, "")
+            if (None != task):
+                g_notifications.task_completed (self.node, task, 
task_result.user_interrupt, "task was interrupted", "")                     
+                     
+        if (False == tasklist_success):
+            g_notifications.tasklist_completed (self.node, self.tasks, 
Tasks.Taskresult.fail, "")
+        else:
+            g_notifications.tasklist_completed (self.node, self.tasks, 
Tasks.Taskresult.success, "")   
+        #disconnect
+        try:
+            res = self.disconnect()
+        except NotImplementedError as e:
+            print "Not implemented: " + str(self.__class__) + " function: " +  
str(e)
+            pass
+        except Exception as e:
+            print "Exception in Worker:" + str (e)
+            pass
             
-        #disconnect
-        res = self.disconnect()
         if (False == res):
             g_notifications.node_disconnected (self.node, False, "Failed to 
disconnect")
         else:
             g_notifications.node_disconnected (self.node, True, "Disconnected 
successfully")
-        g_notifications.tasklist_completed (self.node, self.tasks, 
tasklist_success, "")
+        g_logger.log (self.node.hostname + " : All tasks done for " + 
self.node.hostname)            
              
             
 
@@ -183,45 +217,45 @@
 
 class LocalWorker (AbstractWorker):
     def connect (self):
-        raise NotImplementedError   
+        raise NotImplementedError (inspect.stack()[0][3]) 
     def disconnect (self):       
         raise NotImplementedError     
     def exec_run_per_host (self, task):
-        raise NotImplementedError        
+        raise NotImplementedError (inspect.stack()[0][3])        
     def exec_run (self, task):
-        raise NotImplementedError
+        raise NotImplementedError (inspect.stack()[0][3]) 
     def exec_put (self):
-        raise NotImplementedError
+        raise NotImplementedError (inspect.stack()[0][3]) 
     def exec_get (self):
-        raise NotImplementedError        
+        raise NotImplementedError (inspect.stack()[0][3]) 
 
 class RemoteSSHWorker (AbstractWorker):
     def connect (self):
-        raise NotImplementedError   
+        raise NotImplementedError (inspect.stack()[0][3])    
     def disconnect (self):       
-        raise NotImplementedError     
+        raise NotImplementedError (inspect.stack()[0][3])     
     def exec_run_per_host (self, task):
-        raise NotImplementedError        
+        raise NotImplementedError (inspect.stack()[0][3]) 
     def exec_run (self, task):
-        raise NotImplementedError
+        raise NotImplementedError (inspect.stack()[0][3]) 
     def exec_put (self):
-        raise NotImplementedError
+        raise NotImplementedError (inspect.stack()[0][3]) 
     def exec_get (self):
-        raise NotImplementedError        
+        raise NotImplementedError (inspect.stack()[0][3]) 
     
 class PlanetLabWorker (AbstractWorker):
     def connect (self):
-        raise NotImplementedError   
+        raise NotImplementedError (inspect.stack()[0][3]) 
     def disconnect (self):       
-        raise NotImplementedError     
+        raise NotImplementedError (inspect.stack()[0][3])  
     def exec_run_per_host (self, task):
-        raise NotImplementedError        
+        raise NotImplementedError (inspect.stack()[0][3]) 
     def exec_run (self,task):
-        raise NotImplementedError
+        raise NotImplementedError (inspect.stack()[0][3]) 
     def exec_put (self):
-        raise NotImplementedError
+        raise NotImplementedError (inspect.stack()[0][3]) 
     def exec_get (self):
-        raise NotImplementedError   
+        raise NotImplementedError (inspect.stack()[0][3]) 
     
 
 class NodeWorker ():
@@ -267,7 +301,7 @@
         assert (hasattr(notifications, 'task_started'))
         assert (hasattr(notifications, 'task_completed'))
         self.target = target
-        self.nodes = nodes
+        self.node_results = nodes
         self.tasks = tasks
         g_configuration = configuration
         g_notifications = notifications
@@ -275,12 +309,11 @@
     def start (self):
         g_logger.log ("Starting execution on target '" + str (self.target) + 
"'")
 
-        for node in self.nodes.nodes:
+        for node in self.node_results.node_results:
             nw = NodeWorker (self.target, node, self.tasks.copy())
             workersList.append(nw)
             nw.start()
-            
-        return            
+                    
         # block main thread until all worker threads are finished to print 
summary
         threads_done = False
         while(not threads_done):




reply via email to

[Prev in Thread] Current Thread [Next in Thread]