[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r25082 - gnunet-planetlab/gplmt/gplmt
From: gnunet
Subject: [GNUnet-SVN] r25082 - gnunet-planetlab/gplmt/gplmt
Date: Wed, 21 Nov 2012 11:04:00 +0100
Author: otarabai
Date: 2012-11-21 11:04:00 +0100 (Wed, 21 Nov 2012)
New Revision: 25082
Modified:
gnunet-planetlab/gplmt/gplmt/Tasks.py
gnunet-planetlab/gplmt/gplmt/Worker.py
Log:
Support for interrupt signal in threads
Modified: gnunet-planetlab/gplmt/gplmt/Tasks.py
===================================================================
--- gnunet-planetlab/gplmt/gplmt/Tasks.py 2012-11-21 09:53:27 UTC (rev 25081)
+++ gnunet-planetlab/gplmt/gplmt/Tasks.py 2012-11-21 10:04:00 UTC (rev 25082)
@@ -47,6 +47,7 @@
return_value_did_not_match = 3
output_did_not_match = 4
src_file_not_found = 5
+ user_interrupt = 6
class Operation:
Modified: gnunet-planetlab/gplmt/gplmt/Worker.py
===================================================================
--- gnunet-planetlab/gplmt/gplmt/Worker.py 2012-11-21 09:53:27 UTC (rev 25081)
+++ gnunet-planetlab/gplmt/gplmt/Worker.py 2012-11-21 10:04:00 UTC (rev 25082)
@@ -32,13 +32,38 @@
import time
import sys
import select
+ import signal
from gplmt.SCP import SCPClient
except ImportError:
print "That's a bug! please check README"
sys.exit(1)
+interrupt = False
+def signal_handler(signal, frame):
+ global interrupt
+ interrupt = True
+ print "User interrupt received!"
+ time1 = time.time()
+ # for the next <5> secs check if all threads are finished
+ threads_done = False
+ timeout = False
+ while(not threads_done):
+ threads_done = True
+ for w in workersList:
+ if(w.thread.isAlive()):
+ if(timeout):
+ g_logger.log ("Thread taking too long, killing it...")
+ w.thread._Thread__stop()
+ else:
+ threads_done = False
+ break
+ time.sleep(0.1)
+ if(time.time() - time1 > 5):
+ timeout = True
+ sys.exit(0)
+signal.signal(signal.SIGINT, signal_handler)
try:
import paramiko
@@ -50,6 +75,7 @@
g_logger = None
g_notifications = None
g_configuration = None
+workersList = list()
class NodeWorkerThread (threading.Thread):
def __init__(self, threadID, node, tasks):
@@ -93,6 +119,10 @@
return self.exec_run(t, transport)
def exec_run (self, task, transport):
+ global interrupt
+ if(interrupt):
+ g_logger.log (self.node + " : Task '"+ task.name + "' interrupted by user")
+ return Tasks.Taskresult.user_interrupt
if ((task.command == None) and (task.arguments == None)):
g_logger.log (self.node + " : Task '"+ task.name + "' failed: no command to execute")
return Tasks.Taskresult.fail
@@ -118,6 +148,9 @@
stdout_data = ""
stderr_data = ""
while 1:
+ if(interrupt):
+ result = Tasks.Taskresult.user_interrupt
+ break
if (timeout != -1):
delta = time.time() - start_time
if (delta > timeout):
@@ -171,6 +204,8 @@
g_logger.log (self.node + " : Task '"+ task.name + "' successful")
elif (result == Tasks.Taskresult.timeout):
g_logger.log (self.node + " : Task '"+ task.name + "' with timeout")
+ elif (result == Tasks.Taskresult.user_interrupt):
+ g_logger.log (self.node + " : Task '"+ task.name + "' interrupted by user")
else:
g_logger.log (self.node + " : Task '"+ task.name + "' failed")
return result
@@ -218,8 +253,11 @@
return result
def run(self):
+ global interrupt
g_logger.log (self.node + " : Starting tasklist " + self.tasks.name)
task = self.tasks.get()
+ if(interrupt):
+ return Tasks.Taskresult.user_interrupt
try:
ssh = paramiko.SSHClient()
if (g_configuration.ssh_use_known_hosts):
@@ -262,7 +300,7 @@
transport = ssh.get_transport()
success = True
result = Tasks.Taskresult.success
- while (None != task):
+ while (None != task and not interrupt):
g_logger.log (self.node + " : Running task id " +str(task.id)+" '" + task.name + "'")
g_notifications.task_started (self.node, task)
if (task.__class__.__name__ == "Task"):
@@ -287,6 +325,11 @@
transport.close()
success = False
break
+ # If received user interrupt, close channel and break execution
+ elif (result == Tasks.Taskresult.user_interrupt):
+ transport.close()
+ success = False
+ break
task = self.tasks.get()
ssh.close()
@@ -331,13 +374,18 @@
g_logger = logger;
def start (self):
g_logger.log ("Starting execution")
- workersList = list()
for n in self.nodes.nodes:
nw = NodeWorker (n, self.tasks.copy())
workersList.append(nw)
nw.start()
# block main thread until all worker threads are finished to print summary
- for w in workersList:
- w.thread.join()
+ threads_done = False
+ while(not threads_done):
+ threads_done = True
+ for w in workersList:
+ if(w.thread.isAlive()):
+ threads_done = False
+ break
+ time.sleep(0.5)
g_notifications.summarize()
[Prev in Thread] Current Thread [Next in Thread]
- [GNUnet-SVN] r25082 - gnunet-planetlab/gplmt/gplmt,
gnunet <=