gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r25082 - gnunet-planetlab/gplmt/gplmt


From: gnunet
Subject: [GNUnet-SVN] r25082 - gnunet-planetlab/gplmt/gplmt
Date: Wed, 21 Nov 2012 11:04:00 +0100

Author: otarabai
Date: 2012-11-21 11:04:00 +0100 (Wed, 21 Nov 2012)
New Revision: 25082

Modified:
   gnunet-planetlab/gplmt/gplmt/Tasks.py
   gnunet-planetlab/gplmt/gplmt/Worker.py
Log:
Support for interrupt signal in threads


Modified: gnunet-planetlab/gplmt/gplmt/Tasks.py
===================================================================
--- gnunet-planetlab/gplmt/gplmt/Tasks.py       2012-11-21 09:53:27 UTC (rev 
25081)
+++ gnunet-planetlab/gplmt/gplmt/Tasks.py       2012-11-21 10:04:00 UTC (rev 
25082)
@@ -47,6 +47,7 @@
     return_value_did_not_match = 3
     output_did_not_match = 4
     src_file_not_found = 5
+    user_interrupt = 6
     
 
 class Operation:

Modified: gnunet-planetlab/gplmt/gplmt/Worker.py
===================================================================
--- gnunet-planetlab/gplmt/gplmt/Worker.py      2012-11-21 09:53:27 UTC (rev 
25081)
+++ gnunet-planetlab/gplmt/gplmt/Worker.py      2012-11-21 10:04:00 UTC (rev 
25082)
@@ -32,13 +32,38 @@
     import time
     import sys
     import select
+    import signal
     from gplmt.SCP import SCPClient
 
 except ImportError: 
     print "That's a bug! please check README" 
     sys.exit(1)
 
+interrupt = False
+def signal_handler(signal, frame):
+    global interrupt
+    interrupt = True
+    print "User interrupt received!"
+    time1 = time.time()
+    # for the next <5> secs check if all threads are finished
+    threads_done = False
+    timeout = False
+    while(not threads_done):
+        threads_done = True
+        for w in workersList:
+            if(w.thread.isAlive()):
+                if(timeout):
+                    g_logger.log ("Thread taking too long, killing it...")
+                    w.thread._Thread__stop()
+                else:
+                    threads_done = False
+                    break
+        time.sleep(0.1)
+        if(time.time() - time1 > 5):
+            timeout = True
+    sys.exit(0)
 
+signal.signal(signal.SIGINT, signal_handler)
 
 try:
     import paramiko
@@ -50,6 +75,7 @@
 g_logger = None
 g_notifications = None
 g_configuration = None
+workersList = list()
 
 class NodeWorkerThread (threading.Thread):
     def __init__(self, threadID, node, tasks):
@@ -93,6 +119,10 @@
         return self.exec_run(t, transport)
         
     def exec_run (self, task, transport):
+        global interrupt
+        if(interrupt):
+            g_logger.log (self.node + " : Task '"+ task.name + "' interrupted 
by user")
+            return Tasks.Taskresult.user_interrupt
         if ((task.command == None) and (task.arguments == None)):
             g_logger.log (self.node + " : Task '"+ task.name + "' failed: no 
command to execute")
             return Tasks.Taskresult.fail
@@ -118,6 +148,9 @@
         stdout_data = ""
         stderr_data = ""
         while 1:
+            if(interrupt):
+                result = Tasks.Taskresult.user_interrupt
+                break
             if (timeout != -1):
                 delta = time.time() - start_time
                 if (delta > timeout):
@@ -171,6 +204,8 @@
             g_logger.log (self.node + " : Task '"+ task.name + "' successful")
         elif (result == Tasks.Taskresult.timeout):
             g_logger.log (self.node + " : Task '"+ task.name + "' with 
timeout")
+        elif (result == Tasks.Taskresult.user_interrupt):
+            g_logger.log (self.node + " : Task '"+ task.name + "' interrupted 
by user")
         else: 
             g_logger.log (self.node + " : Task '"+ task.name + "' failed")
         return result
@@ -218,8 +253,11 @@
         return result    
           
     def run(self):
+        global interrupt
         g_logger.log (self.node + " : Starting tasklist " + self.tasks.name)
         task = self.tasks.get()
+        if(interrupt):
+            return Tasks.Taskresult.user_interrupt
         try: 
             ssh = paramiko.SSHClient()
             if (g_configuration.ssh_use_known_hosts):
@@ -262,7 +300,7 @@
         transport = ssh.get_transport()        
         success = True
         result = Tasks.Taskresult.success
-        while (None != task):
+        while (None != task and not interrupt):
             g_logger.log (self.node + " : Running task id " +str(task.id)+" '" 
+ task.name + "'")
             g_notifications.task_started (self.node, task)
             if (task.__class__.__name__ == "Task"):
@@ -287,6 +325,11 @@
                 transport.close()
                 success = False
                 break
+            # If received user interrupt, close channel and break execution
+            elif (result == Tasks.Taskresult.user_interrupt):
+                transport.close()
+                success = False
+                break
             task = self.tasks.get()
         
         ssh.close()
@@ -331,13 +374,18 @@
         g_logger = logger;
     def start (self):
         g_logger.log ("Starting execution")
-        workersList = list()
         for n in self.nodes.nodes:
             nw = NodeWorker (n, self.tasks.copy())
             workersList.append(nw)
             nw.start()
         # block main thread until all worker threads are finished to print 
summary
-        for w in workersList:
-            w.thread.join()
+        threads_done = False
+        while(not threads_done):
+            threads_done = True
+            for w in workersList:
+                if(w.thread.isAlive()):
+                    threads_done = False
+                    break
+            time.sleep(0.5)
         g_notifications.summarize()
         




reply via email to

[Prev in Thread] Current Thread [Next in Thread]