#
#
# patch "README"
#  from [6948dc9e6234ccdc6ac91e105a7f7ce565ca6b1b]
#    to [a05417592b1fd8d5c9931f66216f3d3c1ec4f122]
#
# patch "TODO"
#  from [324caa0e7bbe243e36b909a75c7bda1c916fc04d]
#    to [47ebd86bb1935279bed840375103cdb65daeb0cd]
#
# patch "dumb.py"
#  from [ed9e31afea876ce54c56a0407214759f88cce8ff]
#    to [81e3c939e7e2ad8602d53cc61f551142ed33a6d4]
#
# patch "merkle_dir.py"
#  from [659631f387cc5e65e18f8f63a1cd32bbc9018d93]
#    to [e44390cd190b70d97e46492f0c84fff137a521dd]
#
# patch "monotone.py"
#  from [b368c41d9ef4ab7b698f0d5638fe108c48843b1d]
#    to [a8536a1e47938b8d9ceea99c3cc62cf46c8be4bc]
#
# patch "plain.py"
#  from [4f57fdb4db70aecc41c6fa383c36f4ac44a052ad]
#    to [64e2790764e8452918cca06b2e9d8e63b70a8b3a]
#
# patch "setup.py"
#  from [cc4b8f4c21488a2f51bb505a8f3e5b9187b03b5d]
#    to [2eda98c2f22383d48c9bdf01d7aff388279a3521]
#
============================================================
--- README	6948dc9e6234ccdc6ac91e105a7f7ce565ca6b1b
+++ README	a05417592b1fd8d5c9931f66216f3d3c1ec4f122
@@ -18,7 +18,6 @@
 The monotone plain protocol always involves three entities:
     a (local) monotone database
-    a local transit merkle-directory
     a (possibly remote) merkle-directory to sync with
 
 The sync directories are called merkle-directories because essentially
 contain a two-level merkle trie plus a datafile with the revision data.
@@ -50,7 +49,6 @@
 [d:/repo/monotone-dumb.mtn]
 verbose = 0
 repository = "dws://zzagorski.strony.wi.ps.pl/mtdumb/index.php:monotone-plain/"
-local = file:C:\DOCUME~1\zbigg\USTAWI~1\Temp\_d_\repo\monotone-dumb.mtn-mtndumbtemp
 
 [d:/repo/xxx.mtn]
 rsakey = /home/zbigg/id_rsa_other
============================================================
--- TODO	324caa0e7bbe243e36b909a75c7bda1c916fc04d
+++ TODO	47ebd86bb1935279bed840375103cdb65daeb0cd
@@ -30,7 +30,7 @@
   db... then we could have commands to list certs, dump info on a
   given cert, get a packet for a given cert, etc.)
 * merkle dir stuff that doesn't require loading entire chunks into
-  memory all the time
+  memory all the time, partialy done (revision data, keys), left: certs
 * pipelining on http read? (is urlgrabber thread-safe?)
 * possibly better rollback stuff?
   - truncate DATA when possible? (local, sftp (with a simple call
============================================================
--- dumb.py	ed9e31afea876ce54c56a0407214759f88cce8ff
+++ dumb.py	81e3c939e7e2ad8602d53cc61f551142ed33a6d4
@@ -9,12 +9,28 @@ import os
 import sha
 from sets import Set
 import os
+import sys
 import os.path
 from cStringIO import StringIO
-from merkle_dir import MerkleDir, LockError
+from merkle_dir import MerkleDir, MemoryMerkleDir, LockError
 from fs import readable_fs_for_url, writeable_fs_for_url
 from monotone import Monotone
 
+class partial:
+    def __init__(self, fn, *args):
+        self.__fn = fn
+        self.__args = args[:]
+
+    def __call__(self, *args):
+        finalArgs = self.__args + args
+        return self.__fn(*finalArgs)
+
+class returnthis:
+    def __init__(self, value):
+        self.__value = value
+    def __call__(self):
+        return self.__value
+
 class Dumbtone:
 
     def __init__(self, db, verbosity=0):
@@ -39,117 +55,166 @@ class Dumbtone:
         for id, data in md.all_chunks():
             feeder.write(data)
         feeder.close()
+
+    def __make_revision_packet(self, rid):
+        rdata = StringIO()
+        revision_text = self.monotone.get_revision(rid)
+        revision_parsed = self.monotone.basic_io_parser(revision_text)
+        new_files = {}
+        for stanza in revision_parsed:
+            stanza_type = stanza[0][0]
+            if stanza_type == "add_file":
+                new_files[stanza[1][1]] = None
+                if self.verbosity > 0:
+                    print stanza_type, ":", stanza[1][1]
+            elif stanza_type == "patch":
+                old_fid = stanza[1][1]
+                new_fid = stanza[2][1]
+                if not new_files.has_key(new_fid):
+                    new_files[new_fid] = None
+                if old_fid:
+                    new_files[new_fid] = old_fid
+                if self.verbosity > 0:
+                    print stanza_type, ":", stanza[1][1],":", stanza[2][1]
 
-    def do_export(self, url):
-        md = MerkleDir(writeable_fs_for_url(url))
+        for new_fid, old_fid in new_files.items():
+            if old_fid:
+                if self.verbosity > 0:
+                    print "get_file_delta:",old_fid, new_fid
+                fdp =self.monotone.get_file_delta_packet(old_fid, new_fid)
+                if self.verbosity > 0:
+                    print "file_delta (", old_fid, ",", new_fid,"):",fdp
+                rdata.write(fdp)
+            else:
+                if self.verbosity > 0:
+                    print "get file_packet:",new_fid
+                fpp = self.monotone.get_file_packet(new_fid)
+                if self.verbosity > 0:
+                    print "file_packet(",new_fid,"):",fpp
+                rdata.write(fpp)
+        rdata.write(self.monotone.get_revision_packet(rid))
+        return rdata.getvalue()
+
+    def do_export(self, url, callback = None):
+        if url is None:
+            md = MemoryMerkleDir()
+        else:
+            md = MerkleDir(writeable_fs_for_url(url))
         try:
             md.begin()
             curr_ids = Set(md.all_ids())
-            keys = self.monotone.key_names()
-            for k in keys:
-                kp = self.monotone.get_pubkey_packet(k)
-                id = sha.new(kp).hexdigest()
+            keys = self.monotone.keys()
+            for stanza in keys:
+                keyid = stanza[0][1][0]
+                publicHash = stanza[1][1][0]
+                kp = partial(self.monotone.get_pubkey_packet,keyid)
+                ids = "\n".join((keyid,publicHash))
+                id = sha.new(ids).hexdigest()
                 if id not in curr_ids:
                     md.add(id, kp)
+                    if callback: callback(id, "", None)
             for rid in self.monotone.toposort(self.monotone.revisions_list()):
-                print "processing revision ", rid
                 if rid not in curr_ids:
-                    rdata = StringIO()
-                    revision_text = self.monotone.get_revision(rid)
-                    revision_parsed = self.monotone.basic_io_parser(revision_text)
-                    new_files = {}
-                    for stanza in revision_parsed:
-                        stanza_type = stanza[0][0]
-                        if stanza_type == "add_file":
-                            new_files[stanza[1][1]] = None
-                            if self.verbosity > 0:
-                                print stanza_type, ":", stanza[1][1]
-                        elif stanza_type == "patch":
-                            old_fid = stanza[1][1]
-                            new_fid = stanza[2][1]
-                            if not new_files.has_key(new_fid):
-                                new_files[new_fid] = None
-                            if old_fid:
-                                new_files[new_fid] = old_fid
-                            if self.verbosity > 0:
-                                print stanza_type, ":", stanza[1][1],":", stanza[2][1]
-
-                    for new_fid, old_fid in new_files.items():
-                        if old_fid:
-                            if self.verbosity > 0:
-                                print "get_file_delta:",old_fid, new_fid
-                            fdp =self.monotone.get_file_delta_packet(old_fid, new_fid)
-                            if self.verbosity > 0:
-                                print "file_delta (", old_fid, ",", new_fid,"):",fdp
-                            rdata.write(fdp)
-                        else:
-                            if self.verbosity > 0:
-                                print "get file_packet:",new_fid
-                            fpp = self.monotone.get_file_packet(new_fid)
-                            if self.verbosity > 0:
-                                print "file_packet(",new_fid,"):",fpp
-                            rdata.write(fpp)
-                    rdata.write(self.monotone.get_revision_packet(rid))
-                    md.add(rid, rdata.getvalue())
+                    md.add(rid, partial(self.__make_revision_packet,rid))
+                    if callback: callback(id, "", None)
                 certs = self.monotone.get_cert_packets(rid)
                 if self.verbosity > 0:
                     print "rev ", rid, " certs:",certs
                 for cert in certs:
                     id = sha.new(cert).hexdigest()
                     if id not in curr_ids:
-                        md.add(id, cert)
+                        md.add(id, returnthis(cert) )
+                        if callback: callback(id, "", None)
             md.commit()
+            if callback:
+                callback.finish()
+            return md
         except LockError:
             raise
         except:
             md.rollback()
             raise
 
-    class CounterCallback:
-        def __init__(self):
+    class PushCallback:
+        def __init__(self, message):
+            self._message = message
+            self._count = 0
+
+        def callback(self, id, data, total):
+            pass
+
+        def __call__(self, id, data, total):
+            self._count += 1
+            self.printProgress(total)
+            self.callback(id, data, total)
+
+        def printProgress(self, total = None):
+            if self._message is None : return
+            if total is None:
+                if self._count != 0:
+                    sys.stdout.write("\r" + self._message + " : %i ... " % ( self._count ))
+                else:
+                    sys.stdout.write(self._message + " ... ")
+            else:
+                sys.stdout.write("\r" + self._message + " : %i / %i " % ( self._count, int(total)))
+
+        def finish(self):
+            if self._message is None or self._count == 0: return
+            sys.stdout.write("\n")
+
+    class CounterCallback(PushCallback):
+        def __init__(self, message = None):
+            Dumbtone.PushCallback.__init__(self,message)
             self.added = 0
-        def __call__(self, id, data):
+        def callback(self, id, data, total):
            self.added += 1
 
-    class FeederCallback:
-        def __init__(self, feeder):
+    class FeederCallback(PushCallback):
+        def __init__(self, feeder, message = None):
+            Dumbtone.PushCallback.__init__(self,message)
             self.added = 0
             self.feeder = feeder
-        def __call__(self, id, data):
+        def callback(self, id, data, total):
             self.added += 1
             self.feeder.write(data)
-
-    def do_push(self, local_url, target_url, **kwargs):
-        print "Exporting changes from monotone db to %s" % (local_url,)
-        self.do_export(local_url)
-        print "Pushing changes from %s to %s" % (local_url, target_url)
-        local_md = MerkleDir(readable_fs_for_url(local_url, **kwargs))
+
+    def __prepare_local_md(self):
+        callback = Dumbtone.PushCallback("finding items to synchronize")
+        memory_md = self.do_export(None,callback)
+        return memory_md
+
+    def do_push(self,target_url, **kwargs):
+        print "Pushing changes from DB to %s" % (target_url,)
+        memory_md = self.__prepare_local_md()
+
         target_md = MerkleDir(writeable_fs_for_url(target_url, **kwargs))
-        c = Dumbtone.CounterCallback()
-        local_md.push(target_md, c)
-        print "Pushed %s packets to %s" % (c.added, target_url)
+        callback = Dumbtone.CounterCallback("pushing packets")
+        memory_md.push(target_md, callback)
+
+        print "Pushed %s packets to %s" % (callback.added, target_url)
 
-    def do_pull(self, local_url, source_url, **kwargs):
-        print "Pulling changes from %s to %s" % (source_url, local_url)
-        local_md = MerkleDir(writeable_fs_for_url(local_url, **kwargs))
+    def do_pull(self, source_url, **kwargs):
+        print "Pulling changes from %s to database" % (source_url, )
+        memory_md = self.__prepare_local_md()
         source_md = MerkleDir(readable_fs_for_url(source_url, **kwargs))
-        self.monotone.ensure_db()
+
+        self.monotone.ensure_db()
         feeder = self.monotone.feeder(self.verbosity)
-        fc = Dumbtone.FeederCallback(feeder)
-        local_md.pull(source_md, fc)
+
+        fc = Dumbtone.FeederCallback(feeder, "pulling packets")
+        memory_md.pull(source_md, fc)
         feeder.close()
-        print "Pulled and imported %s packets from %s" % (fc.added, source_url)
+        print "Pulled and imported %s packets from %s" % (fc.added, source_url)
 
-    def do_sync(self, local_url, other_url, **kwargs):
-        print "Exporting changes from monotone db to %s" % (local_url,)
-        self.do_export(local_url)
-        print "Synchronizing %s and %s" % (local_url, other_url)
-        local_md = MerkleDir(writeable_fs_for_url(local_url, **kwargs))
+    def do_sync(self, other_url, **kwargs):
+        print "Synchronizing database and %s" % (other_url,)
+        memory_md = self.__prepare_local_md()
         other_md = MerkleDir(writeable_fs_for_url(other_url, **kwargs))
         feeder = self.monotone.feeder(self.verbosity)
-        pull_fc = Dumbtone.FeederCallback(feeder)
-        push_c = Dumbtone.CounterCallback()
-        local_md.sync(other_md, pull_fc, push_c)
+        pull_fc = Dumbtone.FeederCallback(feeder, "pulling packets")
+        push_c = Dumbtone.CounterCallback("pushing packets")
+        memory_md.sync(other_md, pull_fc, push_c)
         feeder.close()
         print "Pulled and imported %s packets from %s" % (pull_fc.added, other_url)
         print "Pushed %s packets to %s" % (push_c.added, other_url)
+
============================================================
--- merkle_dir.py	659631f387cc5e65e18f8f63a1cd32bbc9018d93
+++ merkle_dir.py	e44390cd190b70d97e46492f0c84fff137a521dd
@@ -323,6 +323,7 @@ class MerkleDir:
     #### Compressing and adding new items
     # can only be called from inside a transaction.
     def add(self, id, data):
+        data = data()
         # print ">>>>>>>>>>\n",data,"<<<<<<<<<<<<<<<\n"
         cp_data = zlib.compress(data)
         self._add_verbatim(id, cp_data)
@@ -388,10 +389,13 @@ class MerkleDir:
             # we build up a list of all chunks and then fetch them in a single
             # call, to give the chunk optimized and pipelining maximum
             # opportunity to work
+            total = len( new_chunks )
             for id, data in self.get_chunks(new_chunks):
                 target._add_verbatim(id, data)
                 if new_chunk_callback is not None:
-                    new_chunk_callback(id, data)
+                    new_chunk_callback(id, data, total)
+            if new_chunk_callback:
+                new_chunk_callback.finish()
             target.commit()
         except LockError:
             raise
@@ -404,5 +408,128 @@ class MerkleDir:
     def sync(self, other, new_self_chunk_callback=None,
              new_other_chunk_callback=None):
-        self.pull(other, new_self_chunk_callback)
+        self.pull(other, new_self_chunk_callback)
         self.push(other, new_other_chunk_callback)
+
+
+
+class MemoryMerkleDir(MerkleDir):
+    def __init__(self):
+        self._chunksCache = {}
+        self._hashes = {}
+        self._root_hash = None
+        self._ids_to_flush = []
+
+    def begin(self):
+        pass
+
+    def commit(self):
+        self.flush()
+
+    def rollback(self):
+        pass
+
+    def flush(self):
+        self._flush_hashes()
+
+    def _add_verbatim(self,id, data):
+        # memory merkle dir doesn't collect
+        # DATA, only hashes
+        pass
+
+    def add(self, id, data):
+        self._ids_to_flush.append(id)
+        self._chunksCache[id] = data
+
+    def all_ids(self):
+        return self._chunksCache.iterkeys()
+
+    def all_chunks(self):
+        for id, chunkHandle in self._chunksCache.iteritems():
+            yield id, self.__realizeChunk(chunkHandle)
+
+    def get_chunks(self, id_locations):
+        locations_to_ids = {}
+        for id, location in id_locations:
+            if location[1] == 0:
+                # empty chunks ...
+                yield ( id, "" )
+            else:
+                assert self._chunksCache.has_key(id)
+                yield ( id, self.__realizeChunk(self._chunksCache[id]) )
+
+    #
+    # internal merkle_dir stuff hash tree related
+    #
+
+    #### Hash fetching machinery -- does caching to avoid multiple fetches
+    #### during sync
+    def _get_root_hash(self):
+        if self._root_hash is not None:
+            return self._root_hash
+        self._root_hash = _RootHash()
+        return self._root_hash
+
+    def _set_root_hash(self, obj):
+        self._root_hash = obj
+
+    # pass an iterable of prefixes
+    # returns a dict {prefix -> _ChildHash object}
+    def _get_child_hashes(self, prefixes):
+        child_hashes = {}
+        needed = []
+        for prefix in prefixes:
+            if self._hashes.has_key(prefix):
+                child_hashes[prefix] = self._hashes[prefix]
+            else:
+                needed.append(prefix)
+        if needed:
+            for prefix in needed:
+                ch = _ChildHash()
+                self._hashes[prefix] = ch
+                child_hashes[prefix] = ch
+        return child_hashes
+
+    # pass a dict of prefix -> new child hash obj
+    # automatically updates root hash as well
+    def _set_child_hashes(self, objs):
+        if not objs:
+            return
+        root_hash = self._get_root_hash()
+        for prefix, obj in objs.iteritems():
+            self._hashes[prefix] = obj
+            child_data = obj.export()
+            new_child_id = sha.new(child_data).hexdigest()
+
+            root_hash.set(prefix, new_child_id)
+        self._set_root_hash(root_hash)
+
+    #### Cheap hash updating
+    def _bin(self, ids):
+        bins = {}
+        for id in ids:
+            prefix = id[:2]
+            if not bins.has_key(prefix):
+                bins[prefix] = []
+            bins[prefix].append((id, (-1,-1)))
+        return bins
+
+    def _flush_hashes(self):
+        bins = self._bin(self._ids_to_flush)
+        child_hashes = self._get_child_hashes(bins.iterkeys())
+        for k in bins.iterkeys():
+            for id, location in bins[k]:
+                assert id not in child_hashes[k]
+                child_hashes[k].assign(id, location)
+        self._set_child_hashes(child_hashes)
+        self._ids_to_flush = []
+
+    #
+    # private stuff
+    #
+
+    # realize and compress chunk
+    def __realizeChunk(self, chunkHandle):
+        rawValue = chunkHandle()
+        chunkData = zlib.compress(rawValue)
+        return chunkData
============================================================
--- monotone.py	b368c41d9ef4ab7b698f0d5638fe108c48843b1d
+++ monotone.py	a8536a1e47938b8d9ceea99c3cc62cf46c8be4bc
@@ -122,6 +122,9 @@ class Monotone:
         assert not curr_packet
         return packets
 
+    def keys(self):
+        return self.basic_io_parser( self.automate("keys") )
+
     def key_names(self):
         output = self.automate("keys")
         keys_parsed = self.basic_io_parser(output)
============================================================
--- plain.py	4f57fdb4db70aecc41c6fa383c36f4ac44a052ad
+++ plain.py	64e2790764e8452918cca06b2e9d8e63b70a8b3a
@@ -43,20 +43,6 @@ def readConfig(cfgfile):
     cfg.read(cfgfile)
     return cfg
 
-def getTempDir(database):
-    try:
-        tmpDir = os.environ['TEMP']
-    except KeyError:
-        try:
-            tmpDir = os.environ['TMP']
-        except KeyError:
-            tmpDir = "/tmp"
-    tmpDir = os.path.normpath(tmpDir)
-    if database[0].isalpha() and database[1] == ':':
-        database = "_" + database[0] + "_" + database[2:]
-    database = os.path.normpath(database)
-    return os.path.join(tmpDir, database + "-mtndumbtemp")
-
 def getDefaultDatabase():
     dir = "."
     while True:
@@ -99,7 +85,6 @@
     is used to decrypt the private key file.""")
     par.add_option("-d","--db",
             help="monotone db to use", metavar="STRING")
-    par.add_option("-l","--local", help="local transit directory", metavar="PATH")
     par.add_option("--dsskey",
            help="optional, sftp only. DSS private key file. Can't be specified with --rsakey", metavar="FILE")
     par.add_option("--rsakey",
@@ -160,16 +145,9 @@
     elif len(args) == 2:
         url = args[1]
     else:
-        par.error("only one remote-URL allowed")
-
-    if options.local is None:
-        defaultTmpDir = getTempDir(options.db)
-        if defaultTmpDir is None:
-            par.error("local transit directory not specified")
-        options.local = "file:" + defaultTmpDir
+        par.error("only one remote-URL allowed")
 
-    config.set(options.db, "repository", url)
-    config.set(options.db, "local", options.local)
+    config.set(options.db, "repository", url)
     return (options, config, action, url)
 
 def saveConfig(options,config):
@@ -188,12 +166,13 @@
                "proxy":options.proxy}
 
     mtn = Dumbtone(options.db, options.verbose)
+
     if action=="pull":
-        mtn.do_pull(options.local, url, **optdict)
+        mtn.do_pull(url, **optdict)
     elif action=="push":
-        mtn.do_push(options.local, url, **optdict)
+        mtn.do_push(url, **optdict)
    elif action=="sync":
-        mtn.do_sync(options.local, url, **optdict)
+        mtn.do_sync(url, **optdict)
 
     saveConfig(options,config)
 
============================================================
--- setup.py	cc4b8f4c21488a2f51bb505a8f3e5b9187b03b5d
+++ setup.py	2eda98c2f22383d48c9bdf01d7aff388279a3521
@@ -17,7 +17,7 @@ license = "LGPL"
 and one-way (pull) via http(s) and ftp."""
 
 license = "LGPL"
-version = "0.0.4"
+version = "0.0.5"
 _authors = [ 'Nathaniel Smith',
        'Zbynek Winkler',
        'Riccardo "rghetta"',