gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r28184 - gnunet-planetlab/gplmt/gplmt


From: gnunet
Subject: [GNUnet-SVN] r28184 - gnunet-planetlab/gplmt/gplmt
Date: Fri, 19 Jul 2013 10:39:49 +0200

Author: wachs
Date: 2013-07-19 10:39:48 +0200 (Fri, 19 Jul 2013)
New Revision: 28184

Modified:
   gnunet-planetlab/gplmt/gplmt/Notifications.py
   gnunet-planetlab/gplmt/gplmt/Worker.py
Log:
get and put operations for SSH worker


Modified: gnunet-planetlab/gplmt/gplmt/Notifications.py
===================================================================
--- gnunet-planetlab/gplmt/gplmt/Notifications.py       2013-07-19 08:05:35 UTC (rev 28183)
+++ gnunet-planetlab/gplmt/gplmt/Notifications.py       2013-07-19 08:39:48 UTC (rev 28184)
@@ -262,6 +262,6 @@
         if (result == Tasklist.Taskresult.success):
             print node.hostname + " : Task '" +  task.name + "' completed 
successfully"
         elif (result == Tasklist.Taskresult.src_file_not_found):
-            print node.hostname + " : Task '" +  task.name + "' failed : 
source file not found"
+            print node.hostname + " : Task '" +  task.name + "' failed : 
source file not found: " + message
         else:
-            print node.hostname + " : Task '" +  task.name + "' completed with 
failure"             
+            print node.hostname + " : Task '" +  task.name + "' completed with 
failure" + message   

Modified: gnunet-planetlab/gplmt/gplmt/Worker.py
===================================================================
--- gnunet-planetlab/gplmt/gplmt/Worker.py      2013-07-19 08:05:35 UTC (rev 28183)
+++ gnunet-planetlab/gplmt/gplmt/Worker.py      2013-07-19 08:39:48 UTC (rev 28184)
@@ -102,9 +102,9 @@
         raise NotImplementedError (inspect.stack()[0][3]) 
     def exec_run (self, task):
         raise NotImplementedError (inspect.stack()[0][3]) 
-    def exec_put (self):
+    def exec_put (self, task):
         raise NotImplementedError (inspect.stack()[0][3]) 
-    def exec_get (self):
+    def exec_get (self, task):
         raise NotImplementedError (inspect.stack()[0][3]) 
     def run(self):    
         global interrupt
@@ -133,6 +133,7 @@
         while (None != task and not interrupt):            
             g_logger.log (self.node.hostname + " : Running task id " 
+str(task.id)+" '" + task.name + "'")                        
             g_notifications.task_started (self.node, task, "")
+            task_result = None
             try:
                 if (task.type == Tasks.Operation.run):
                     task_result = self.exec_run (task)
@@ -155,14 +156,17 @@
             except NotImplementedError as e:
                 print "Not implemented" + str (e)
                 pass
-            except Exception as e:
-                print "Exception in Worker:" + str (e)
+            except Exception as e2:
+                print "Exception in Worker: " + str (e2)
                 pass
             if (interrupt):
                 break
+            if (None == task_result):
+                g_logger.log (self.node.hostname + " : Task '"+ task.name +"' 
failed to execute")
+                task_result = TaskExecutionResult(Tasks.Taskresult.fail, 
"failed to execute task: " + task.name, "")                   
+                pass                                
             if ((task_result.result != Tasks.Taskresult.success) and 
(task.stop_on_fail == True)):
-                g_logger.log (self.node.hostname + " : Task failed and 
therefore execution is stopped")
-                g_notifications.task_completed (self.node, task, 
task_result.result, task_result.message, task_result.output)         
+                g_logger.log (self.node.hostname + " : Task failed and 
therefore execution is stopped")     
                 self.disconnect()
                 tasklist_success = False
                 break                      
@@ -205,10 +209,10 @@
     def exec_run (self, task):
         print "TestWorker executes '" + task.name + "'"
         return TaskExecutionResult(Tasks.Taskresult.success, "exec_run 
successful", "")        
-    def exec_put (self):
+    def exec_put (self, task):
         print "TestWorker puts " + task.name + "'"
         return TaskExecutionResult(Tasks.Taskresult.success, "exec_put 
successful", "")             
-    def exec_get (self):
+    def exec_get (self, task):
         print "TestWorker puts '" + task.name + "'"
         return TaskExecutionResult(Tasks.Taskresult.success, "exec_get 
successful", "")         
 
@@ -221,9 +225,9 @@
         raise NotImplementedError (inspect.stack()[0][3])        
     def exec_run (self, task):
         raise NotImplementedError (inspect.stack()[0][3]) 
-    def exec_put (self):
+    def exec_put (self, task):
         raise NotImplementedError (inspect.stack()[0][3]) 
-    def exec_get (self):
+    def exec_get (self, task):
         raise NotImplementedError (inspect.stack()[0][3]) 
 
 
@@ -252,20 +256,20 @@
             g_logger.log (self.node.hostname + " : Trying to connect to '" 
+Util.print_ssh_connection (self.node) + "'")
             if self.node.username is not None: #credentials are supplied in 
node file
                 if (self.node.password is not None):
-                    print "Using node information " + self.node.username + " " 
+self.node.password                      
+                    g_logger.log ( "Using node information " + 
self.node.username + " " +self.node.password)                      
                     self.ssh.connect (self.node.hostname,
                                 port=self.node.port or 22,
                                 username=self.node.username,
                                 password=self.node.password,
                                 timeout=10)
                 else:
-                    print "Using node information " + self.node.username 
+                    g_logger.log ( "Using node information " + 
self.node.username)
                     self.ssh.connect (self.node.hostname,
                                  port=self.node.port or 22,
                                  username=self.node.username,
                                  timeout=10)                                   
     
             elif ("" != g_configuration.ssh_username):
-                print "Using node information " + g_configuration.ssh_username 
 + " " + g_configuration.ssh_password 
+                g_logger.log ( "Using node information " + 
g_configuration.ssh_username  + " " + g_configuration.ssh_password)
                 self.ssh.connect (self.node.hostname,
                          port=self.node.port or 22,
                          username=g_configuration.ssh_username, 
@@ -273,14 +277,14 @@
                          timeout=10,
                          key_filename=keyfile)
             elif ("" != g_configuration.ssh_password):
-                print "Using node information " + g_configuration.ssh_password
+                g_logger.log ( "Using node information " + 
g_configuration.ssh_password)
                 self.ssh.connect (self.node.hostname,
                              port=self.node.port or 22,
                              password=g_configuration.self.ssh_password,
                              timeout=10,
                              key_filename=keyfile)
             else:
-                print "Using no information"
+                g_logger.log ("Using no information")
                 self.ssh.connect (self.node.hostname,
                              port=self.node.port or 22,
                              timeout=10,
@@ -302,7 +306,7 @@
         return TaskExecutionResult (Tasks.Taskresult.success, "", "")
     def exec_run_per_host (self, task):
         raise NotImplementedError (inspect.stack()[0][3]) 
-    def exec_run (self, task):
+    def exec_run (self, task):        
         global interrupt
         message = "undefined"
         output = ""
@@ -361,11 +365,9 @@
                         output += data
                         stderr_data += data
                 if not got_data:
-                    break
-        
+                    break        
         if (result == Tasks.Taskresult.success):
-            exit_status = channel.recv_exit_status ()  
-        
+            exit_status = channel.recv_exit_status ()          
         if (result == Tasks.Taskresult.success):
             if (task.expected_return_code != -1):                    
                 if (exit_status != task.expected_return_code):
@@ -374,8 +376,7 @@
                     g_logger.log (stderr_data)
                     result = Tasks.Taskresult.return_value_did_not_match
                 else:
-                    g_logger.log (self.node.hostname + " : Task '"+ task.name 
+ "' completed after "+ str(time.time() - start_time) +" sec, exit code " 
+str(exit_status)+ " was as expected " + str(task.expected_return_code))
-            
+                    g_logger.log (self.node.hostname + " : Task '"+ task.name 
+ "' completed after "+ str(time.time() - start_time) +" sec, exit code " 
+str(exit_status)+ " was as expected " + str(task.expected_return_code))        
    
             if (task.expected_output != None):
                 output_contained = False
                 if (task.expected_output in stdout_data):
@@ -401,10 +402,64 @@
             message = "'"+ task.name +  "' failed"
             g_logger.log (self.node.hostname + " : Task "+ message)
         return TaskExecutionResult(result, message, output)  
-    def exec_put (self):
-        raise NotImplementedError (inspect.stack()[0][3]) 
-    def exec_get (self):
-        raise NotImplementedError (inspect.stack()[0][3]) 
+    def exec_put (self, task):
+        if (False == os.path.exists (task.src)):
+            return TaskExecutionResult(Tasks.Taskresult.src_file_not_found, 
task.src, "")              
+        result = None
+        try:
+            if (g_configuration.ssh_transfer == 
Configuration.TransferMode.scp):
+                try:
+                    scp = SCPClient (self.transport)
+                    scp.put (task.src, task.dest)
+                except SCPException as e:
+                    g_logger.log (self.node.hostname + " : Task '"+ task.name 
+ "' :" + str(e))
+                    result = TaskExecutionResult(Tasks.Taskresult.fail, 
str(e), "")
+                    pass
+            if (g_configuration.ssh_transfer == 
Configuration.TransferMode.sftp):                
+                sftp = paramiko.SFTPClient.from_transport (self.transport)
+                sftp.put(task.src, task.dest)
+                sftp.close()
+        except paramiko.SSHException as e:
+            g_logger.log (self.node.hostname + " : Task '"+ task.name + "' :" 
+ str(e))
+            result = TaskExecutionResult(Tasks.Taskresult.fail, str(e), "")
+            pass
+        except (OSError, IOError) as e:
+            g_logger.log (self.node.hostname + " : Task '"+ task.name + "' : " 
+ str(e))
+            result = TaskExecutionResult(Tasks.Taskresult.src_file_not_found, 
str(e), "") 
+            pass 
+        if (None == result):          
+            result = TaskExecutionResult(Tasks.Taskresult.success, "", "")     
   
+        return result
+    def exec_get (self, task):
+        result = None
+        if(interrupt):
+            message = "'"+ task.name +  "' interrupted by user"
+            g_logger.log (self.node.hostname + " : Task '"+ message)
+            return TaskExecutionResult(Tasks.Taskresult.user_interrupt, 
"interrupted by user", "")
+        try:
+            if (g_configuration.ssh_transfer == 
Configuration.TransferMode.scp): 
+                try:
+                    scp = SCPClient (self.transport)
+                    scp.get (task.src, task.dest)
+                except SCPException as e:
+                    g_logger.log (self.node.hostname + " : Task '"+ task.name 
+ "' :")
+                    result = TaskExecutionResult(Tasks.Taskresult.fail, 
str(e), "")  
+                    pass                
+            if (g_configuration.ssh_transfer == 
Configuration.TransferMode.sftp):
+                sftp = paramiko.SFTPClient.from_transport (self.transport)
+                sftp.get (task.src, task.dest)
+                sftp.close()
+        except paramiko.SSHException as e:
+            g_logger.log (self.node.hostname + " : Task '"+ task.name + "' :" 
+ str(e))
+            result = TaskExecutionResult(Tasks.Taskresult.fail, str(e), "")
+            pass
+        except (OSError, IOError) as e:
+            g_logger.log (self.node.hostname + " : Task '"+ task.name + "' : " 
+ str(e))
+            result = TaskExecutionResult(Tasks.Taskresult.src_file_not_found, 
str(e), "")  
+            pass     
+        if (None == result):          
+            result = TaskExecutionResult(Tasks.Taskresult.success, "Store 
source '"+task.src+"' in '" +task.dest+"'", "")      
+        return result     
     
 class PlanetLabWorker (RemoteSSHWorker):
     def connect (self):
@@ -415,9 +470,9 @@
         raise NotImplementedError (inspect.stack()[0][3]) 
     def exec_run (self,task):
         raise NotImplementedError (inspect.stack()[0][3]) 
-    def exec_put (self):
+    def exec_put (self, task):
         raise NotImplementedError (inspect.stack()[0][3]) 
-    def exec_get (self):
+    def exec_get (self, task):
         raise NotImplementedError (inspect.stack()[0][3]) 
 
 class NodeWorker ():




reply via email to

[Prev in Thread] Current Thread [Next in Thread]