#
# patch "dumb.py"
#  from [3366d2e3486074868b7dd886f5fd436e7b88c30b]
#    to [528fa3913c0f2e157f322668847e2d65458f4cbd]
#
# patch "merkle_dir.py"
#  from [1ed46dcbd03429fb73b53090b6fd291c325972f5]
#    to [d62f79494aabfa58b47e3dece80f4f025b4440d5]
#
# patch "monotone.py"
#  from [5f7331d14d941ef0f6d5e6be9526e02ab7059ec5]
#    to [a60a03bd596164fa7a6bd99d7007b4898f033da2]
#
========================================================================
--- dumb.py 3366d2e3486074868b7dd886f5fd436e7b88c30b
+++ dumb.py 528fa3913c0f2e157f322668847e2d65458f4cbd
@@ -1,22 +1,22 @@
 import sha
 from sets import Set
 import os
 import os.path
 from cStringIO import StringIO
-import merkle_dir
-import fs
+from merkle_dir import MerkleDir
+from fs import readable_fs_for_url, writeable_fs_for_url
 import zlib
 
 def do_full_import(monotone, url):
     monotone.ensure_db()
-    md = merkle_dir.MerkleDir(fs.readable_fs_for_url(url))
-    def all_data():
-        for id, data in md.all_chunks():
-            yield zlib.decompress(data)
-    monotone.feed(all_data())
+    md = MerkleDir(readable_fs_for_url(url))
+    feeder = monotone.feeder()
+    for id, data in md.all_chunks():
+        feeder.write(zlib.decompress(data))
+    feeder.close()
 
 def do_export(monotone, url):
-    md = merkle_dir.MerkleDir(fs.writeable_fs_for_url(url))
+    md = MerkleDir(writeable_fs_for_url(url))
     md.begin()
     curr_ids = Set(md.all_ids())
     for rid in monotone.toposort(monotone.revisions_list()):
@@ -60,3 +60,49 @@
                 rdata.write(monotone.get_file_packet(new_fid))
             md.add(rid, zlib.compress(rdata.getvalue()))
     md.commit()
+
+
+def do_push(monotone, local_url, target_url):
+    print "Exporting changes from monotone db to %s" % (local_url,)
+    do_export(monotone, local_url)
+    print "Pushing changes from %s to %s" % (local_url, target_url)
+    local_md = MerkleDir(readable_fs_for_url(local_url))
+    target_md = MerkleDir(writeable_fs_for_url(target_url))
+    # one-element list, because Python 2 closures cannot rebind outer names
+    added = [0]
+    def count_new(id, data):
+        added[0] += 1
+    local_md.push(target_md, count_new)
+    print "Pushed %s packets to %s" % (added[0], target_url)
+
+def do_pull(monotone, local_url, source_url):
+    print "Pulling changes from %s to %s" % (source_url, local_url)
+    local_md = MerkleDir(writeable_fs_for_url(local_url))
+    source_md = MerkleDir(readable_fs_for_url(source_url))
+    feeder = monotone.feeder()
+    added = [0]
+    def feed_new(id, data):
+        feeder.write(zlib.decompress(data))
+        added[0] += 1
+    local_md.pull(source_md, feed_new)
+    feeder.close()
+    print "Pulled and imported %s packets from %s" % (added[0], source_url)
+
+def do_sync(monotone, local_url, other_url):
+    print "Exporting changes from monotone db to %s" % (local_url,)
+    do_export(monotone, local_url)
+    print "Synchronizing %s and %s" % (local_url, other_url)
+    local_md = MerkleDir(writeable_fs_for_url(local_url))
+    other_md = MerkleDir(writeable_fs_for_url(other_url))
+    feeder = monotone.feeder()
+    pulled = [0]
+    pushed = [0]
+    def feed_pull(id, data):
+        feeder.write(zlib.decompress(data))
+        pulled[0] += 1
+    def count_push(id, data):
+        pushed[0] += 1
+    local_md.sync(other_md, feed_pull, count_push)
+    feeder.close()
+    print "Pulled and imported %s packets from %s" % (pulled[0], other_url)
+    print "Pushed %s packets to %s" % (pushed[0], other_url)
+
========================================================================
--- merkle_dir.py 1ed46dcbd03429fb73b53090b6fd291c325972f5
+++ merkle_dir.py d62f79494aabfa58b47e3dece80f4f025b4440d5
@@ -79,7 +79,7 @@
 #   -- check for any missing files left by non-atomic renames
 #   -- remove the lockdir
 
-class HashFile:
+class _HashFile:
     prefix = ""
     values = ()
 
@@ -138,12 +138,12 @@
             elif versus.get(key) != value:
                 yield key
 
-class RootHash(HashFile):
+class _RootHash(_HashFile):
     prefix = "subtree"
     values = ("hash",)
     value_type = str
 
-class ChildHash(HashFile):
+class _ChildHash(_HashFile):
     prefix = "chunk"
     values = ("offset", "length")
     value_type = int
@@ -213,7 +213,7 @@
         if self._root_hash is not None:
             return self._root_hash
         data = self._fs.fetch([self._hashes_prefix])[self._hashes_prefix]
-        self._root_hash = RootHash()
+        self._root_hash = _RootHash()
         if data is not None:
             self._root_hash.load(data)
         return self._root_hash
@@ -224,7 +224,7 @@
         self._fs.put({self._hashes_prefix: obj.export()})
 
     # pass an iterable of prefixes
-    # returns a dict {prefix -> ChildHash object}
+    # returns a dict {prefix -> _ChildHash object}
     def _get_child_hashes(self, prefixes):
         child_hashes = {}
         needed = []
@@ -236,7 +236,7 @@
         if needed:
             datas = self._fs.fetch([self._hashes_prefix + n for n in needed])
             for fname, data in datas.items():
-                ch = ChildHash()
+                ch = _ChildHash()
                 if data is not None:
                     ch.load(data)
                 prefix = fname[len(self._hashes_prefix):]
========================================================================
--- monotone.py 5f7331d14d941ef0f6d5e6be9526e02ab7059ec5
+++ monotone.py a60a03bd596164fa7a6bd99d7007b4898f033da2
@@ -5,6 +5,22 @@
 class MonotoneError (Exception):
     pass
 
+class Feeder:
+    def __init__(self, process):
+        self.process = process
+
+    # this is technically broken; we might deadlock.
+    # subprocess.Popen.communicate uses threads to do this; that'd be
+    # better.
+    def write(self, data):
+        self.process.stdin.write(data)
+
+    def close(self):
+        self.process.stdin.close()
+        stdout, stderr = self.process.communicate()
+        if self.process.returncode:
+            raise MonotoneError, stderr
+
 class Monotone:
     def __init__(self, db, executable="monotone"):
         self.db = db
@@ -70,20 +86,12 @@
         return stdout
 
     # feeds stuff into 'monotone read'
-    def feed(self, iterator):
+    def feeder(self):
         process = subprocess.Popen([self.executable, "--db", self.db, "read"],
                                    stdin=subprocess.PIPE,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE)
-        # this is technically broken; we might deadlock.
-        # subprocess.Popen.communicate uses threads to do this; that'd be
-        # better.
-        for chunk in iterator:
-            process.stdin.write(chunk)
-        process.stdin.close()
-        stdout, stderr = process.communicate()
-        if process.returncode:
-            raise MonotoneError, stderr
+        return Feeder(process)
 
 # copied wholesale from viewmtn (08fd7bf8143512bfcabe5f65cf40013e10b89d28)'s
 # monotone.py.  hacked to remove the []s from hash values, and to leave in
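========================================================================

The deadlock that the comment in Feeder warns about arises when
"monotone read" fills its stdout or stderr pipe buffer while we are
still blocked writing packets into its stdin.  One safe variant,
sketched below, buffers the packets and hands them to
Popen.communicate(), which interleaves the stdin writes with the
stdout/stderr reads (threads on Windows, select on POSIX), as the
comment suggests.  BufferingFeeder is a hypothetical name, not part of
this patch, and the trade-off is that the whole packet stream sits in
memory until close():

from cStringIO import StringIO

from monotone import MonotoneError

class BufferingFeeder:
    # Same interface as Feeder, but write() only accumulates data;
    # close() hands the whole buffer to communicate(), which cannot
    # deadlock on a full pipe buffer.
    def __init__(self, process):
        self.process = process
        self._buf = StringIO()

    def write(self, data):
        self._buf.write(data)

    def close(self):
        stdout, stderr = self.process.communicate(self._buf.getvalue())
        if self.process.returncode:
            raise MonotoneError, stderr

A fully streaming fix would instead need a dedicated writer thread, so
that packets are not held in memory; either way the call sites in
dumb.py stay unchanged, since only feeder() would need to construct the
replacement class.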