[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r28161 - gnunet-planetlab/gplmt/gplmt
From: gnunet
Subject: [GNUnet-SVN] r28161 - gnunet-planetlab/gplmt/gplmt
Date: Thu, 18 Jul 2013 15:05:17 +0200
Author: wachs
Date: 2013-07-18 15:05:16 +0200 (Thu, 18 Jul 2013)
New Revision: 28161
Modified:
gnunet-planetlab/gplmt/gplmt/Worker.py
Log:
worker loop
Modified: gnunet-planetlab/gplmt/gplmt/Worker.py
===================================================================
--- gnunet-planetlab/gplmt/gplmt/Worker.py 2013-07-18 12:56:27 UTC (rev 28160)
+++ gnunet-planetlab/gplmt/gplmt/Worker.py 2013-07-18 13:05:16 UTC (rev 28161)
@@ -30,13 +30,10 @@
import select
import signal
-
-
-
try:
import gplmt.Configuration as Configuration
import gplmt.Util as Util
- import gplmt.Tasks as Tasklist
+ import gplmt.Tasks as Tasks
import gplmt.Targets as Targets
from gplmt.SCP import SCPClient
from gplmt.SCP import SCPException
@@ -89,343 +86,100 @@
g_configuration = None
workersList = list()
-class NodeWorkerThread (threading.Thread):
- def __init__(self, threadID, node, tasks):
- threading.Thread.__init__(self)
- self.threadID = threadID
- self.node = node
- self.tasks = tasks
-
- def exec_run_per_host (self, task, transport):
- found = False
- default = None
- cmd = None
- try:
- f = open(task.command_file, 'r')
- for line in f:
- if (line[0] == '#'):
- continue;
- sline = line.split (';',2)
- if (sline[0] == self.node.hostname):
- cmd = sline[1].strip()
- found = True
- if (sline[0] == ''):
- default = sline[1].strip()
- f.close()
- except IOError as e:
- print str(e)
-
- t = task.copy()
- if (found == True):
- g_logger.log (self.node.hostname + " : Found specific command '"+
cmd + "'")
- t.command = cmd
- t.arguments = ""
- elif ((found == False) and (default != None)):
- g_logger.log (self.node.hostname + " : Using default command '"+
default + "'")
- t.command = default
- t.arguments = ""
- else:
- g_logger.log (self.node.hostname + " : Task '"+ task.name + "'
failed: no command to execute")
- return Tasklist.Taskresult.fail
-
- return self.exec_run(t, transport)
-
- def exec_run (self, task, transport):
- global interrupt
- message = "undefined"
- output = ""
- if(interrupt):
- message = "'"+ task.name + "' interrupted by user"
- g_logger.log (self.node.hostname + " : Task '"+ message)
- return TaskExecutionResult(Tasklist.Taskresult.user_interrupt,
"interrupted by user", "")
- if ((task.command == None) and (task.arguments == None)):
- message = "'"+ task.name + "' no command to execute"
- g_logger.log (self.node.hostname + " : Task " + message)
- return TaskExecutionResult(Tasklist.Taskresult.fail, "no command
to execute", "")
- try:
- channel = transport.open_session()
- channel.settimeout(1.0)
- channel.get_pty ()
- #print self.node + " CMD: " + task.command + " " + task.arguments
- channel.exec_command(task.command + " " + task.arguments)
-
- except SSHException as e:
- g_logger.log (self.node.hostname + " : Error while trying to
connect: " + str(e))
-
- if (task.timeout > 0):
- timeout = task.timeout
- else:
- timeout = -1
- result = Tasklist.Taskresult.success
- exit_status = -1
- start_time = time.time ()
-
- stdout_data = ""
- stderr_data = ""
-
- while 1:
- if(interrupt):
- result = Tasklist.Taskresult.user_interrupt
- break
- if (timeout != -1):
- delta = time.time() - start_time
- if (delta > timeout):
- g_logger.log (self.node.hostname + " : Timeout after "
+str(delta) +" seconds")
- result = Tasklist.Taskresult.timeout
- break
- (r, w, e) = select.select([channel], [], [], 1)
- if r:
- got_data = False
- if channel.recv_ready():
- data = r[0].recv(4096)
- if data:
- got_data = True
- g_logger.log (self.node.hostname + " : " + data)
- output += data
- stdout_data += data
- if channel.recv_stderr_ready():
- data = r[0].recv_stderr(4096)
- if data:
- got_data = True
- g_logger.log (self.node.hostname + " : " + data)
- output += data
- stderr_data += data
- if not got_data:
- break
-
- if (result == Tasklist.Taskresult.success):
- exit_status = channel.recv_exit_status ()
-
- if (result == Tasklist.Taskresult.success):
- if (task.expected_return_code != -1):
- if (exit_status != task.expected_return_code):
- g_logger.log (self.node.hostname + " : Task '"+ task.name
+ "' completed after "+ str(time.time() - start_time) +" sec, but exit code "
+str(exit_status)+ " was not as expected " + str(task.expected_return_code))
- g_logger.log (stdout_data)
- g_logger.log (stderr_data)
- result = Tasklist.Taskresult.return_value_did_not_match
- else:
- g_logger.log (self.node.hostname + " : Task '"+ task.name
+ "' completed after "+ str(time.time() - start_time) +" sec, exit code "
+str(exit_status)+ " was as expected " + str(task.expected_return_code))
-
- if (task.expected_output != None):
- output_contained = False
- if (task.expected_output in stdout_data):
- output_contained = True
- if (task.expected_output in stderr_data):
- output_contained = True
- if (output_contained == True):
- g_logger.log (self.node.hostname + " : Task '"+ task.name
+ "' expected output '"+task.expected_output+"' was found")
- else:
- g_logger.log (self.node.hostname + " : Task '"+ task.name
+ "' expected output '"+task.expected_output+"' was not found")
- result = Tasklist.Taskresult.output_did_not_match
-
- if (result == Tasklist.Taskresult.success):
- message = "'"+ task.name + "' successful"
- g_logger.log (self.node.hostname + " : Task " + message)
- elif (result == Tasklist.Taskresult.timeout):
- message = "'"+ task.name + "' with timeout"
- g_logger.log (self.node.hostname + " : Task "+ message)
- elif (result == Tasklist.Taskresult.user_interrupt):
- message = "'"+ task.name + "' interrupted by user"
- g_logger.log (self.node.hostname + " : Task "+ message)
- else:
- message = "'"+ task.name + "' failed"
- g_logger.log (self.node.hostname + " : Task "+ message)
- return TaskExecutionResult(result, message, output)
- def exec_put (self, task, transport):
- if (False == os.path.exists (task.src)):
- return Tasklist.Taskresult.src_file_not_found
-
- result = Tasklist.Taskresult.success
- try:
- if (g_configuration.ssh_transfer ==
Configuration.TransferMode.scp):
- try:
- scp = SCPClient (transport)
- scp.put (task.src, task.dest)
- except SCPException as e:
- g_logger.log (self.node.hostname + " : Task '"+ task.name
+ "' :" + str(e))
- result = Tasklist.Taskresult.fail
- pass
- if (g_configuration.ssh_transfer ==
Configuration.TransferMode.sftp):
- sftp = paramiko.SFTPClient.from_transport (transport)
- sftp.put(task.src, task.dest)
- sftp.close()
- except paramiko.SSHException as e:
- g_logger.log (self.node.hostname + " : Task '"+ task.name + "' :"
+ str(e))
- result = Tasklist.Taskresult.fail
- pass
- except (OSError, IOError) as e:
- g_logger.log (self.node + " : Task '"+ task.name + "' : " + str(e))
- result = Tasklist.Taskresult.src_file_not_found
- pass
- return result
-
- def exec_get (self, task, transport):
- result = Tasklist.Taskresult.success
- try:
- if (g_configuration.ssh_transfer ==
Configuration.TransferMode.scp):
- try:
- scp = SCPClient (transport)
- scp.get (task.src, task.dest)
- except SCPException as e:
- g_logger.log (self.node.hostname + " : Task '"+ task.name
+ "' :" + str(e))
- result = Tasklist.Taskresult.fail
- pass
- if (g_configuration.ssh_transfer ==
Configuration.TransferMode.sftp):
- sftp = paramiko.SFTPClient.from_transport (transport)
- sftp.get (task.src, task.dest)
- sftp.close()
- except paramiko.SSHException as e:
- g_logger.log (self.node.hostname + " : Task '"+ task.name + "' :"
+ str(e))
- result = Tasklist.Taskresult.fail
- pass
- except (OSError, IOError) as e:
- g_logger.log (self.node.hostname + " : Task '"+ task.name + "' : "
+ str(e))
- result = Tasklist.Taskresult.src_file_not_found
- pass
- return result
-
- def run(self):
- global interrupt
- g_logger.log (self.node.hostname + " : Starting tasklist " +
self.tasks.name)
- task = self.tasks.get()
- if(interrupt):
- return Tasklist.Taskresult.user_interrupt
- try:
- ssh = paramiko.SSHClient()
- if (g_configuration.ssh_use_known_hosts):
- g_logger.log (self.node.hostname + " : Loading known hosts")
- ssh.load_system_host_keys ()
-
- # Automatically add new hostkeys
- if (g_configuration.ssh_add_unkown_hostkeys == True):
- ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- # check for private key existance
- keyfile = None
- if (g_configuration.ssh_keyfile != None):
- if (os.path.exists (g_configuration.ssh_keyfile)):
- g_logger.log (self.node.hostname + " : Found " +
g_configuration.ssh_keyfile)
- keyfile = g_configuration.ssh_keyfile
- else:
- g_logger.log (self.node.hostname + " : Not found " +
g_configuration.ssh_keyfile)
-
-
- g_logger.log (self.node.hostname + " : Trying to connect to '"
+Util.print_ssh_connection (self.node) + "'")
- if self.node.username is not None: #credentials are supplied in
node file
- if (self.node.password is not None):
- #print 'HERE 1!' + self.node.hostname +
str(self.node.port or 22) + self.node.username
- ssh.connect (self.node.hostname,
- port=self.node.port or 22,
- username=self.node.username,
- password=self.node.password,
- timeout=10)
- else:
- #print 'HERE 2!' + self.node.hostname +
str(self.node.port or 22) + self.node.username
- ssh.connect (self.node.hostname,
- port=self.node.port or 22,
- username=self.node.username,
- timeout=10)
- elif ("" != g_configuration.ssh_username):
- g_logger.log (self.node.hostname + " : Using private keyfile
'" +str(keyfile)+ "'")
- ssh.connect (self.node.hostname,
- port=self.node.port or 22,
- username=g_configuration.ssh_username,
- password=g_configuration.ssh_password,
- timeout=10,
- key_filename=keyfile)
- else:
- g_logger.log (self.node.hostname + " : Trying to connect to "
+
- self.node.hostname +
- " using password '" + g_configuration.ssh_password+
- "' and private keyfile '" +str(keyfile)+ "'")
- ssh.connect (self.node.hostname,
- port=self.node.port or 22,
- password=g_configuration.ssh_password,
- timeout=10,
- key_filename=keyfile)
- except (IOError,
- paramiko.SSHException,
- paramiko.BadHostKeyException,
- paramiko.AuthenticationException,
- socket.error) as e:
- g_logger.log (self.node.hostname + " : Error while trying to
connect: " + str(e))
- g_notifications.node_connected (self.node.hostname, False, str(e))
- g_notifications.tasklist_completed (self.node.hostname,
self.tasks, False, "")
- return
-
- g_notifications.node_connected (self.node.hostname, True, "")
- transport = ssh.get_transport()
- success = True
- task_result = Tasklist.Taskresult.success
- while (None != task and not interrupt):
- g_logger.log (self.node.hostname + " : Running task id "
+str(task.id)+" '" + task.name + "'")
- g_notifications.task_started (self.node.hostname, task, "")
- if (task.__class__.__name__ == "Task"):
- if (task.type == Tasklist.Operation.run):
- task_result = self.exec_run (task, transport)
- g_notifications.task_completed (self.node.hostname, task,
task_result.result, task_result.message, task_result.output)
- elif (task.type == Tasklist.Operation.put):
- task_result = self.exec_put (task, transport)
- g_notifications.task_completed (self.node.hostname, task,
task_result, "")
- elif (task.type == Tasklist.Operation.get):
- task_result = self.exec_get (task, transport)
- g_notifications.task_completed (self.node.hostname, task,
task_result, "")
- elif (task.type == Tasklist.Operation.run_per_host):
- task_result = self.exec_run_per_host (task, transport)
- g_notifications.task_completed (self.node.hostname, task,
task_result, "")
- else:
- print "UNSUPPORTED OPERATION!"
- elif (task.__class__.__name__ == "Taskset"):
- g_logger.log (self.node.hostname + " : Running task set")
- if ((task.stop_on_fail == True) and (task_result.result !=
Tasklist.Taskresult.success)):
- g_logger.log (self.node.hostname + " : Task failed and
therefore execution is stopped")
- g_notifications.task_completed (self.node.hostname, task,
task_result.result, task_result.message, task_result.output)
- transport.close()
- success = False
- break
- # If received user interrupt, close channel and break execution
- elif (task_result == Tasklist.Taskresult.user_interrupt):
- transport.close()
- success = False
- break
- task = self.tasks.get()
-
- ssh.close()
- g_notifications.node_disconnected (self.node.hostname, True, "")
- g_notifications.tasklist_completed (self.node.hostname, self.tasks,
success, "")
- g_logger.log (self.node.hostname + " : All tasks done for " +
self.node.hostname)
-
-
class AbstractWorker(threading.Thread):
def __init__(self, threadID, node, tasks):
threading.Thread.__init__(self)
self.threadID = threadID
self.node = node
self.tasks = tasks
- def exec_run_per_host (self):
+ def connect (self):
+ raise NotImplementedError
+ def disconnect (self):
+ raise NotImplementedError
+ def exec_run_per_host (self, task):
raise NotImplementedError
- def exec_run (self):
+ def exec_run (self, task):
raise NotImplementedError
def exec_put (self):
raise NotImplementedError
def exec_get (self):
raise NotImplementedError
def run(self):
- print "run"
+ global interrupt
+ # Connecting
+ res = self.connect()
+ if (False == res):
+ g_notifications.node_connected (self.node, False, "Failed to connect")
+ return
+ else:
+ g_notifications.node_connected (self.node, True, "Connected successfully")
+
+ # Starting tasklist
+ g_logger.log (self.node.hostname + " : Starting tasklist " +
self.tasks.name)
+ g_notifications.tasklist_started (self.node.hostname, self.tasks, "")
+ task = self.tasks.get()
+
+ #if (interrupt):
+ # return Tasklist.Taskresult.user_interrupt
+
+ # Executing Tasks
+ while (None != task and not interrupt):
+ g_logger.log (self.node.hostname + " : Running task id "
+str(task.id)+" '" + task.name + "'")
+ g_notifications.task_started (self.node.hostname, task, "")
+ if (task.type == Tasks.Operation.run):
+ task_result = self.exec_run (task)
+ assert (None != task_result)
+ g_notifications.task_completed (self.node.hostname, task,
task_result.result, task_result.message, task_result.output)
+ elif (task.type == Tasks.Operation.put):
+ task_result = self.exec_put (task)
+ assert (None != task_result)
+ g_notifications.task_completed (self.node.hostname, task,
task_result, "")
+ elif (task.type == Tasks.Operation.get):
+ task_result = self.exec_get (task)
+ assert (None != task_result)
+ g_notifications.task_completed (self.node.hostname, task,
task_result, "")
+ elif (task.type == Tasks.Operation.run_per_host):
+ task_result = self.exec_run_per_host (task)
+ assert (None != task_result)
+ g_notifications.task_completed (self.node.hostname, task,
task_result, "")
+ else:
+ print "UNSUPPORTED OPERATION!"
+ task = self.tasks.get()
+ #disconnect
+ res = self.disconnect()
+ if (False == res):
+ g_notifications.node_disconnected (self.node, False, "Failed to disconnect")
+ else:
+ g_notifications.node_disconnected (self.node, True, "Disconnected successfully")
+
+
class TestWorker (AbstractWorker):
- def exec_run_per_host (self):
- raise NotImplementedError
- def exec_run (self):
- raise NotImplementedError
+ def connect (self):
+ return True
+ def disconnect (self):
+ return True
+ def exec_run_per_host (self, task):
+ print "Executing per host " + task.name
+ return TaskExecutionResult(Tasks.Taskresult.success, "exec_run_per_host successful", "")
+ def exec_run (self, task):
+ print "Executing " + task.name
+ return TaskExecutionResult(Tasks.Taskresult.success, "exec_run successful", "")
def exec_put (self):
- raise NotImplementedError
+ print "Putting " + task.name
+ return TaskExecutionResult(Tasks.Taskresult.success, "exec_put successful", "")
def exec_get (self):
- raise NotImplementedError
+ print "Getting " + task.name
+ return TaskExecutionResult(Tasks.Taskresult.success, "exec_get successful", "")
class LocalWorker (AbstractWorker):
- def exec_run_per_host (self):
+ def connect (self):
+ raise NotImplementedError
+ def disconnect (self):
+ raise NotImplementedError
+ def exec_run_per_host (self, task):
raise NotImplementedError
- def exec_run (self):
+ def exec_run (self, task):
raise NotImplementedError
def exec_put (self):
raise NotImplementedError
@@ -433,9 +187,13 @@
raise NotImplementedError
class RemoteSSHWorker (AbstractWorker):
- def exec_run_per_host (self):
+ def connect (self):
+ raise NotImplementedError
+ def disconnect (self):
+ raise NotImplementedError
+ def exec_run_per_host (self, task):
raise NotImplementedError
- def exec_run (self):
+ def exec_run (self, task):
raise NotImplementedError
def exec_put (self):
raise NotImplementedError
@@ -443,9 +201,13 @@
raise NotImplementedError
class PlanetLabWorker (AbstractWorker):
- def exec_run_per_host (self):
+ def connect (self):
+ raise NotImplementedError
+ def disconnect (self):
+ raise NotImplementedError
+ def exec_run_per_host (self, task):
raise NotImplementedError
- def exec_run (self):
+ def exec_run (self,task):
raise NotImplementedError
def exec_put (self):
raise NotImplementedError
@@ -476,7 +238,6 @@
return
def start (self):
g_logger.log ("Starting execution for node " + self.node.hostname)
- g_notifications.tasklist_started (self.node.hostname, self.tasks, "")
self.thread.start()
class Worker:
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r28161 - gnunet-planetlab/gplmt/gplmt,
gnunet <=