gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r28173 - gnunet-planetlab/gplmt/gplmt


From: gnunet
Subject: [GNUnet-SVN] r28173 - gnunet-planetlab/gplmt/gplmt
Date: Thu, 18 Jul 2013 17:38:15 +0200

Author: wachs
Date: 2013-07-18 17:38:15 +0200 (Thu, 18 Jul 2013)
New Revision: 28173

Modified:
   gnunet-planetlab/gplmt/gplmt/Worker.py
Log:
changes to ssh worker


Modified: gnunet-planetlab/gplmt/gplmt/Worker.py
===================================================================
--- gnunet-planetlab/gplmt/gplmt/Worker.py      2013-07-18 14:48:06 UTC (rev 28172)
+++ gnunet-planetlab/gplmt/gplmt/Worker.py      2013-07-18 15:38:15 UTC (rev 28173)
@@ -39,7 +39,7 @@
     from gplmt.SCP import SCPClient
     from gplmt.SCP import SCPException
 except ImportError as e: 
-    print "That's a bug! please check README: " + str(e)  
+    print "That's a bug! please check README: " + __file__ + " : " + str(e)    
     sys.exit(1)
     
 
@@ -229,21 +229,184 @@
     def exec_get (self):
         raise NotImplementedError (inspect.stack()[0][3]) 
 
+
 class RemoteSSHWorker (AbstractWorker):
     def connect (self):
-        raise NotImplementedError (inspect.stack()[0][3])    
+        try: 
+            ssh = paramiko.SSHClient()
+            if (g_configuration.ssh_use_known_hosts):
+                g_logger.log (self.node.hostname + " : Loading known hosts")
+                ssh.load_system_host_keys ()
+            # Automatically add new hostkeys
+            if (g_configuration.ssh_add_unkown_hostkeys == True):
+                ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+            # check for private key existance
+            keyfile = None
+            if (g_configuration.ssh_keyfile != None): 
+                if (os.path.exists (g_configuration.ssh_keyfile)):
+                    g_logger.log (self.node.hostname + " : Found " + g_configuration.ssh_keyfile)
+                    keyfile = g_configuration.ssh_keyfile
+                else:
+                    g_logger.log (self.node.hostname + " : Not found " + g_configuration.ssh_keyfile)
+            g_logger.log (self.node.hostname + " : Trying to connect to '" +Util.print_ssh_connection (self.node) + "'")
+            if self.node.username is not None: #credentials are supplied in node file
+                if (self.node.password is not None):                    
+                    ssh.connect (self.node.hostname,
+                                port=self.node.port or 22,
+                                username=self.node.username,
+                                password=self.node.password,
+                                timeout=10)
+                else:
+                    ssh.connect (self.node.hostname,
+                                 port=self.node.port or 22,
+                                 username=self.node.username,
+                                 timeout=10)                                   
     
+            elif ("" != g_configuration.ssh_username):
+                g_logger.log (self.node.hostname + " : Using private keyfile '" +str(keyfile)+ "'")
+                ssh.connect (self.node.hostname,
+                         port=self.node.port or 22,
+                         username=g_configuration.ssh_username, 
+                         password=g_configuration.ssh_password,
+                         timeout=10,
+                         key_filename=keyfile)
+            elif ("" != g_configuration.ssh_password):
+                g_logger.log (self.node.hostname + " : Trying to connect to " + 
+                              self.node.hostname + 
+                              " using password '" + g_configuration.ssh_password+ 
+                              "' and private keyfile '" +str(keyfile)+ "'")
+                ssh.connect (self.node.hostname,
+                             port=self.node.port or 22,
+                             password=g_configuration.ssh_password,
+                             timeout=10,
+                             key_filename=keyfile)
+            else:
+                ssh.connect (self.node.hostname,
+                             port=self.node.port or 22,
+                             timeout=10,
+                             key_filename=keyfile)
+            self.transport = ssh.get_transport()                         
+        except (IOError,
+                paramiko.SSHException,
+                paramiko.BadHostKeyException, 
+                paramiko.AuthenticationException,
+                socket.error) as e:  
+            g_logger.log (self.node.hostname + " : Error while trying to connect: " + str(e))
+            return False
+        g_logger.log (self.node.hostname + " : Connected!")
+        return True
     def disconnect (self):       
         raise NotImplementedError (inspect.stack()[0][3])     
     def exec_run_per_host (self, task):
         raise NotImplementedError (inspect.stack()[0][3]) 
     def exec_run (self, task):
-        raise NotImplementedError (inspect.stack()[0][3]) 
+        global interrupt
+        message = "undefined"
+        output = ""
+        if(interrupt):
+            message = "'"+ task.name +  "' interrupted by user"
+            g_logger.log (self.node.hostname + " : Task '"+ message)
+            return TaskExecutionResult(Tasks.Taskresult.user_interrupt, "interrupted by user", "")
+        if ((task.command == None) and (task.arguments == None)):
+            message = "'"+ task.name +  "' no command to execute"
+            g_logger.log (self.node.hostname + " : Task " + message)
+            return TaskExecutionResult(Tasks.Taskresult.fail, "no command to execute", "")
+        try:
+            channel = self.transport.open_session()
+            channel.settimeout(1.0)
+            channel.get_pty ()
+            #print self.node + " CMD: " + task.command + " " + task.arguments
+            channel.exec_command(task.command + " " + task.arguments)
+            
+        except SSHException as e:
+            g_logger.log (self.node.hostname + " : Error while trying to connect: " + str(e))
+        
+        if (task.timeout > 0):
+            timeout = task.timeout
+        else:
+            timeout = -1
+        result = Tasks.Taskresult.success
+        exit_status = -1
+        start_time = time.time ()
+        
+        stdout_data = ""
+        stderr_data = ""
+        
+        while 1:
+            if(interrupt):
+                result = Tasks.Taskresult.user_interrupt
+                break
+            if (timeout != -1):
+                delta = time.time() - start_time
+                if (delta > timeout):
+                    g_logger.log (self.node.hostname + " : Timeout after " +str(delta) +" seconds")                    
+                    result = Tasks.Taskresult.timeout
+                    break
+            (r, w, e) = select.select([channel], [], [], 1)
+            if r:
+                got_data = False
+                if channel.recv_ready():
+                    data = r[0].recv(4096)
+                    if data:
+                        got_data = True
+                        g_logger.log (self.node.hostname + " : " + data)
+                        output += data
+                        stdout_data += data
+                if channel.recv_stderr_ready():
+                    data = r[0].recv_stderr(4096)
+                    if data:
+                        got_data = True
+                        g_logger.log (self.node.hostname + " : " + data)
+                        output += data
+                        stderr_data += data
+                if not got_data:
+                    break
+        
+        if (result == Tasks.Taskresult.success):
+            exit_status = channel.recv_exit_status ()  
+        
+        if (result == Tasks.Taskresult.success):
+            if (task.expected_return_code != -1):                    
+                if (exit_status != task.expected_return_code):
+                    g_logger.log (self.node.hostname + " : Task '"+ task.name + "' completed after "+ str(time.time() - start_time) +" sec, but exit code " +str(exit_status)+ " was not as expected " + str(task.expected_return_code))
+                    g_logger.log (stdout_data)
+                    g_logger.log (stderr_data)
+                    result = Tasks.Taskresult.return_value_did_not_match
+                else:
+                    g_logger.log (self.node.hostname + " : Task '"+ task.name + "' completed after "+ str(time.time() - start_time) +" sec, exit code " +str(exit_status)+ " was as expected " + str(task.expected_return_code))
+            
+            if (task.expected_output != None):
+                output_contained = False
+                if (task.expected_output in stdout_data):
+                    output_contained = True
+                if (task.expected_output in stderr_data):
+                        output_contained = True                
+                if (output_contained == True):
+                    g_logger.log (self.node.hostname + " : Task '"+ task.name + "' expected output '"+task.expected_output+"' was found")
+                else:
+                    g_logger.log (self.node.hostname + " : Task '"+ task.name + "' expected output '"+task.expected_output+"' was not found")
+                    result = Tasks.Taskresult.output_did_not_match
+                    
+        if (result == Tasks.Taskresult.success):
+            message = "'"+ task.name +  "' successful"
+            g_logger.log (self.node.hostname + " : Task " + message)
+        elif (result == Tasks.Taskresult.timeout):
+            message = "'"+ task.name +  "' with timeout"
+            g_logger.log (self.node.hostname + " : Task "+ message)
+        elif (result == Tasks.Taskresult.user_interrupt):
+            message = "'"+ task.name +  "' interrupted by user"
+            g_logger.log (self.node.hostname + " : Task "+ message)
+        else: 
+            message = "'"+ task.name +  "' failed"
+            g_logger.log (self.node.hostname + " : Task "+ message)
+        return TaskExecutionResult(result, message, output)
+
+        return TaskExecutionResult(Tasks.Taskresult.success, "exec_run successful", "")   
     def exec_put (self):
         raise NotImplementedError (inspect.stack()[0][3]) 
     def exec_get (self):
         raise NotImplementedError (inspect.stack()[0][3]) 
     
-class PlanetLabWorker (AbstractWorker):
+class PlanetLabWorker (RemoteSSHWorker):
     def connect (self):
         raise NotImplementedError (inspect.stack()[0][3]) 
     def disconnect (self):       
@@ -256,7 +419,6 @@
         raise NotImplementedError (inspect.stack()[0][3]) 
     def exec_get (self):
         raise NotImplementedError (inspect.stack()[0][3]) 
-    
 
 class NodeWorker ():
     def __init__(self, target, node, tasks):




reply via email to

[Prev in Thread] Current Thread [Next in Thread]