[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r30574 - eclectic/gplmt/gplmt
From: gnunet
Subject: [GNUnet-SVN] r30574 - eclectic/gplmt/gplmt
Date: Wed, 6 Nov 2013 18:46:37 +0100
Author: peterb
Date: 2013-11-06 18:46:36 +0100 (Wed, 06 Nov 2013)
New Revision: 30574
Modified:
eclectic/gplmt/gplmt/Configuration.py
eclectic/gplmt/gplmt/Tasks.py
eclectic/gplmt/gplmt/Worker.py
Log:
Added support for "node", "start_absolute", "stop_absolute", "start_relative"
and "stop_relative" tags in task lists
Modified: eclectic/gplmt/gplmt/Configuration.py
===================================================================
--- eclectic/gplmt/gplmt/Configuration.py 2013-11-06 17:32:36 UTC (rev 30573)
+++ eclectic/gplmt/gplmt/Configuration.py 2013-11-06 17:46:36 UTC (rev 30574)
@@ -41,7 +41,7 @@
assert (None != logger)
self.gplmt_filename = filename
self.gplmt_logger = logger
- self.gplmt_parallelism = 0
+ self.gplmt_parallelism = sys.maxint
self.gplmt_notifications = ""
self.gplmt_taskfile = None
self.gplmt_nodesfile = None
Modified: eclectic/gplmt/gplmt/Tasks.py
===================================================================
--- eclectic/gplmt/gplmt/Tasks.py 2013-11-06 17:32:36 UTC (rev 30573)
+++ eclectic/gplmt/gplmt/Tasks.py 2013-11-06 17:46:36 UTC (rev 30574)
@@ -25,7 +25,9 @@
import Util
import sys
import os
+ import re
import gplmt
+ from datetime import datetime
from xml.parsers import expat
from minixsv import pyxsval as xsv
from elementtree import ElementTree
@@ -76,6 +78,11 @@
self.dest = None
self.command_file = None
self.output_prefix = None
+ self.node = ""
+ self.start_absolute = datetime.min
+ self.stop_absolute = datetime.max
+ self.start_relative = 0
+ self.stop_relative = sys.maxint
def copy (self):
t = Task ()
@@ -93,6 +100,11 @@
t.dest = self.dest
t.command_file = self.command_file
t.output_prefix = self.output_prefix
+ t.node = self.node
+ t.start_absolute = self.start_absolute
+ t.stop_absolute = self.stop_absolute
+ t.start_relative = self.start_relative
+ t.stop_relative = self.stop_relative
return t
def log (self):
glogger.log ("Task " + str(self.id) + ": " + self.name)
@@ -119,6 +131,12 @@
def __init__(self):
self.set = list()
+def parse_relative (text):
+ regex =
re.compile('(?P<sign>-?)P(?:(?P<years>\d+)Y)?(?:(?P<months>\d+)M)?(?:(?P<days>\d+)D)?(?:T(?:(?P<hours>\d+)H)?(?:(?P<minutes>\d+)M)?(?:(?P<seconds>\d+)S)?)?')
+ duration = regex.match(text).groupdict(0)
+
+ return int(duration['seconds']) + 60 * (int(duration['minutes']) + 60 *
(int(duration['hours']) + 24 * \
+ (int(duration['days']) + 31 * int(duration['months']) + 365 * int(duration
['years']))))
def handle_task (elem, tasks):
t = Task ()
@@ -194,6 +212,38 @@
t.dest = child.text
if ('' != g_configuration.gplmt_userdir and Operation.get ==
t.type):
t.dest = os.path.join(g_configuration.gplmt_userdir,
os.path.basename(t.dest))
+
+ if ((child.tag == "node") and (child.text != None)):
+ t.node = child.text
+ print "Node: " + t.node
+
+ if ((child.tag == "start_absolute") and (child.text != None)):
+ try:
+ t.start_absolute = datetime.strptime(child.text,
"%Y-%m-%dT%H:%M:%S")
+ except ValueError:
+ print "Invalid absolute start time '" +child.text+ "' for task
id " + str (t.id) + " name " + t.name
+ print "start_absolute: " + t.start_absolute.strftime("%A, %d. %B
%Y %I:%M%p")
+
+ if ((child.tag == "stop_absolute") and (child.text != None)):
+ try:
+ t.stop_absolute = datetime.strptime(child.text,
"%Y-%m-%dT%H:%M:%S")
+ except ValueError:
+ print "Invalid absolute stop time '" +child.text+ "' for task
id " + str (t.id) + " name " + t.name
+ print "stop_absolute: " + t.stop_absolute.strftime("%A, %d. %B %Y
%I:%M%p")
+
+ if ((child.tag == "start_relative") and (child.text != None)):
+ try:
+ t.start_relative = parse_relative(child.text)
+ except ValueError:
+ print "Invalid relative start time '" +child.text+ "' for task
id " + str (t.id) + " name " + t.name
+ print "start_relative: " + str(t.start_relative)
+
+ if ((child.tag == "stop_relative") and (child.text != None)):
+ try:
+ t.stop_relative = parse_relative(child.text)
+ except ValueError:
+ print "Invalid relative stop time '" +child.text+ "' for task
id " + str (t.id) + " name " + t.name
+ print "stop_relative: " + str(t.stop_relative)
if (False == t.check()):
print "Parsed invalid task with id " + str (t.id) + " name '" + t.name
+ "'"
Modified: eclectic/gplmt/gplmt/Worker.py
===================================================================
--- eclectic/gplmt/gplmt/Worker.py 2013-11-06 17:32:36 UTC (rev 30573)
+++ eclectic/gplmt/gplmt/Worker.py 2013-11-06 17:46:36 UTC (rev 30574)
@@ -31,6 +31,7 @@
import signal
import inspect
import subprocess
+import datetime
try:
import gplmt.Configuration as Configuration
@@ -93,7 +94,8 @@
threading.Thread.__init__(self)
self.threadID = threadID
self.node = node
- self.tasks = tasks
+ self.tasks = tasks
+ self.timer = None
def connect (self):
raise NotImplementedError (inspect.stack()[0][3])
def disconnect (self):
@@ -105,8 +107,10 @@
def exec_put (self, task):
raise NotImplementedError (inspect.stack()[0][3])
def exec_get (self, task):
- raise NotImplementedError (inspect.stack()[0][3])
- def run(self):
+ raise NotImplementedError (inspect.stack()[0][3])
+ def interrupt_task (self):
+ raise NotImplementedError (inspect.stack()[0][3])
+ def run(self):
global interrupt
tasklist_success = True
# Connecting
@@ -130,8 +134,41 @@
if (interrupt):
g_notifications.tasklist_completed (self.node, self.tasks,
Tasks.Taskresult.user_interrupt, "")
# Executing Tasks
- while (None != task and not interrupt):
- g_logger.log (self.node.hostname + " : Running task id "
+str(task.id)+" '" + task.name + "'")
+ while (None != task and not interrupt):
+
+ if (None != self.timer):
+ self.timer.cancel()
+ self.timer = None
+
+ if (task.node and task.node != self.node.hostname):
+ g_logger.log (self.node.hostname + " : Ignoring task due to
set node attribute");
+ task = self.tasks.get()
+ continue
+
+ delta = int(max((task.start_absolute -
datetime.datetime.now()).total_seconds(), task.start_relative))
+
+ if (delta > 0):
+ g_logger.log (self.node.hostname + " : Continuing execution in
" + str(delta) + " seconds")
+ for x in range(0, delta):
+ time.sleep(1)
+ if (interrupt):
+ g_notifications.tasklist_completed (self.node,
self.tasks, Tasks.Taskresult.user_interrupt, "")
+ g_notifications.task_completed (self.node, task,
Tasks.Taskresult.user_interrupt, "task was interrupted", "")
+ return
+
+
+
+ g_logger.log (self.node.hostname + " : Running task id "
+str(task.id)+" '" + task.name + "'")
+
+
+ delta = int(min((task.stop_absolute -
datetime.datetime.now()).total_seconds(), task.stop_relative))
+
+ if (delta > 0):
+ g_logger.log (self.node.hostname + " : Task will be
interrupted in " + str(delta) + " seconds")
+ self.timer = threading.Timer(delta, self.interrupt_task)
+ self.timer.start()
+
+
g_notifications.task_started (self.node, task, "")
task_result = None
try:
@@ -217,9 +254,14 @@
return TaskExecutionResult(Tasks.Taskresult.success, "exec_put
successful", "")
def exec_get (self, task):
print "TestWorker gets '" + task.name + "' " + task.src + "' '" +
task.dest+ "'"
- return TaskExecutionResult(Tasks.Taskresult.success, "exec_get
successful", "")
+ return TaskExecutionResult(Tasks.Taskresult.success, "exec_get
successful", "")
+ def interrupt_task (self):
+ print "TestWorker task is interrupted by timeout"
class LocalWorker (AbstractWorker):
+ def __init__(self, threadID, node, tasks):
+ AbstractWorker.__init__(self, threadID, node, tasks)
+ self.process = None
def connect (self):
g_logger.log ("LocalWorker connects to '" + self.node.hostname + "'")
try:
@@ -241,7 +283,10 @@
output = ""
found = False
try:
- output = subprocess.check_output(task.command + " " +
task.arguments, shell=True)
+ self.process = subprocess.Popen("exec " + task.command,
stderr=subprocess.STDOUT, stdout=subprocess.PIPE, shell=True)
+ stdoutdata, stderrdata = self.process.communicate()
+ output = stdoutdata
+ #subprocess.check_output(task.command + " " + task.arguments,
shell=True)
output = output.rstrip()
except subprocess.CalledProcessError as e:
returncode = e.returncode
@@ -259,10 +304,17 @@
def exec_put (self, task):
raise NotImplementedError (inspect.stack()[0][3])
def exec_get (self, task):
- raise NotImplementedError (inspect.stack()[0][3])
+ raise NotImplementedError (inspect.stack()[0][3])
+ def interrupt_task (self):
+ g_logger.log (self.node.hostname + " : Task interrupted by timeout")
+ if (None != self.process):
+ self.process.terminate()
class RemoteSSHWorker (AbstractWorker):
+ def __init__(self, threadID, node, tasks):
+ AbstractWorker.__init__(self, threadID, node, tasks)
+ self.task_interrupted = False
def connect (self):
self.ssh = None
if (interrupt):
@@ -369,6 +421,7 @@
return self.exec_run (t)
def exec_run (self, task):
global interrupt
+ self.task_interrupted = False
message = "undefined"
output = ""
if(interrupt):
@@ -402,6 +455,11 @@
if(interrupt):
result = Tasks.Taskresult.user_interrupt
break
+ if (self.task_interrupted):
+ channel.close()
+ exit_status = 0
+ break
+
if (timeout != -1):
delta = time.time() - start_time
if (delta > timeout):
@@ -427,7 +485,7 @@
stderr_data += data
if not got_data:
break
- if (result == Tasks.Taskresult.success):
+ if (not self.task_interrupted and result == Tasks.Taskresult.success):
exit_status = channel.recv_exit_status ()
if (result == Tasks.Taskresult.success):
if (task.expected_return_code != -1):
@@ -520,7 +578,10 @@
pass
if (None == result):
result = TaskExecutionResult(Tasks.Taskresult.success, "Store
source '"+task.src+"' in '" +task.dest+"'", "")
- return result
+ return result
+ def interrupt_task (self):
+ g_logger.log (self.node.hostname + " : Task interrupted by timeout")
+ self.task_interrupted = True
class PlanetLabWorker (RemoteSSHWorker):
def connect (self):
@@ -588,7 +649,7 @@
def start (self):
g_logger.log ("Starting execution for node " + self.node.hostname)
self.thread.start()
-
+
class Worker:
def __init__(self, logger, configuration, target, nodes, tasks,
notifications):
global g_logger;
[Prev in Thread] | Current Thread | [Next in Thread]
- [GNUnet-SVN] r30574 - eclectic/gplmt/gplmt, gnunet <=