# # patch "dumb.py" # from [587c80aaf9ab28b6e17049f75dc8ddfb706eb064] # to [532954757a0191f36cdaf12f96d50425aa6afeba] # # patch "fs.py" # from [019161a560cc3092f62b01bbf7146d88339e1bb5] # to [28b941965155288861b38b1c478e5993a7a22cb7] # # patch "fs_sftp.py" # from [ae1409f264dc600ffe8109fca9f716c10e557e10] # to [c31b2800f0333335f2b4a9d6f8075563ad30bd1b] # # patch "merkle_dir.py" # from [736fb122be92473b73f6de96a0e9584da9700611] # to [9c2532dc27fc69184efb6d30fca499dfa23481f3] # # patch "monotone.py" # from [7bfd2db49f9fd5aeee6a7c0f16f78e1d82ea9308] # to [4049bb72eee1adb79206f2c27432dc1009ea5e57] # ======================================================================== --- dumb.py 587c80aaf9ab28b6e17049f75dc8ddfb706eb064 +++ dumb.py 532954757a0191f36cdaf12f96d50425aa6afeba @@ -21,75 +21,90 @@ def do_export(monotone, url): md = MerkleDir(writeable_fs_for_url(url)) - md.begin() - curr_ids = Set(md.all_ids()) - for rid in monotone.toposort(monotone.revisions_list()): - certs = monotone.get_cert_packets(rid) - for cert in certs: - id = sha.new(cert).hexdigest() - if id not in curr_ids: - data = zlib.compress(cert) - md.add(id, data) - if rid not in curr_ids: - rdata = StringIO() - revision_text = monotone.get_revision(rid) - revision_parsed = monotone.basic_io_parser(revision_text) - new_manifest = None - old_manifest = "" - new_files = {} - for stanza in revision_parsed: - stanza_type = stanza[0][0] - if stanza_type == "new_manifest": - new_manifest = stanza[0][1] - elif stanza_type == "old_revision": - if not old_manifest: - old_manifest = stanza[1][1] - elif stanza_type == "patch": - old_fid = stanza[1][1] - new_fid = stanza[2][1] - if not new_files.has_key(new_fid): - new_files[new_fid] = None - if old_fid: - new_files[new_fid] = old_fid + try: + md.begin() + curr_ids = Set(md.all_ids()) + for rid in monotone.toposort(monotone.revisions_list()): + certs = monotone.get_cert_packets(rid) + for cert in certs: + id = sha.new(cert).hexdigest() + if id not in curr_ids: + data = zlib.compress(cert) + md.add(id, data) + if rid not in curr_ids: + rdata = StringIO() + revision_text = monotone.get_revision(rid) + revision_parsed = monotone.basic_io_parser(revision_text) + new_manifest = None + old_manifest = "" + new_files = {} + for stanza in revision_parsed: + stanza_type = stanza[0][0] + if stanza_type == "new_manifest": + new_manifest = stanza[0][1] + elif stanza_type == "old_revision": + if not old_manifest: + old_manifest = stanza[1][1] + elif stanza_type == "patch": + old_fid = stanza[1][1] + new_fid = stanza[2][1] + if not new_files.has_key(new_fid): + new_files[new_fid] = None + if old_fid: + new_files[new_fid] = old_fid - rdata.write(monotone.get_revision_packet(rid)) - if old_manifest: - rdata.write(monotone.get_manifest_delta_packet(old_manifest, new_manifest)) - else: - rdata.write(monotone.get_manifest_packet(new_manifest)) - for new_fid, old_fid in new_files.items(): - if old_fid: - rdata.write(monotone.get_file_delta_packet(old_fid, new_fid)) + rdata.write(monotone.get_revision_packet(rid)) + if old_manifest: + rdata.write(monotone.get_manifest_delta_packet(old_manifest, + new_manifest)) else: - rdata.write(monotone.get_file_packet(new_fid)) - md.add(rid, zlib.compress(rdata.getvalue())) - md.commit() + rdata.write(monotone.get_manifest_packet(new_manifest)) + for new_fid, old_fid in new_files.items(): + if old_fid: + rdata.write(monotone.get_file_delta_packet(old_fid, + new_fid)) + else: + rdata.write(monotone.get_file_packet(new_fid)) + md.add(rid, zlib.compress(rdata.getvalue())) + 
+        md.commit()
+    except LockError:
+        raise
+    except:
+        md.rollback()
+class CounterCallback:
+    def __init__(self):
+        self.added = 0
+    def __call__(self, id, data):
+        self.added += 1
+class FeederCallback:
+    def __init__(self, feeder):
+        self.added = 0
+        self.feeder = feeder
+    def __call__(self, id, data):
+        self.added += 1
+        self.feeder.write(zlib.decompress(data))
+
 def do_push(monotone, local_url, target_url):
     print "Exporting changes from monotone db to %s" % (local_url,)
     do_export(monotone, local_url)
     print "Pushing changes from %s to %s" % (local_url, target_url)
     local_md = MerkleDir(readable_fs_for_url(local_url))
     target_md = MerkleDir(writeable_fs_for_url(target_url))
-    added = 0
-    def count_new(id, data):
-        added += 1
-    local_md.push(target_md, count_new)
-    print "Pushed %s packets to %s" % (added, target_url)
+    c = CounterCallback()
+    local_md.push(target_md, c)
+    print "Pushed %s packets to %s" % (c.added, target_url)
 
 def do_pull(monotone, local_url, source_url):
     print "Pulling changes from %s to %s" % (source_url, local_url)
     local_md = MerkleDir(writeable_fs_for_url(local_url))
     source_md = MerkleDir(readable_fs_for_url(source_url))
     feeder = monotone.feeder()
-    added = 0
-    def feed_new(id, data):
-        feeder.write(zlib.decompress(data))
-        added += 1
-    local_md.pull(source_md, feed_new)
+    fc = FeederCallback(feeder)
+    local_md.pull(source_md, fc)
     feeder.close()
-    print "Pulled and imported %s packets from %s" % (added, source_url)
+    print "Pulled and imported %s packets from %s" % (fc.added, source_url)
 
 def do_sync(monotone, local_url, other_url):
     print "Exporting changes from monotone db to %s" % (local_url,)
@@ -98,17 +113,12 @@
     local_md = MerkleDir(writeable_fs_for_url(local_url))
     other_md = MerkleDir(writeable_fs_for_url(other_url))
     feeder = monotone.feeder()
-    pulled = 0
-    pushed = 0
-    def feed_pull(id, data):
-        feeder.write(zlib.decompress(data))
-        pulled += 1
-    def count_push(id, data):
-        pushed += 1
-    local_md.sync(other_md, feed_pull, count_push)
+    pull_fc = FeederCallback(feeder)
+    push_c = CounterCallback()
+    local_md.sync(other_md, pull_fc, push_c)
     feeder.close()
-    print "Pulled and imported %s packets from %s" % (pulled, other_url)
-    print "Pushed %s packets to %s" % (pushed, other_url)
+    print "Pulled and imported %s packets from %s" % (pull_fc.added, other_url)
+    print "Pushed %s packets to %s" % (push_c.added, other_url)
 
 def main(name, args):
     pass
========================================================================
--- fs.py 019161a560cc3092f62b01bbf7146d88339e1bb5
+++ fs.py 28b941965155288861b38b1c478e5993a7a22cb7
@@ -1,15 +1,19 @@
 # interface to FS-like things
-from urlparse import urlsplit
+import urlparse
 import os
 import os.path
 
 class BadURL(Exception):
     pass
 
+# This is necessary to properly parse sftp:// urls
+urlparse.uses_netloc.append("sftp")
+
 def readable_fs_for_url(url):
-    (scheme, host, path, query, frag) = urlsplit(url, "file")
+    (scheme, host, path, query, frag) = urlparse.urlsplit(url, "file")
     if scheme == "file":
+        assert not host
         return LocalReadableFS(path)
     elif scheme in ("http", "https", "ftp"):
         import fs_http
@@ -21,8 +25,9 @@
         raise BadURL, url
 
 def writeable_fs_for_url(url):
-    (scheme, host, path, query, frag) = urlsplit(url, "file")
+    (scheme, host, path, query, frag) = urlparse.urlsplit(url, "file")
     if scheme == "file":
+        assert not host
         return LocalWriteableFs(path)
     elif scheme == "sftp":
         import fs_sftp
@@ -109,11 +114,15 @@
     def rollback_interrupted_puts(self, filenames):
         raise NotImplementedError
 
+    def rollback_name(self, filename):
+        return filename + ".back"
filename + ".back" + # returns true if mkdir succeeded, false if failed # used for locking def mkdir(self, filename): raise NotImplementedError + # should be a no-op if dir does not exist def rmdir(self, filename): raise NotImplementedError @@ -178,7 +187,10 @@ return 1 def rmdir(self, filename): - os.rmdir(self._fname(filename)) + try: + os.rmdir(self._fname(filename)) + except OSError: + pass def ensure_dir(self): name = self._fname("") ======================================================================== --- fs_sftp.py ae1409f264dc600ffe8109fca9f716c10e557e10 +++ fs_sftp.py c31b2800f0333335f2b4a9d6f8075563ad30bd1b @@ -1,8 +1,9 @@ # we need paramiko for sftp protocol support import paramiko import getpass import fs import os.path +import posixpath import base64 # All of this heavily cribbed from demo{,_simple}.py in the paramiko @@ -42,7 +43,8 @@ else: username = userspec if hostspec.find(":") >= 0: - hostname, port = hostspec.split(":") + hostname, port_str = hostspec.split(":") + port = int(port_str) else: hostname = hostspec port = 22 @@ -99,19 +101,60 @@ return self.client.open(self._fname(filename), "ab") def size(self, filename): - return self.client.stat(self._fname(filename)).st_size + try: + return self.client.stat(self._fname(filename)).st_size + except IOError: + return 0 + def _exists(self, full_fn): + try: + self.client.stat(full_fn) + except IOError: + return False + return True + def put(self, filenames): for fn, data in filenames.iteritems(): tmpname = self._fname("__tmp") tmph = self.client.open(tmpname, "wb") tmph.write(data) tmph.close() - self.client.rename(tmpname, self._fname(fn)) + ## This is a race! SFTP (at least until protocol draft 3, which + ## is what paramiko and openssh both implement) only has + ## non-clobbering rename. + full_fn = self._fname(fn) + # clobber any backup unconditionally, just in case, ignoring + # errors + try: + self.client.remove(full_fn + ".back") + except IOError: + pass + # then try moving it out of the way, again ignoring errors (maybe + # it doesn't exist to move, which is fine) + try: + self.client.rename(full_fn, full_fn + ".back") + except IOError: + pass + # finally, try clobbering it; this is he only operation that we + # actually care about the success of, and this time we do check + # for errors -- but do a local rollback if we can. 
+            try:
+                self.client.rename(tmpname, full_fn)
+            except IOError:
+                if self._exists(full_fn + ".back"):
+                    self.client.rename(full_fn + ".back", full_fn)
+                raise
+            # and clobber the backup we made (if it exists)
+            try:
+                self.client.remove(full_fn + ".back")
+            except IOError:
+                pass
 
     def rollback_interrupted_puts(self, filenames):
-        # for now, we assume we have atomic put
-        pass
+        for fn in filenames:
+            full_fn = self._fname(fn)
+            if not self._exists(full_fn) and self._exists(full_fn + ".back"):
+                self.client.rename(full_fn + ".back", full_fn)
 
     def mkdir(self, filename):
         try:
@@ -121,23 +164,31 @@
             return 1
 
     def rmdir(self, filename):
-        self.client.rmdir(self._fname(filename))
+        try:
+            self.client.rmdir(self._fname(filename))
+        except IOError:
+            pass
 
-    def ensure_dir(self):
+    def ensure_dir(self, absdir=None):
+        if absdir is None:
+            absdir = self._fname("")
+        absdir = posixpath.normpath(absdir)
+        print "ensuring dir %s" % absdir
         try:
-            self.client.stat(self._fname(""))
+            self.client.stat(absdir)
+            print "stat succeeded"
             return
-        except IOError:
+        except IOError, e:
+            print "stat failed: %s" % (e,)
             pass # fall through to actually create dir
+        # logic cribbed from os.makedirs in python dist
+        head, tail = os.path.split(absdir)
+        if not tail:
+            head, tail = os.path.split(head)
+        if head and tail:
+            print "recursing to %s" % head
+            self.ensure_dir(head)
+        if tail == ".":
+            return
+        print "actually making %s" % absdir
+        self.client.mkdir(absdir)
-        pieces = []
-        rest = self.dir
-        while rest:
-            (rest, next_piece) = os.path.split(rest)
-            pieces.insert(0, next_piece)
-        sofar = ""
-        for piece in pieces:
-            sofar = os.path.join(sofar, piece)
-            try:
-                self.client.mkdir(sofar)
-            except OSError:
-                pass
========================================================================
--- merkle_dir.py 736fb122be92473b73f6de96a0e9584da9700611
+++ merkle_dir.py 9c2532dc27fc69184efb6d30fca499dfa23481f3
@@ -249,6 +249,8 @@
     # automatically updates root hash as well
     def _set_child_hashes(self, objs):
         self._need_lock()
+        if not objs:
+            return
         root_hash = self._get_root_hash()
         put_request = {}
         for prefix, obj in objs.iteritems():
@@ -279,7 +281,7 @@
             assert id not in child_hashes[k]
             child_hashes[k].assign(id, location)
         print ("writing hashes for %s new ids to %s hash files"
-               % (len(self._ids_to_flush), len(bins)))
+               % (len(self._ids_to_flush), len(child_hashes)))
         self._set_child_hashes(child_hashes)
         self._ids_to_flush = []
 
@@ -360,8 +362,9 @@
                 target.add(id, data)
                 if new_chunk_callback is not None:
                     new_chunk_callback(id, data)
-            target.flush()
             target.commit()
+        except LockError:
+            raise
         except:
             target.rollback()
             raise
========================================================================
--- monotone.py 7bfd2db49f9fd5aeee6a7c0f16f78e1d82ea9308
+++ monotone.py 4049bb72eee1adb79206f2c27432dc1009ea5e57
@@ -6,19 +6,30 @@
     pass
 
 class Feeder:
-    def __init__(self, process):
-        self.process = process
+    def __init__(self, args):
+        # We delay the actual process spawn, so as to avoid running monotone
+        # unless some packets are actually written (this is more efficient,
+        # and also avoids spurious errors from monotone when 'read' doesn't
+        # actually succeed in reading anything).
+        self.args = args
+        self.process = None
 
     # this is technically broken; we might deadlock.
     # subprocess.Popen.communicate uses threads to do this; that'd be
     # better.
     def write(self, data):
+        if self.process is None:
+            self.process = subprocess.Popen(self.args,
+                                            stdin=subprocess.PIPE,
+                                            stdout=subprocess.PIPE,
+                                            stderr=subprocess.PIPE)
         self.process.stdin.write(data)
 
     def close(self):
-        self.process.stdin.close()
-        stdout, stderr = process.communicate()
-        if process.returncode:
+        if self.process is None:
+            return
+        stdout, stderr = self.process.communicate()
+        if self.process.returncode:
             raise MonotoneError, stderr
 
 class Monotone:
@@ -29,8 +40,11 @@
     def init_db(self):
         self.run_monotone(["db", "init"])
 
+    def db_exists(self):
+        return os.path.exists(self.db)
+
     def ensure_db(self):
-        if not os.path.exists(self.db):
+        if not self.db_exists():
             self.init_db()
 
     def revisions_list(self):
@@ -87,11 +101,8 @@
 
     # feeds stuff into 'monotone read'
     def feeder(self):
-        process = subprocess.Popen([self.executable, "--db", self.db, "read"],
-                                   stdin=subprocess.PIPE,
-                                   stdout=subprocess.PIPE,
-                                   stderr=subprocess.PIPE)
-        return Feeder(process)
+        args = [self.executable, "--db", self.db, "read"]
+        return Feeder(args)
 
 # copied wholesale from viewmtn (08fd7bf8143512bfcabe5f65cf40013e10b89d28)'s
 # monotone.py. hacked to remove the []s from hash values, and to leave in
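
A note on the CounterCallback/FeederCallback classes introduced in dumb.py above: the nested functions they replace (count_new, feed_new, feed_pull, count_push) tried to rebind a counter defined in the enclosing function, which Python 2 does not allow (there is no "nonlocal"), so "added += 1" inside the callback raises UnboundLocalError the first time it fires. The sketch below is illustrative only and not part of the patch; the helper names are made up for the example.

class CounterCallback:
    # same shape as the callback class added in dumb.py: called once per packet
    def __init__(self):
        self.added = 0
    def __call__(self, id, data):
        self.added += 1

def make_broken_counter():
    added = 0
    def count_new(id, data):
        # UnboundLocalError when called: the assignment makes 'added' local
        # to count_new, and Python 2 has no 'nonlocal' to rebind the outer one
        added += 1
    return count_new

cb = CounterCallback()
for id, data in [("id1", "packet one"), ("id2", "packet two")]:
    cb(id, data)
print cb.added    # prints 2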
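
Similarly, the urlparse.uses_netloc.append("sftp") line added to fs.py matters because, on the Python 2 releases this code targets, urlparse.urlsplit only splits out the host portion for schemes registered in uses_netloc; for an unregistered scheme the host stays glued to the path. A rough, illustrative sketch follows; the tuples in the comments are indicative of that behaviour, not captured output.

import urlparse

# Before registering the scheme, the host ends up inside the path component
# (two different URLs are used because urlsplit caches its results):
print urlparse.urlsplit("sftp://user@host:22/one")
# roughly: ('sftp', '', '//user@host:22/one', '', '')

urlparse.uses_netloc.append("sftp")
print urlparse.urlsplit("sftp://user@host:22/two")
# roughly: ('sftp', 'user@host:22', '/two', '', '')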