# # # delete "tracmtn/automate_new.py" # # add_dir "twisted" # # add_dir "twisted/plugins" # # add_file "twisted/plugins/mserv.py" # content [86e1d15b20fd7b081a7258df0d1e74872049a5b7] # ============================================================ --- twisted/plugins/mserv.py 86e1d15b20fd7b081a7258df0d1e74872049a5b7 +++ twisted/plugins/mserv.py 86e1d15b20fd7b081a7258df0d1e74872049a5b7 @@ -0,0 +1,560 @@ +# -*- python -*- +""" +Proxying Daemon for the Monotone Automation Interface + +Currently implemented as a twist(e)d plugin. + +Copyright 2008 Thomas Moschny (address@hidden) + +{{{ +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; either version 2 of the License, or (at +your option) any later version. + +This program is distributed in the hope that it will be useful, but +WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +General Public License for more details. 
+ +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 +USA +}}} +""" + +from zope.interface import implements +from twisted.application import service, internet, strports +from twisted.internet import reactor, defer, protocol, error +from twisted.plugin import IPlugin +from twisted.python import log, usage +from twisted.web import static, server, resource +from collections import deque +from cgi import escape +from tracmtn import basic_io +import os +import re +import simplejson + +# ---------------------------------------------------------------------- + +# current URL scheme: +# DONE /branches +# DONE /certs/REV_ID +# DONE /file/FILE_ID (raw) +# /file_length/FILE_ID +# DONE /manifest/REV_ID +# DONE /revision/REV_ID +# DONE /revisions/leaves +# DONE /revisions/roots +# DONE /revisions/parents/REV_ID +# DONE /revisions/children/REV_ID +# DONE /revisions/ancestors/REV_ID +# DONE /revisions/descendants/REV_ID +# DONE /revisions/select/SELECTOR +# DONE /ancestry +# DONE /tags +# DONE /cmd (raw) +# DONE /version + +# ---------------------------------------------------------------------- + +# Todo: +# - find a way to get file lengths fast +# - pass reactor around (reasonable?) +# - send partial results (hard) +# - DONE handle zero length results (e.g. 
#   from heads)
# - DONE pass back an error in case of a non-zero status
# - DONE use logger instead of print statements

# ----------------------------------------------------------------------

# Header of one output packet of `mtn automate stdio`:
#   <command number>:<status>:<m|l>:<payload size>:
# 'l' marks the last packet of a response, 'm' means more packets follow.
PACKET_HEADER = re.compile(r'''
    (?P<cmdnum>\d+):
    (?P<status>\d+):
    (?P<last>(m|l)):
    (?P<size>\d+):
''', re.VERBOSE)

# ----------------------------------------------------------------------

class AutomateException(Exception):
    """Raised (via errback) when mtn answers with a non-zero status."""
    pass

# ----------------------------------------------------------------------

class AutomateProtocol(protocol.ProcessProtocol):
    """Process protocol parsing the packet stream of `mtn automate stdio`.

    Pending queries are kept in a FIFO; every completed response is
    delivered to the oldest pending query through its Deferred.
    """

    def __init__(self):
        self.connected = False
        self.queue = deque()   # queries sent but not yet answered
        self.data = ''         # received bytes not yet parsed
        self.curdata = ''      # payload of the response being assembled
        self.status = 0        # status field of the most recent packet
        self.to_read = 0       # payload bytes still missing for that packet
        self.last = False      # whether that packet ends the response

    def log(self, s):
        """Write a message to the twisted log, prefixed for this protocol."""
        log.msg("mtn protocol: %s" % s)

    def outReceived(self, data):
        """Consume stdout data from mtn and fire Deferreds of finished queries.

        Data may arrive in arbitrary chunks, so both packet headers and
        payloads can be split across calls; unparsed input is buffered in
        self.data until it can be processed.
        """
        self.data += data
        while self.data:
            if self.to_read:
                # in the middle of a payload: take as much as is available
                length = min(self.to_read, len(self.data))
                self.curdata += self.data[:length]
                self.data = self.data[length:]
                self.to_read -= length
            else:
                m = PACKET_HEADER.match(self.data)
                if m:
                    self.status = int(m.group('status'))
                    self.to_read = int(m.group('size'))
                    self.last = m.group('last') == 'l'
                    self.data = self.data[m.end():]
                else:
                    break # wait for more data
            # also true right after parsing the header of a zero length
            # last packet
            if self.last and not self.to_read:
                cur = self.queue.popleft()
                if self.status == 0:
                    cur.d.callback(self.curdata)
                else:
                    cur.d.errback(AutomateException(self.curdata))
                self.curdata = ''

    def connectionMade(self):
        """Mark the child process as ready to accept commands."""
        self.log("connected")
        self.connected = True

    def errReceived(self, data):
        """Log whatever mtn writes to stderr."""
        self.log("got stderr: '%s'" % data.strip())

    def processEnded(self, reason):
        """Stop the reactor: without the mtn process nothing can be served."""
        self.log("process ended: %s" % reason.value.message)
        self.log("shutting down")
        reactor.stop() # fatal

    def cmd(self, query):
        """Send the query to mtn, or retry later if not yet connected."""
        #self.log("sending '%s'" % query.cmd)
        if self.connected:
            self.queue.append(query)
            self.transport.write(query.cmd)
        else: # fixme: danger of endless loop?
            self.log("not connected, postponing cmd")
            reactor.callLater(0, self.cmd, query)

# ----------------------------------------------------------------------

class Query(object):
    """An encoded command string together with the Deferred for its result."""

    def __init__(self, cmd):
        self.cmd = cmd
        self.d = defer.Deferred()

# ----------------------------------------------------------------------

class MtnService(service.Service):
    """Service owning the `mtn automate stdio` child process."""

    def __init__(self, binary, database):
        # no super ctor
        self.binary = binary
        self.database = database
        self.protocol = AutomateProtocol()

    def startService(self):
        """Spawn the mtn process in automate stdio mode."""
        service.Service.startService(self)
        reactor.spawnProcess(self.protocol, self.binary,
                             [self.binary, '--norc', '--root=.',
                              '--automate-stdio-size=%d' % (100*1024),
                              '--db=%s' % self.database,
                              'automate', 'stdio'])

    def cmd(self, cmd, args=None, opts=None):
        """Send an automate command and return a Deferred for its output.

        cmd  -- name of the automate command
        args -- optional sequence of string arguments
        opts -- optional mapping of option names to string values

        The Deferred fires with the raw output string, or fails with an
        AutomateException if mtn reports a non-zero status.
        """
        # None instead of mutable default arguments
        args = [] if args is None else args
        opts = {} if opts is None else opts

        def lstr(string):
            """Prepend string with its length and a colon."""
            return "%d:%s" % (len(string), string)

        def assemble_cmdstr(cmd, args, opts):
            """Assemble the cmdline from command, args and opts."""
            if opts:
                cmdstr = "o%se" % ''.join(
                    [lstr(name) + lstr(val) for name, val in opts.items()])
            else:
                cmdstr = ""

            cmdstr += "l%s%se" % (
                lstr(cmd), ''.join([lstr(arg) for arg in args]))
            return cmdstr

        query = Query(assemble_cmdstr(cmd, args, opts))
        self.protocol.cmd(query)
        return query.d

# ----------------------------------------------------------------------

class EmptyResource(resource.Resource):
    """Resource without content, used only as a container for children."""
    pass

# ----------------------------------------------------------------------

class MonotoneResource(resource.Resource):
    """Base class of all leaf resources answering from a MtnService."""

    isLeaf = True

    def __init__(self, mtnservice):
        resource.Resource.__init__(self)
        self.mtnservice = mtnservice

    @staticmethod
    def eb(request, failure):
        """Finish the request with status 400 and the failure's message."""
        request.setResponseCode(400)
        # the message may echo parts of the request path, so escape it
        request.write("<pre>%s</pre>" % escape(failure.getErrorMessage()))
        request.finish()

    @staticmethod
    def cb(request, result, raw=False):
        """Finish the request with result.

        Strings are sent as they are, anything else is serialized to JSON.
        The raw parameter is accepted but currently not evaluated.
        """
        if isinstance(result, basestring):
            request.setHeader("content-type", "application/octet-stream")
        else:
            result = simplejson.dumps(result, indent=2)
            request.setHeader("content-type", "application/json")
        if 'debug' in request.args:
            # overwrite header
            request.setHeader("content-type", "text/plain")
        request.write(result)
        request.finish()

# ----------------------------------------------------------------------

class CommandResource(MonotoneResource):
    """/cmd/COMMAND/ARG...: run an arbitrary automate command (raw)."""

    def render_GET(self, request):
        if request.postpath and request.postpath[0]:
            cmd, args = request.postpath[0], request.postpath[1:]
            self.mtnservice.cmd(cmd, args).addCallback(
                lambda v: self.cb(request, v, True)).addErrback(
                lambda v: self.eb(request, v))
            return server.NOT_DONE_YET

        request.setResponseCode(400)
        return "No command given."

# ----------------------------------------------------------------------

class AncestryResource(MonotoneResource):
    """/ancestry: map every revision id to the list of its parents."""

    def render_GET(self, request):

        def proc(result):
            # each line of 'graph' output: revision followed by its parents
            parents = {}
            for line in result.splitlines():
                revs = line.split(' ')
                parents[revs[0]] = revs[1:]
            return parents

        self.mtnservice.cmd('graph').addCallback(
            lambda v: self.cb(request, proc(v))).addErrback(
            lambda v: self.eb(request, v))
        return server.NOT_DONE_YET

# ----------------------------------------------------------------------

class BranchesResource(MonotoneResource):
    """/branches: list of branch names, one per output line of mtn."""

    def render_GET(self, request):

        def proc(result):
            return result.splitlines()

        self.mtnservice.cmd('branches').addCallback(
            lambda v: self.cb(request, proc(v))).addErrback(
            lambda v: self.eb(request, v))
        return server.NOT_DONE_YET

# ----------------------------------------------------------------------

class RevisionsResource(MonotoneResource):
    """/revisions/WHAT[/ARG]: automate commands yielding revision lists.

    what      -- name of the automate command to run
    needs_arg -- whether a path segment after WHAT is mandatory
    """

    def __init__(self, mtnservice, what, needs_arg):
        MonotoneResource.__init__(self, mtnservice)
        self.what = what
        self.needs_arg = needs_arg

    def render_GET(self, request):

        rev = request.postpath[0:1]
        if self.needs_arg and not rev:
            request.setResponseCode(400)
            return "Missing arg."

        def proc(result):
            return result.splitlines()

        self.mtnservice.cmd(self.what, rev).addCallback(
            lambda v: self.cb(request, proc(v))).addErrback(
            lambda v: self.eb(request, v))
        return server.NOT_DONE_YET

# ----------------------------------------------------------------------

class CertsResource(MonotoneResource):
    """/certs/REV_ID: trusted certs with a good signature, by cert name."""

    def render_GET(self, request):
        rev = request.postpath[0:1]
        if not rev:
            request.setResponseCode(400)
            return "Missing revision."

        def proc(result):
            certs, cert = {}, {}
            for key, values in basic_io.items(result):
                if key == 'trust':
                    # 'trust' triggers evaluation of the fields collected
                    # for the current cert
                    if values[0] == 'trusted' and \
                            cert['signature'] == 'ok':
                        certs.setdefault(cert['name'], []).append(
                            {'value': cert['value'],
                             'key': cert['key']})
                    # do not carry fields over into the next cert
                    cert = {}
                else:
                    cert[key] = values[0]
            return certs

        self.mtnservice.cmd('certs', rev).addCallback(
            lambda v: self.cb(request, proc(v))).addErrback(
            lambda v: self.eb(request, v))
        return server.NOT_DONE_YET

# ----------------------------------------------------------------------

class TagsResource(MonotoneResource):
    """/tags: map tag names to lists of {revision, signer, branches}."""

    def render_GET(self, request):

        def proc(result):
            tags = {}
            # scratch dict receiving fields seen before the first 'tag'
            tag = {}
            for key, values in basic_io.items(result):
                if key == 'tag':
                    tag = {}
                    tags.setdefault(values[0], []).append(tag)
                elif key in ('revision', 'signer'):
                    tag[key] = values[0]
                elif key == 'branches':
                    tag[key] = values
            return tags

        self.mtnservice.cmd('tags').addCallback(
            lambda v: self.cb(request, proc(v))).addErrback(
            lambda v: self.eb(request, v))
        return server.NOT_DONE_YET

# ----------------------------------------------------------------------

class FileResource(MonotoneResource):
    """/file/FILE_ID: raw output of 'get_file'."""

    def render_GET(self, request):
        node_id = request.postpath[0:1]
        if not node_id:
            request.setResponseCode(400)
            return "Missing node id."

        self.mtnservice.cmd('get_file', node_id).addCallback(
            lambda v: self.cb(request, v)).addErrback(
            lambda v: self.eb(request, v))
        return server.NOT_DONE_YET

# ----------------------------------------------------------------------

class ManifestResource(MonotoneResource):
    """/manifest/REV_ID: dirs, files (path -> content id) and attrs."""

    def render_GET(self, request):

        rev = request.postpath[0:1]
        if not rev:
            request.setResponseCode(400)
            return "Missing revision."

        def proc(result):

            def enter(path, kind, content):
                if kind == 'dir':
                    manifest.setdefault('dirs', []).append(path)
                elif kind == 'file':
                    manifest.setdefault('files', {})[path] = content

            # stanzas have variable length, trigger on next 'path' ...
            manifest, path, kind, content = {}, None, None, None
            for key, values in basic_io.items(result):
                if key in ('dir', 'file'):
                    if path is not None:
                        enter(path, kind, content)
                    path, kind, content = values[0], key, None
                elif key == 'content':
                    content = values[0]
                elif key == 'attr':
                    manifest.setdefault('attrs', {}).setdefault(
                        path, {})[values[0]] = values[1]

            if path is not None: # ... or eof
                enter(path, kind, content)

            return manifest

        self.mtnservice.cmd('get_manifest_of', rev).addCallback(
            lambda v: self.cb(request, proc(v))).addErrback(
            lambda v: self.eb(request, v))
        return server.NOT_DONE_YET

# ----------------------------------------------------------------------

class RevisionResource(MonotoneResource):
    """/revision/REV_ID: one changeset per old revision.

    A changeset may contain the keys 'deleted', 'renamed', 'patched',
    'files_added', 'dirs_added', 'attrs_set' and 'attrs_cleared'.
    """

    def render_GET(self, request):

        rev = request.postpath[0:1]
        if not rev:
            request.setResponseCode(400)
            return "Missing revision."

        def proc(result):
            changesets, changeset = {}, {}
            # parser state: path of the pending stanza, its first argument,
            # and whether the pending stanza is a 'clear'
            path = arg = None
            clear = False

            # fixme: use present time instead of past time?
            for key, values in basic_io.items(result):
                if key == 'old_revision':
                    changeset = {}
                    changesets[values[0]] = changeset
                    path = arg = None
                    clear = False
                elif key == 'delete':
                    changeset.setdefault('deleted', []).append(values[0])
                elif key in ('rename', 'add_file', 'patch', 'set'):
                    path = values[0]
                elif key == 'clear':
                    # must not be part of the tuple above, otherwise this
                    # branch is unreachable and cleared attrs get lost
                    path = values[0]
                    clear = True
                elif key == 'from':
                    arg = values[0]
                elif key == 'to':
                    if path is not None:
                        if arg is not None:
                            # patch: 'from' and 'to' are content ids
                            changeset.setdefault(
                                'patched', {})[path] = (arg, values[0])
                            path = arg = None
                        else:
                            # rename: 'to' is the new path
                            changeset.setdefault(
                                'renamed', {})[values[0]] = path
                            path = None
                elif key == 'content':
                    if path is not None:
                        changeset.setdefault(
                            'files_added', {})[path] = values[0]
                        path = None
                elif key == 'add_dir':
                    changeset.setdefault('dirs_added', []).append(values[0])
                elif key == 'attr':
                    if clear:
                        changeset.setdefault(
                            'attrs_cleared', {}).setdefault(
                            path, []).append(values[0])
                        path = None
                        clear = False
                    else:
                        arg = values[0]
                elif key == 'value':
                    if path is not None and arg is not None:
                        changeset.setdefault(
                            'attrs_set', {}).setdefault(
                            path, {})[arg] = values[0]
                        path = arg = None
            # phew.
            return changesets

        self.mtnservice.cmd('get_revision', rev).addCallback(
            lambda v: self.cb(request, proc(v))).addErrback(
            lambda v: self.eb(request, v))
        return server.NOT_DONE_YET

# ----------------------------------------------------------------------

class VersionResource(MonotoneResource):
    """/version: the automate interface version as a list of integers."""

    def render_GET(self, request):

        def proc(result):
            return [int(part) for part in result.strip().split('.')]

        self.mtnservice.cmd('interface_version').addCallback(
            lambda v: self.cb(request, proc(v))).addErrback(
            lambda v: self.eb(request, v))
        return server.NOT_DONE_YET

# ----------------------------------------------------------------------

class Options(usage.Options):
    """Command line options of the mserv plugin."""
    optParameters = [
        ['port', 'p', '8080', 'port to listen on'],
        # expanduser() instead of os.getenv('HOME'), which is None (and
        # breaks the string concatenation) if HOME is not set
        ['db', 'd',
         os.path.join(os.path.expanduser('~'), '.monotone', 'monotone.mtn'),
         'database to serve'],
        ['binary', 'b', '/usr/bin/mtn', 'path to monotone binary'],
        ]

class MonotoneServiceMaker(object):
    """Plugin entry point building the web service and the mtn service."""
    implements(service.IServiceMaker, IPlugin)
    tapname = 'mserv'
    description = 'Access the Monotone automate interface.'
    options = Options

    def makeService(self, options):
        """Return a MultiService containing the mtn and the web service."""

        # honour the --binary option instead of a hard-coded path
        mtnservice = MtnService(options['binary'], options['db'])

        root = EmptyResource()
        root.putChild('cmd', CommandResource(mtnservice))
        root.putChild('branches', BranchesResource(mtnservice))
        root.putChild('certs', CertsResource(mtnservice))
        root.putChild('tags', TagsResource(mtnservice))
        root.putChild('file', FileResource(mtnservice))
        root.putChild('manifest', ManifestResource(mtnservice))
        root.putChild('version', VersionResource(mtnservice))
        root.putChild('revision', RevisionResource(mtnservice))
        root.putChild('ancestry', AncestryResource(mtnservice))

        revisions = EmptyResource()
        root.putChild('revisions', revisions)
        for w in ('parents', 'children', 'ancestors', 'descendants', 'select'):
            revisions.putChild(w, RevisionsResource(mtnservice, w, True))
        for w in ('leaves', 'roots'):
            revisions.putChild(w, RevisionsResource(mtnservice, w, False))

        webservice = strports.service(options['port'], server.Site(root))

        multiservice = service.MultiService()

        mtnservice.setServiceParent(multiservice)
        webservice.setServiceParent(multiservice)

        return multiservice


serviceMaker = MonotoneServiceMaker()