gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r28161 - gnunet-planetlab/gplmt/gplmt


From: gnunet
Subject: [GNUnet-SVN] r28161 - gnunet-planetlab/gplmt/gplmt
Date: Thu, 18 Jul 2013 15:05:17 +0200

Author: wachs
Date: 2013-07-18 15:05:16 +0200 (Thu, 18 Jul 2013)
New Revision: 28161

Modified:
   gnunet-planetlab/gplmt/gplmt/Worker.py
Log:
worker loop


Modified: gnunet-planetlab/gplmt/gplmt/Worker.py
===================================================================
--- gnunet-planetlab/gplmt/gplmt/Worker.py      2013-07-18 12:56:27 UTC (rev 28160)
+++ gnunet-planetlab/gplmt/gplmt/Worker.py      2013-07-18 13:05:16 UTC (rev 28161)
@@ -30,13 +30,10 @@
 import select
 import signal
 
-
-
-
 try:
     import gplmt.Configuration as Configuration
     import gplmt.Util as Util
-    import gplmt.Tasks as Tasklist
+    import gplmt.Tasks as Tasks
     import gplmt.Targets as Targets
     from gplmt.SCP import SCPClient
     from gplmt.SCP import SCPException
@@ -89,343 +86,100 @@
 g_configuration = None
 workersList = list()
 
-class NodeWorkerThread (threading.Thread):
-    def __init__(self, threadID, node, tasks):
-        threading.Thread.__init__(self)
-        self.threadID = threadID
-        self.node = node
-        self.tasks = tasks
-
-    def exec_run_per_host (self, task, transport):
-        found = False
-        default = None
-        cmd = None          
-        try:
-            f = open(task.command_file, 'r')
-            for line in f:
-                if (line[0] == '#'):
-                    continue;
-                sline = line.split (';',2)
-                if (sline[0] == self.node.hostname):
-                    cmd = sline[1].strip()
-                    found = True
-                if (sline[0] == ''):
-                    default = sline[1].strip()
-            f.close()
-        except IOError as e:
-            print str(e)
-        
-        t = task.copy()
-        if (found == True):
-            g_logger.log (self.node.hostname + " : Found specific command '"+ cmd + "'")
-            t.command = cmd
-            t.arguments = ""
-        elif ((found == False) and (default != None)):
-            g_logger.log (self.node.hostname + " : Using default command '"+ default + "'")
-            t.command = default
-            t.arguments = ""
-        else:
-            g_logger.log (self.node.hostname + " : Task '"+ task.name + "' failed: no command to execute")
-            return Tasklist.Taskresult.fail
-        
-        return self.exec_run(t, transport)
-        
-    def exec_run (self, task, transport):
-        global interrupt
-        message = "undefined"
-        output = ""
-        if(interrupt):
-            message = "'"+ task.name +  "' interrupted by user"
-            g_logger.log (self.node.hostname + " : Task '"+ message)
-            return TaskExecutionResult(Tasklist.Taskresult.user_interrupt, "interrupted by user", "")
-        if ((task.command == None) and (task.arguments == None)):
-            message = "'"+ task.name +  "' no command to execute"
-            g_logger.log (self.node.hostname + " : Task " + message)
-            return TaskExecutionResult(Tasklist.Taskresult.fail, "no command to execute", "")
-        try:
-            channel = transport.open_session()
-            channel.settimeout(1.0)
-            channel.get_pty ()
-            #print self.node + " CMD: " + task.command + " " + task.arguments
-            channel.exec_command(task.command + " " + task.arguments)
-            
-        except SSHException as e:
-            g_logger.log (self.node.hostname + " : Error while trying to connect: " + str(e))
-        
-        if (task.timeout > 0):
-            timeout = task.timeout
-        else:
-            timeout = -1
-        result = Tasklist.Taskresult.success
-        exit_status = -1
-        start_time = time.time ()
-        
-        stdout_data = ""
-        stderr_data = ""
-        
-        while 1:
-            if(interrupt):
-                result = Tasklist.Taskresult.user_interrupt
-                break
-            if (timeout != -1):
-                delta = time.time() - start_time
-                if (delta > timeout):
-                    g_logger.log (self.node.hostname + " : Timeout after " +str(delta) +" seconds")                    
-                    result = Tasklist.Taskresult.timeout
-                    break
-            (r, w, e) = select.select([channel], [], [], 1)
-            if r:
-                got_data = False
-                if channel.recv_ready():
-                    data = r[0].recv(4096)
-                    if data:
-                        got_data = True
-                        g_logger.log (self.node.hostname + " : " + data)
-                        output += data
-                        stdout_data += data
-                if channel.recv_stderr_ready():
-                    data = r[0].recv_stderr(4096)
-                    if data:
-                        got_data = True
-                        g_logger.log (self.node.hostname + " : " + data)
-                        output += data
-                        stderr_data += data
-                if not got_data:
-                    break
-        
-        if (result == Tasklist.Taskresult.success):
-            exit_status = channel.recv_exit_status ()  
-        
-        if (result == Tasklist.Taskresult.success):
-            if (task.expected_return_code != -1):                    
-                if (exit_status != task.expected_return_code):
-                    g_logger.log (self.node.hostname + " : Task '"+ task.name + "' completed after "+ str(time.time() - start_time) +" sec, but exit code " +str(exit_status)+ " was not as expected " + str(task.expected_return_code))
-                    g_logger.log (stdout_data)
-                    g_logger.log (stderr_data)
-                    result = Tasklist.Taskresult.return_value_did_not_match
-                else:
-                    g_logger.log (self.node.hostname + " : Task '"+ task.name + "' completed after "+ str(time.time() - start_time) +" sec, exit code " +str(exit_status)+ " was as expected " + str(task.expected_return_code))
-            
-            if (task.expected_output != None):
-                output_contained = False
-                if (task.expected_output in stdout_data):
-                    output_contained = True
-                if (task.expected_output in stderr_data):
-                        output_contained = True                
-                if (output_contained == True):
-                    g_logger.log (self.node.hostname + " : Task '"+ task.name + "' expected output '"+task.expected_output+"' was found")
-                else:
-                    g_logger.log (self.node.hostname + " : Task '"+ task.name + "' expected output '"+task.expected_output+"' was not found")
-                    result = Tasklist.Taskresult.output_did_not_match
-                    
-        if (result == Tasklist.Taskresult.success):
-            message = "'"+ task.name +  "' successful"
-            g_logger.log (self.node.hostname + " : Task " + message)
-        elif (result == Tasklist.Taskresult.timeout):
-            message = "'"+ task.name +  "' with timeout"
-            g_logger.log (self.node.hostname + " : Task "+ message)
-        elif (result == Tasklist.Taskresult.user_interrupt):
-            message = "'"+ task.name +  "' interrupted by user"
-            g_logger.log (self.node.hostname + " : Task "+ message)
-        else: 
-            message = "'"+ task.name +  "' failed"
-            g_logger.log (self.node.hostname + " : Task "+ message)
-        return TaskExecutionResult(result, message, output)
-    def exec_put (self, task, transport):
-        if (False == os.path.exists (task.src)):
-            return Tasklist.Taskresult.src_file_not_found
-            
-        result = Tasklist.Taskresult.success
-        try:
-            if (g_configuration.ssh_transfer == Configuration.TransferMode.scp):
-                try:
-                    scp = SCPClient (transport)
-                    scp.put (task.src, task.dest)
-                except SCPException as e:
-                    g_logger.log (self.node.hostname + " : Task '"+ task.name + "' :" + str(e))
-                    result = Tasklist.Taskresult.fail
-                    pass
-            if (g_configuration.ssh_transfer == Configuration.TransferMode.sftp):                
-                sftp = paramiko.SFTPClient.from_transport (transport)
-                sftp.put(task.src, task.dest)
-                sftp.close()
-        except paramiko.SSHException as e:
-            g_logger.log (self.node.hostname + " : Task '"+ task.name + "' :" + str(e))
-            result = Tasklist.Taskresult.fail
-            pass
-        except (OSError, IOError) as e:
-            g_logger.log (self.node + " : Task '"+ task.name + "' : " + str(e))
-            result = Tasklist.Taskresult.src_file_not_found 
-            pass           
-        return result
-            
-    def exec_get (self, task, transport):
-        result = Tasklist.Taskresult.success
-        try:
-            if (g_configuration.ssh_transfer == Configuration.TransferMode.scp): 
-                try:
-                    scp = SCPClient (transport)
-                    scp.get (task.src, task.dest)
-                except SCPException as e:
-                    g_logger.log (self.node.hostname + " : Task '"+ task.name + "' :" + str(e))
-                    result = Tasklist.Taskresult.fail
-                    pass                
-            if (g_configuration.ssh_transfer == Configuration.TransferMode.sftp):
-                sftp = paramiko.SFTPClient.from_transport (transport)
-                sftp.get (task.src, task.dest)
-                sftp.close()
-        except paramiko.SSHException as e:
-            g_logger.log (self.node.hostname + " : Task '"+ task.name + "' :" + str(e))
-            result = Tasklist.Taskresult.fail
-            pass
-        except (OSError, IOError) as e:
-            g_logger.log (self.node.hostname + " : Task '"+ task.name + "' : " + str(e))
-            result = Tasklist.Taskresult.src_file_not_found 
-            pass           
-        return result    
-          
-    def run(self):
-        global interrupt
-        g_logger.log (self.node.hostname + " : Starting tasklist " + self.tasks.name)
-        task = self.tasks.get()
-        if(interrupt):
-            return Tasklist.Taskresult.user_interrupt
-        try: 
-            ssh = paramiko.SSHClient()
-            if (g_configuration.ssh_use_known_hosts):
-                g_logger.log (self.node.hostname + " : Loading known hosts")
-                ssh.load_system_host_keys ()
-
-            # Automatically add new hostkeys
-            if (g_configuration.ssh_add_unkown_hostkeys == True):
-                ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
-            # check for private key existance
-            keyfile = None
-            if (g_configuration.ssh_keyfile != None): 
-                if (os.path.exists (g_configuration.ssh_keyfile)):
-                    g_logger.log (self.node.hostname + " : Found " + g_configuration.ssh_keyfile)
-                    keyfile = g_configuration.ssh_keyfile
-                else:
-                    g_logger.log (self.node.hostname + " : Not found " + g_configuration.ssh_keyfile)
-            
-            
-            g_logger.log (self.node.hostname + " : Trying to connect to '" +Util.print_ssh_connection (self.node) + "'")
-            if self.node.username is not None: #credentials are supplied in node file
-                if (self.node.password is not None):                    
-                    #print 'HERE 1!' +  self.node.hostname + str(self.node.port or 22) + self.node.username
-                    ssh.connect (self.node.hostname,
-                                 port=self.node.port or 22,
-                                 username=self.node.username,
-                                 password=self.node.password,
-                                 timeout=10)
-                else:
-                    #print 'HERE 2!' +  self.node.hostname + str(self.node.port or 22) + self.node.username
-                    ssh.connect (self.node.hostname,
-                                 port=self.node.port or 22,
-                                 username=self.node.username,
-                                 timeout=10)                                        
-            elif ("" != g_configuration.ssh_username):
-                g_logger.log (self.node.hostname + " : Using private keyfile '" +str(keyfile)+ "'")
-                ssh.connect (self.node.hostname,
-                       port=self.node.port or 22,
-                                       username=g_configuration.ssh_username, 
-                                       password=g_configuration.ssh_password,
-                                       timeout=10,
-                                       key_filename=keyfile)
-            else:
-                g_logger.log (self.node.hostname + " : Trying to connect to " + 
-                          self.node.hostname + 
-                          " using password '" + g_configuration.ssh_password+ 
-                          "' and private keyfile '" +str(keyfile)+ "'")
-                ssh.connect (self.node.hostname,
-                       port=self.node.port or 22,
-                                       password=g_configuration.ssh_password,
-                                       timeout=10,
-                                       key_filename=keyfile)                 
-        except (IOError,
-                paramiko.SSHException,
-                paramiko.BadHostKeyException, 
-                paramiko.AuthenticationException,
-                socket.error) as e:  
-            g_logger.log (self.node.hostname + " : Error while trying to connect: " + str(e))
-            g_notifications.node_connected (self.node.hostname, False, str(e))
-            g_notifications.tasklist_completed (self.node.hostname, self.tasks, False, "")
-            return
-        
-        g_notifications.node_connected (self.node.hostname, True, "")
-        transport = ssh.get_transport()        
-        success = True
-        task_result = Tasklist.Taskresult.success
-        while (None != task and not interrupt):
-            g_logger.log (self.node.hostname + " : Running task id " +str(task.id)+" '" + task.name + "'")
-            g_notifications.task_started (self.node.hostname, task, "")
-            if (task.__class__.__name__ == "Task"):
-                if (task.type == Tasklist.Operation.run):
-                    task_result = self.exec_run (task, transport)
-                    g_notifications.task_completed (self.node.hostname, task, task_result.result, task_result.message, task_result.output)
-                elif (task.type == Tasklist.Operation.put):
-                    task_result = self.exec_put (task, transport)
-                    g_notifications.task_completed (self.node.hostname, task, task_result, "")
-                elif (task.type == Tasklist.Operation.get):
-                    task_result = self.exec_get (task, transport)
-                    g_notifications.task_completed (self.node.hostname, task, task_result, "")
-                elif (task.type == Tasklist.Operation.run_per_host):
-                    task_result = self.exec_run_per_host (task, transport)
-                    g_notifications.task_completed (self.node.hostname, task, task_result, "")                    
-                else:
-                    print "UNSUPPORTED OPERATION!"                    
-            elif (task.__class__.__name__ == "Taskset"):
-                g_logger.log (self.node.hostname + " : Running task set")
-            if ((task.stop_on_fail == True) and (task_result.result != Tasklist.Taskresult.success)):
-                g_logger.log (self.node.hostname + " : Task failed and therefore execution is stopped")
-                g_notifications.task_completed (self.node.hostname, task, task_result.result, task_result.message, task_result.output)         
-                transport.close()
-                success = False
-                break
-            # If received user interrupt, close channel and break execution
-            elif (task_result == Tasklist.Taskresult.user_interrupt):
-                transport.close()
-                success = False
-                break
-            task = self.tasks.get()
-        
-        ssh.close()
-        g_notifications.node_disconnected (self.node.hostname, True, "")
-        g_notifications.tasklist_completed (self.node.hostname, self.tasks, success, "")
-        g_logger.log (self.node.hostname + " : All tasks done for " + self.node.hostname)
-        
-
 class AbstractWorker(threading.Thread):
     def __init__(self, threadID, node, tasks):
         threading.Thread.__init__(self)
         self.threadID = threadID
         self.node = node
         self.tasks = tasks    
-    def exec_run_per_host (self):
+    def connect (self):
+        raise NotImplementedError   
+    def disconnect (self):       
+        raise NotImplementedError 
+    def exec_run_per_host (self, task):
         raise NotImplementedError        
-    def exec_run (self):
+    def exec_run (self, task):
         raise NotImplementedError
     def exec_put (self):
         raise NotImplementedError
     def exec_get (self):
         raise NotImplementedError
     def run(self):    
-        print "run"
+        global interrupt
+        # Connecting
+        res = self.connect()
+        if (False == res):
+            g_notifications.node_connected (self.node, False, "Failed to connect")
+            return
+        else:
+            g_notifications.node_connected (self.node, True, "Connected successfully")
+        
+        # Starting tasklist 
+        g_logger.log (self.node.hostname + " : Starting tasklist " + self.tasks.name)
+        g_notifications.tasklist_started (self.node.hostname, self.tasks, "")            
+        task = self.tasks.get()
+        
+        #if (interrupt):
+        #    return Tasklist.Taskresult.user_interrupt
+               
+        # Executing Tasks 
+        while (None != task and not interrupt):            
+            g_logger.log (self.node.hostname + " : Running task id " +str(task.id)+" '" + task.name + "'")                        
+            g_notifications.task_started (self.node.hostname, task, "")
+            if (task.type == Tasks.Operation.run):
+                task_result = self.exec_run (task)
+                assert (None != task_result)
+                g_notifications.task_completed (self.node.hostname, task, task_result.result, task_result.message, task_result.output)
+            elif (task.type == Tasks.Operation.put):
+                task_result = self.exec_put (task)
+                assert (None != task_result)
+                g_notifications.task_completed (self.node.hostname, task, task_result, "")
+            elif (task.type == Tasks.Operation.get):
+                task_result = self.exec_get (task)
+                assert (None != task_result)                
+                g_notifications.task_completed (self.node.hostname, task, task_result, "")
+            elif (task.type == Tasks.Operation.run_per_host):
+                task_result = self.exec_run_per_host (task)
+                assert (None != task_result)
+                g_notifications.task_completed (self.node.hostname, task, task_result, "")                    
+            else:
+                print "UNSUPPORTED OPERATION!"      
+            task = self.tasks.get()
+        #disconnect
+        res = self.disconnect()
+        if (False == res):
+            g_notifications.node_disconnected (self.node, False, "Failed to disconnect")
+        else:
+            g_notifications.node_disconnected (self.node, True, "Disconnected successfully")
+             
+            
 
 class TestWorker (AbstractWorker):
-    def exec_run_per_host (self):
-        raise NotImplementedError        
-    def exec_run (self):
-        raise NotImplementedError
+    def connect (self):
+        return True   
+    def disconnect (self):       
+        return True        
+    def exec_run_per_host (self, task):
+        print "Executing per host " + task.name        
+        return TaskExecutionResult(Tasks.Taskresult.success, "exec_run_per_host successful", "")
+    def exec_run (self, task):
+        print "Executing " + task.name
+        return TaskExecutionResult(Tasks.Taskresult.success, "exec_run successful", "")        
     def exec_put (self):
-        raise NotImplementedError
+        print "Putting " + task.name
+        return TaskExecutionResult(Tasks.Taskresult.success, "exec_put successful", "")             
     def exec_get (self):
-        raise NotImplementedError    
+        print "Getting " + task.name
+        return TaskExecutionResult(Tasks.Taskresult.success, "exec_get successful", "")         
 
 class LocalWorker (AbstractWorker):
-    def exec_run_per_host (self):
+    def connect (self):
+        raise NotImplementedError   
+    def disconnect (self):       
+        raise NotImplementedError     
+    def exec_run_per_host (self, task):
         raise NotImplementedError        
-    def exec_run (self):
+    def exec_run (self, task):
         raise NotImplementedError
     def exec_put (self):
         raise NotImplementedError
@@ -433,9 +187,13 @@
         raise NotImplementedError        
 
 class RemoteSSHWorker (AbstractWorker):
-    def exec_run_per_host (self):
+    def connect (self):
+        raise NotImplementedError   
+    def disconnect (self):       
+        raise NotImplementedError     
+    def exec_run_per_host (self, task):
         raise NotImplementedError        
-    def exec_run (self):
+    def exec_run (self, task):
         raise NotImplementedError
     def exec_put (self):
         raise NotImplementedError
@@ -443,9 +201,13 @@
         raise NotImplementedError        
     
 class PlanetLabWorker (AbstractWorker):
-    def exec_run_per_host (self):
+    def connect (self):
+        raise NotImplementedError   
+    def disconnect (self):       
+        raise NotImplementedError     
+    def exec_run_per_host (self, task):
         raise NotImplementedError        
-    def exec_run (self):
+    def exec_run (self,task):
         raise NotImplementedError
     def exec_put (self):
         raise NotImplementedError
@@ -476,7 +238,6 @@
         return        
     def start (self):
         g_logger.log ("Starting execution for node " + self.node.hostname)
-        g_notifications.tasklist_started (self.node.hostname, self.tasks, "")
         self.thread.start()
     
 class Worker:




reply via email to

[Prev in Thread] Current Thread [Next in Thread]