# # add_file "test_merkle_dir.py" # # patch "fs.py" # from [d5f1f99911ba72c9b1d4b8dacdd406128b935ca9] # to [65c47ed1a28915fa5fccf7abf5e16ac240dd99a0] # # patch "merkle_dir.py" # from [85669ce90a9c7964717b0dd85d6220c0571eb87a] # to [f4ddf7aac47ab8713431d5b30db4defdb861ef9d] # # patch "test_merkle_dir.py" # from [] # to [17e1616704e2089a055c4e16aa48b8ab3586449e] # ======================================================================== --- fs.py d5f1f99911ba72c9b1d4b8dacdd406128b935ca9 +++ fs.py 65c47ed1a28915fa5fccf7abf5e16ac240dd99a0 @@ -20,12 +20,20 @@ # bytes is an iterable of pairs (offset, length) # this is a generator # it yields nested tuples ((offset, length), data) + # subclasses should implement _real_fetch_bytes which has the same API; + # but will receive massaged (seek-optimized) arguments def fetch_bytes(self, filename, bytes): + # FIXME: implement block coalescing/decoalescing, and sort to optimize + # seeks. + return self._real_fetch_bytes(filename, bytes) + + def _real_fetch_bytes(self, filename, bytes): raise NotImplementedError def exists(self, filename): raise NotImplementedError + # Must return 0 for non-existent files def size(self, filename): raise NotImplementedError @@ -89,7 +97,10 @@ return os.path.exists(self._fname(filename)) def size(self, filename): - return os.stat(self._fname(filename)).st_size + try: + return os.stat(self._fname(filename)).st_size + except OSError: + return 0 class LocalWriteableFs(LocalReadableFS, WriteableFS): def open_append(self, filename): @@ -110,7 +121,7 @@ def mkdir(self, filename): try: os.mkdir(self._fname(filename)) - except IOError: + except OSError: return 0 return 1 ======================================================================== --- merkle_dir.py 85669ce90a9c7964717b0dd85d6220c0571eb87a +++ merkle_dir.py f4ddf7aac47ab8713431d5b30db4defdb861ef9d @@ -1,46 +1,8 @@ import sha import os import os.path import zlib -from sets import Set -def push(from, to, new_chunk_callback=None): - to.begin() - to_root = to.get_root_hash() - from_root = from.get_root_hash() - new_stuff = list(from_root.new_or_different_in_me(to_root)) - from_children = from.get_child_hashes(new_stuff) - to_children = to.get_child_hashes(new_stuff) - locations = {} - for prefix in new_stuff: - for id, location in from_children[prefix].new_in_me(to_children[prefix]): - locations[location] = id - # FIXME: horribly inefficient, need to coalesce adjacent chunks into a - # single read request - # FIXME: we clobber abstraction here - ids_to_add = [] - for from_location, data in from.fs.fetch_bytes("DATA", locations.iterkeys()): - id = locations[from_location] - to_location = to.add(id, data) - ids_to_add.append((id, to_location)) - if new_chunk_callback is not None: - new_chunk_callback(data) - to.add_to_hashes(ids_to_add) - to.commit() - -# calls callback for each chunk added to 'to' -def pull(to, from, new_chunk_callback=None): - return push(from, to, new_chunk_callback=new_chunk_callback) - -# calls callback for each chunk added to 'a' -def sync(a, b, new_chunk_callback=None): - a.begin() - b.begin() - push(b, a, new_chunk_callback=new_chunk_callback) - a.commit() - push(a, b) - b.commit() - class HashFile: prefix = "" values = () @@ -57,7 +19,7 @@ def __in__(self, item): return item in self.items - def assign(self, item, *values): + def assign(self, item, values): assert len(values) == len(self.values) self.items[item] = tuple(values) @@ -67,16 +29,22 @@ def load(self, data): for line in zlib.decompress(data).split("\n"): + if not line: + continue words = 
             assert len(words) == 2 + len(self.values)
             assert words[0] == self.prefix
-            self.set(*words[1:])
+            item = words[1]
+            values = []
+            for i in xrange(len(self.values)):
+                values.append(self.value_type(words[2+i]))
+            self.assign(item, values)
 
     def export(self):
         lines = []
-        for prefix, values in self:
-            value_txt = " ".join(values)
-            lines.append("%s %s %s") % (prefix, hash, value_txt)
+        for key, values in self:
+            value_txt = " ".join([str(v) for v in values])
+            lines.append("%s %s %s" % (self.prefix, key, value_txt))
         return zlib.compress("\n".join(lines))
 
     # yields (key, values)
@@ -90,16 +58,18 @@
         for key, value in self:
             if key not in versus:
                 yield key
-            else if versus.get(key) != value:
+            elif versus.get(key) != value:
                 yield key
 
 class RootHash(HashFile):
     prefix = "subtree"
     values = ("hash",)
+    value_type = str
 
-class ChildHash:
+class ChildHash(HashFile):
     prefix = "chunk"
     values = ("offset", "length")
+    value_type = int
 
 class LockError(Exception):
     pass
 
@@ -115,6 +85,7 @@
         self._root_hash = None
         self._data_handle = None
         self._curr_data_length = None
+        self._ids_to_flush = []
         self._hashes = {}
 
     #### Locking
@@ -132,11 +103,16 @@
         self._locked += 1
 
     def commit(self):
-        assert self._locked
+        self._need_lock()
+        if self._locked == 1:
+            # have to do this before decrementing, because flushes require
+            # holding the lock
+            self.flush()
         self._locked -= 1
         if not self._locked:
             self._data_handle.close()
             self._data_handle = None
+            self._curr_data_length = None
             self._fs.rmdir(self._lock_file)
 
     # This can be called either with or without the lock held (either to
@@ -159,10 +135,10 @@
     def _get_root_hash(self):
         if self._root_hash is not None:
             return self._root_hash
-        data = self._fs.fetch([self._hashes_prefix])
+        data = self._fs.fetch([self._hashes_prefix])[self._hashes_prefix]
         self._root_hash = RootHash()
         if data is not None:
-            self._root_hash.load(data[self._hashes_prefix])
+            self._root_hash.load(data)
         return self._root_hash
 
     def _set_root_hash(self, obj):
@@ -209,21 +185,24 @@
     #### Cheap hash updating
     def _bin(self, id_locations):
         bins = {}
-        for id, location in ids:
+        for id, location in id_locations:
             prefix = id[:2]
             if not bins.has_key(prefix):
                 bins[prefix] = []
             bins[prefix].append((id, location))
         return bins
 
-    def add_to_hashes(self, id_locations):
+    def _flush_hashes(self):
         self._need_lock()
-        bins = self._bin(id_locations)
+        bins = self._bin(self._ids_to_flush)
         child_hashes = self._get_child_hashes(bins.iterkeys())
         for k in bins.iterkeys():
             for id, location in bins[k]:
                 child_hashes[k].assign(id, location)
+        print ("writing hashes for %s new ids to %s hash files"
+               % (len(self._ids_to_flush), len(bins)))
         self._set_child_hashes(child_hashes)
+        self._ids_to_flush = []
 
     #### Adding new items
     def add(self, id, data):
@@ -232,7 +211,9 @@
                                                  self._curr_data_length)
         length = len(data)
         self._data_handle.write(data)
+        location = (self._curr_data_length, length)
         self._curr_data_length += length
+        self._ids_to_flush.append((id, location))
 
     #### Getting data back out to outside world
     # returns an iterator over id, (offset, length) tuples
@@ -247,10 +228,47 @@
     # (FIXME: perhaps should split this up more; for large chunks (e.g.,
     # initial imports) this will load the entire chunk into memory)
     def all_chunks(self):
-        data_handle = self._fs.open_read(self._data_file)
-        id_to_loc = dict(self._all_chunk_locations())
-        for loc, data in self._fs.fetch_bytes(id_to_offset_lens.values()):
-            yield id_to_loc[loc], data
+        id_to_locations = dict(self._all_chunk_locations())
+        if id_to_locations:
+            for loc, data in self._fs.fetch_bytes(self._data_file,
+                                                  id_to_locations.values()):
+                yield id_to_locations[loc], data
 
     def flush(self):
+        if self._locked:
+            self._data_handle.flush()
+            self._flush_hashes()
+
+    def push(self, target, new_chunk_callback=None):
+        try:
+            self.flush()
+            target.begin()
+            source_root = self._get_root_hash()
+            target_root = target._get_root_hash()
+            new_stuff = list(source_root.new_or_different_in_me(target_root))
+            source_children = self._get_child_hashes(new_stuff)
+            target_children = target._get_child_hashes(new_stuff)
+            locations = {}
+            for prefix in new_stuff:
+                new_in_source = source_children[prefix].new_in_me(target_children[prefix])
+                for id, location in new_in_source:
+                    locations[location] = id
+            for source_location, data in self._fs.fetch_bytes(self._data_file,
+                                                              locations.keys()):
+                id = locations[source_location]
+                target.add(id, data)
+                if new_chunk_callback is not None:
+                    new_chunk_callback(id, data)
+            target.flush()
+            target.commit()
+        except:
+            target.rollback()
+            raise
+
+    def pull(self, source, new_chunk_callback=None):
+        source.push(self, new_chunk_callback=new_chunk_callback)
+
+    def sync(self, other,
+             new_self_chunk_callback=None, new_other_chunk_callback=None):
+        self.pull(other, new_self_chunk_callback)
+        self.push(other, new_other_chunk_callback)
-        self.data_write_handle.flush()
========================================================================
--- test_merkle_dir.py
+++ test_merkle_dir.py	17e1616704e2089a055c4e16aa48b8ab3586449e
@@ -0,0 +1,92 @@
+import random
+import merkle_dir
+import fs
+import tempfile
+import shutil
+
+def flip():
+    return random.randrange(2)
+
+def randid():
+    return "".join([random.choice("0123456789abcdef") for i in xrange(40)])
+
+def randdata():
+    length = random.choice([0, 1, None, None, None, None])
+    if length is None:
+        length = random.randrange(100)
+    return "".join([chr(random.randrange(256)) for i in xrange(length)])
+
+def add_to(md, expected):
+    md.begin()
+    num = random.randrange(10)
+    for i in xrange(num):
+        newid = randid()
+        newdata = randdata()
+        expected[newid] = newdata
+        md.add(newid, newdata)
+    md.commit()
+    print "added %s items to a set" % num
+
+def check_matches(md, expected):
+    checked = 0
+    for id, data in md.all_chunks():
+        assert expected[id] == data
+        checked += 1
+    assert checked == len(expected)
+
+def run_tests():
+    random.seed(0)
+
+    try:
+        a_dir = tempfile.mkdtemp()
+        b_dir = tempfile.mkdtemp()
+        a_fs = fs.LocalWriteableFs(a_dir)
+        b_fs = fs.LocalWriteableFs(b_dir)
+
+        in_a = {}
+        in_b = {}
+
+        for i in xrange(1000):
+            print i
+            a = merkle_dir.MerkleDir(a_fs)
+            b = merkle_dir.MerkleDir(b_fs)
+            if flip():
+                add_to(a, in_a)
+            if flip():
+                add_to(b, in_b)
+
+            if flip():
+                subject = a
+                in_subject = in_a
+                object = b
+                in_object = in_b
+            else:
+                subject = b
+                in_subject = in_b
+                object = a
+                in_object = in_a
+
+            verb = random.choice(["push", "pull", "sync"])
+            print verb
+            if verb == "push":
+                subject.push(object)
+                in_object.update(in_subject)
+            elif verb == "pull":
+                subject.pull(object)
+                in_subject.update(in_object)
+            elif verb == "sync":
+                subject.sync(object)
+                in_subject.update(in_object)
+                in_object.update(in_subject)
+
+            check_matches(a, in_a)
+            check_matches(b, in_b)
+
+    finally:
+        #shutil.rmtree(a_dir, ignore_errors=1)
+        #shutil.rmtree(b_dir, ignore_errors=1)
+        pass
+
+
+if __name__ == "__main__":
+    run_tests()
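========================================================================

The fs.py hunk above deliberately leaves fetch_bytes's coalescing FIXME
open. Below is a minimal sketch of what that massaging pass might look
like before the ranges reach _real_fetch_bytes; the helper name _coalesce
and the max_gap parameter are illustrative assumptions, not part of the
patch.

# Sort (offset, length) pairs and merge runs whose gaps are at most
# max_gap bytes, so each merged run costs _real_fetch_bytes one seek+read.
# Returns (merged_range, members) pairs; members records the original
# ranges so the caller can slice the merged read back apart.
def _coalesce(byte_ranges, max_gap=0):
    merged = []
    for offset, length in sorted(byte_ranges):
        if merged:
            (m_off, m_len), members = merged[-1]
            if offset <= m_off + m_len + max_gap:
                new_len = max(m_off + m_len, offset + length) - m_off
                merged[-1] = ((m_off, new_len), members + [(offset, length)])
                continue
        merged.append(((offset, length), [(offset, length)]))
    return merged

# For example:
#   _coalesce([(0, 10), (10, 5), (16, 2), (30, 4)], max_gap=1)
# merges the first three reads into one:
#   [((0, 18), [(0, 10), (10, 5), (16, 2)]), ((30, 4), [(30, 4)])]

fetch_bytes would then issue one _real_fetch_bytes request per merged
range and carve each returned block back into the caller's original
((offset, length), data) tuples.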
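========================================================================

The rewritten load/export pair in merkle_dir.py implies a simple wire
format for hash files: one "<prefix> <key> <value> ..." line per item,
newline-joined and zlib-compressed. Here is a standalone round-trip
sketch of that format; the chunk id and location are made up for
illustration and do not come from the patch.

import zlib

fake_id = "ab" * 20                     # invented 40-hex-digit chunk id
blob = zlib.compress("\n".join(["chunk %s %d %d" % (fake_id, 0, 117)]))

# mirrors HashFile.load: skip blank lines, check the prefix, parse values
for line in zlib.decompress(blob).split("\n"):
    if not line:
        continue
    words = line.split()
    assert words[0] == "chunk"
    assert words[1] == fake_id
    assert (int(words[2]), int(words[3])) == (0, 117)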