# # # add_file "mtn.py" # content [ce95ba92be3d66183fe9170e2885b676e039c404] # ============================================================ --- mtn.py ce95ba92be3d66183fe9170e2885b676e039c404 +++ mtn.py ce95ba92be3d66183fe9170e2885b676e039c404 @@ -0,0 +1,219 @@ + +import os +import re +import pipes +import select +import threading + +# regular expressions that are of general use when +# validating monotone output +def group_compile(r): + return re.compile('('+r+')') + +revision_re = r'[A-Fa-f0-9]{40}' +revision_re_c = group_compile(revision_re) +name_re = r'^[\S]+' +name_re_c = group_compile(name_re) + +class MonotoneException(Exception): + pass + +class Revision(str): + def __init__(self, v): + str.__init__(v) + if not revision_re_c.match(self): + raise MonotoneException("Not a valid revision ID") + def abbrev(self): + return '[' + self[:8] + '..]' + +class Runner: + def __init__(self, monotone, database): + self.base_command = [monotone, "--db=%s" % pipes.quote(database)] + +packet_header_re = re.compile(r'^(\d+):(\d+):([lm]):(\d+):') + +class Automate(Runner): + """Runs commands via a particular monotone process. This + process is started the first time run() is called, and + stopped when this class instance is deleted. + + If an error occurs, the monotone process may need to be + stopped and a new one created. + """ + def __init__(self, *args, **kwargs): + Runner.__init__(*[self] + list(args), **kwargs) + self.lock = threading.Lock() + self.process = None + + def __process_required(self): + if self.process != None: + return + from utility import set_nonblocking + to_run = self.base_command + ['automate', 'stdio'] + self.child_stdin, self.child_stdout, self.child_stderr = os.popen3(to_run) + map (set_nonblocking, [ self.child_stdin, + self.child_stdout, + self.child_stderr ]) + + def run(self, command, args): + if not self.lock.acquire(False): + raise MonotoneException("Automate process can't be called: it is already locked.") + self.__process_required() + + enc = "l%d:%s" % (len(command), command) + enc += ''.join(map(lambda x: "%d:%s" % (len(x), x), args)) + 'e' + self.child_stdin.write(enc) + self.child_stdin.flush() + + import sys + def read_result_packets(): + buffer = "" + while True: + r_stdin, r_stdout, r_stderr = select.select([self.child_stdout], [], [], None) + if not r_stdin and not r_stdout and not r_stderr: + break + + if self.child_stdout in r_stdin: + data = self.child_stdout.read() + if data == "": + break + buffer += data + + # loop, trying to get complete packets out of our buffer + complete, in_packet = False, False + while not complete and buffer != '': + if not in_packet: + m = packet_header_re.match(buffer) + if not m: + break + in_packet = True + cmdnum, errnum, pstate, length = m.groups() + errnum = int(cmdnum) + length = int(length) + header_length = m.end(m.lastindex) + 1 # the '1' is the colon + + if len(buffer) < length + header_length: + # not enough data read from client yet; go round + break + else: + result = buffer[header_length:header_length+length] + buffer = buffer[header_length+length:] + complete = pstate == 'l' + in_packet = False + yield errnum, complete, result + + if complete: + break + + # get our response, and yield() it back one line at a time + code_max = -1 + for code, is_last, data in read_result_packets(): + if code and code > code_max: + code_max = code + for line in data.split('\n'): + yield line + if code_max > 0: + raise MonotoneException("error code %d in automate packet." % code_max) + self.lock.release() + +class Standalone(Runner): + """Runs commands by running monotone. One monotone process + per command""" + + def run(self, command, args): + # as we pass popen3 as sequence, it executes monotone with these + # arguments - and does not pass them through the shell according + # to help(os.popen3) + to_run = self.base_command + [command] + args + child_stdin, child_stdout, child_stderr = os.popen3(to_run) + for line in child_stdout: + yield line + stderr_data = child_stderr.read() + if len(stderr_data) > 0: + raise MonotoneException("data on stderr for command '%s': %s" % (command, + stderr_data)) +class MtnObject: + def __init__(self, obj_type): + self.obj_type = obj_type + +class Tag(MtnObject): + def __init__(self, name, revision, author): + MtnObject.__init__(self, "tag") + self.name, self.revision, self.author = name, Revision(revision), author + +class Branch(MtnObject): + def __init__(self, name): + MtnObject.__init__(self, "branch") + self.name = name + +basic_io_name_tok = re.compile(r'^(\S+)') + +def basic_io_from_stream(gen): + # all of these x_consume functions return parsed string + # token to add to stanza, name of next consume function to call + # new value of line (eg. with consumed tokens removed) + + def hex_consume(line): + m = revision_re_c.match(line[1:]) + if line[0] != '[' or not m: + raise MonotoneException("This is not a hex token: %s" % line) + end_of_match = m.end(m.lastindex) + if line[end_of_match+1] != ']': + raise MonotoneException("Hex token ends in character other than ']': %s" % line) + return Revision(m.groups()[0]), choose_consume, line[end_of_match+2:] + + def name_consume(line): + m = name_re_c.match(line) + if not m: + raise MonotoneException("Not a name: %s" % line) + end_of_match = m.end(m.lastindex) + return m.groups()[0], choose_consume, line[end_of_match:] + + def choose_consume(line): + line = line.lstrip() + if line[0] == '[': + consumer = hex_consume + elif line[0] == '\\': + consumer = string_consume + else: + consumer = name_consume + return None, consumer, line + + consumer = choose_consume + current_stanza = [] + for line in (t.strip() for t in gen): + print "read line:", line + if line == '' and current_stanza: + yield current_stanza + current_stanza = [] + continue + + while line: + new_token, consumer, line = consumer(line) + if new_token != None: + current_stanza.append(new_token) + +class Operations: + def __init__(self, runner_args): + self.standalone = apply(Standalone, runner_args) + self.automate = apply(Automate, runner_args) + + def tags(self): + for line in (t.strip() for t in self.standalone.run('ls', ['tags'])): + if not line: + continue + yield apply(Tag, line.split(' ', 2)) + + def branches(self): + for line in (t.strip() for t in self.standalone.run('ls', ['branches'])): + if not line: + continue + yield apply(Branch, (line,)) + + def graph(self): + for line in self.automate.run('graph', []): + yield line + + def get_revision(self, id): + for stanza in basic_io_from_stream(self.automate.run('get_revision', [id])): + yield stanza