# # patch "dumb.py" # from [20b825f392c97aadca36cf47a3a107baca3fd066] # to [2764df7b55b2fd152e85bc9dc14b80ca7de9d17e] # # patch "fs.py" # from [60de5c0b4413eda992dcf027373eab82a9606f7c] # to [c40dfe07942db6809afe07f1e8aabe54ab0ddc99] # # patch "merkle_dir.py" # from [a7bca4b74638296707f43c49f36bb8da7b475a84] # to [e16ef97eb84040670c8e2f0ac52a05f65fdffff9] # ======================================================================== --- dumb.py 20b825f392c97aadca36cf47a3a107baca3fd066 +++ dumb.py 2764df7b55b2fd152e85bc9dc14b80ca7de9d17e @@ -42,10 +42,23 @@ # some are unused?) # finally, there's a file VERSION, which contains the sha1 of the string # "\0\0...\0", plus a newline. -# finally2, there's a file called DATA_LENGTH, which contains the size of -# the file DATA in bytes. (this is used to synthesize offsets into it for -# remotely appended data.) +# finally2, there's a file called LENGTH, which contains the size of +# the file DATA in bytes, and then the size of the file INDEX in bytes. +# sometimes there is a file _rollback, which is a copy of LENGTH made before +# starting a transaction. the way this works is that if a transaction gets +# uncleanly aborted (which we try to make sure doesn't happen, because it is +# icky), then we use this file to do rollback. rollback can only be done +# locally -- the way it works is that we truncate DATA and INDEX to the +# lengths given in _rollback, replace LENGTH by _rollback, and regenerate all +# hashes from scratch. +# actually, SFTP supports truncate (paramiko doesn't expose it, but all you +# have to do is create a SFTPAttributes object, set its size field, and send a +# setstat request on it -- see implementation of paramiko's chmod, for +# similar). which means that it would be better to not have to regenerate all +# hash files from scratch. various options... move them out of the way, don't +# delete them until everything is committed, perhaps. + # How a pull works: # -- pull VERSION, see if it matches ours. (note down what it says for # later.) 
========================================================================
--- fs.py  60de5c0b4413eda992dcf027373eab82a9606f7c
+++ fs.py  c40dfe07942db6809afe07f1e8aabe54ab0ddc99
@@ -13,6 +13,7 @@

     # takes an iterable of filenames
     # returns a map {filename -> contents of file}
+    # if some file does not exist, will have a None for contents
     def fetch(self, filenames):
         raise NotImplementedError

@@ -33,8 +34,8 @@
         raise NotImplementedError

     # files is a map {filename -> contents of file}
-    # this operation must be atomic
-    def replace(self, files):
+    # this operation must be atomic and clobbering
+    def put(self, files):
         raise NotImplementedError

     def delete(self, filename):
@@ -48,7 +49,7 @@
     def rmdir(self, filename):
         raise NotImplementedError

-class LocalFS (WriteableFS):
+class LocalReadableFS(ReadableFS):
     def __init__(self, dir):
         self.dir = dir

@@ -61,9 +62,12 @@
     def fetch(self, filenames):
         files = {}
         for fn in filenames:
-            f = open(self._fname(fn), "rb")
-            files[fn] = f.read()
-            f.close()
+            try:
+                f = open(self._fname(fn), "rb")
+                files[fn] = f.read()
+                f.close()
+            except IOError:
+                files[fn] = None
         return files

     def fetch_bytes(self, filename, bytes):
@@ -75,10 +79,11 @@
     def exists(self, filename):
         return os.path.exists(self._fname(filename))

+class LocalWriteableFs(LocalReadableFS, WriteableFS):
     def open_append(self, filename):
         return open(self._fname(filename), "ab")

-    def replace(self, filenames):
+    def put(self, filenames):
         for fn, data in filenames.iteritems():
             tmpname = self._fname("__tmp")
             tmph = open(tmpname, "wb")
========================================================================
--- merkle_dir.py  a7bca4b74638296707f43c49f36bb8da7b475a84
+++ merkle_dir.py  e16ef97eb84040670c8e2f0ac52a05f65fdffff9
@@ -1,9 +1,105 @@
 import sha
 import os
 import os.path
 import zlib
-import glob
+from sets import Set

+def push(source, to, new_chunk_callback=None):
+    # FIXME: transactionalness
+    to.lock()
+    to_root = to.get_root_hash()
+    source_root = source.get_root_hash()
+    new_stuff = list(source_root.new_or_different_in_me(to_root))
+    source_children = source.get_child_hashes(new_stuff)
+    to_children = to.get_child_hashes(new_stuff)
+    locations = {}
+    for prefix in new_stuff:
+        for id, location in source_children[prefix].new_in_me(to_children[prefix]):
+            locations[location] = id
+    # FIXME: horribly inefficient, need to coalesce adjacent chunks into a
+    # single read request
+    # FIXME: we clobber abstraction here
+    for location, data in source.fs.fetch_bytes("DATA", locations.iterkeys()):
+        to.add(locations[location], data)
+        if new_chunk_callback is not None:
+            new_chunk_callback(data)
+    to.add_to_hashes(zip(locations.values(), locations.keys()))
+    to.unlock()
+
+# calls callback for each chunk added to 'to'
+def pull(to, source, new_chunk_callback=None):
+    return push(source, to, new_chunk_callback=new_chunk_callback)
+
+# calls callback for each chunk added to 'a'
+def sync(a, b, new_chunk_callback=None):
+    # FIXME: transactionalness
+    a.lock()
+    b.lock()
+    push(b, a, new_chunk_callback=new_chunk_callback)
+    push(a, b)
+    b.unlock()
+    a.unlock()
+
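[Editorial note: for orientation, a minimal usage sketch, not part of the patch. It assumes the store class in merkle_dir.py is named MerkleDir and takes a filesystem backend as its only constructor argument, and it uses the LocalWriteableFs backend from fs.py above; the exact wiring is an assumption.]

    from fs import LocalWriteableFs
    from merkle_dir import MerkleDir, push

    def mirror(src_dir, dst_dir):
        # sketch: copy every chunk dst is missing from src, counting them
        src = MerkleDir(LocalWriteableFs(src_dir))
        dst = MerkleDir(LocalWriteableFs(dst_dir))
        seen = []
        # push invokes the callback once per newly transferred chunk
        push(src, dst, new_chunk_callback=seen.append)
        return len(seen)
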
+class HashFile:
+    prefix = ""
+    values = ()
+
+    def __init__(self):
+        self.items = {}
+
+    def __iter__(self):
+        return self.items.iteritems()
+
+    def get(self, item):
+        return self.items[item]
+
+    def __contains__(self, item):
+        return item in self.items
+
+    def assign(self, item, *values):
+        assert len(values) == len(self.values)
+        self.items[item] = tuple(values)
+
+    def set(self, *values):
+        assert len(values) == len(self.values) + 1
+        self.items[values[0]] = tuple(values[1:])
+
+    def load(self, data):
+        for line in zlib.decompress(data).split("\n"):
+            if not line:
+                continue
+            words = line.split()
+            assert len(words) == 2 + len(self.values)
+            assert words[0] == self.prefix
+            self.set(*words[1:])
+
+    def export(self):
+        lines = []
+        for key, values in self:
+            value_txt = " ".join(values)
+            lines.append("%s %s %s\n" % (self.prefix, key, value_txt))
+        return zlib.compress("".join(lines))
+
+    # yields (key, values)
+    def new_in_me(self, versus):
+        for key, value in self:
+            if key not in versus:
+                yield (key, value)
+
+    # yields keys. yes this is inconsistent. but handy.
+    def new_or_different_in_me(self, versus):
+        for key, value in self:
+            if key not in versus:
+                yield key
+            elif versus.get(key) != value:
+                yield key
+
+class RootHash(HashFile):
+    prefix = "subtree"
+    values = ("hash",)
+
+class ChildHash(HashFile):
+    prefix = "chunk"
+    values = ("offset", "length")
+
 class LockError(Exception):
     pass

@@ -12,51 +108,148 @@
     index_file = "INDEX"
     data_length_file = "DATA_LENGTH"
     lock_file = "_lock"
-    lock_info_file = "_lock_info"
     hashes_prefix = "HASHES_"

-    def __init__(self, directory):
-        self.dir = directory
-        if not os.path.isdir(self.dir):
-            os.makedirs(self.dir)
-        if not os.path.exists(os.path.join(self.dir, self.data_file)):
-            open(os.path.join(self.dir, self.data_file), "w").close()
-        if not os.path.exists(os.path.join(self.dir, self.index_file)):
-            open(os.path.join(self.dir, self.index_file), "w").close()
-        # dict: id -> (offset, length)
-        self.index_write_handle = open(os.path.join(self.dir, self.index_file), "a")
-        self.data_write_handle = open(os.path.join(self.dir, self.data_file), "ab")
+    def __init__(self, fs):
+        self.fs = fs
+        self.locked = 0
         self.add_open = 0
-        self.reread_index()
+        self.added_ids = Set()
+        self.root_hash = None
+        self.hashes = {}
+
+    #### Locking
+    def need_lock(self):
+        assert self.locked
+
+    def lock(self):
+        if not self.locked:
+            if not self.fs.mkdir(self.lock_file):
+                raise LockError
+        self.locked += 1
+
+    def unlock(self):
+        assert self.locked
+        self.locked -= 1
+        if not self.locked:
+            self.fs.rmdir(self.lock_file)

     def __del__(self):
-        self.flush()
-        self.unlock()
+        if self.locked:
+            self.unlock()

-    # returns an iterator over (id, offset, length)
-    def chunk_locations(self):
-        self.flush()
-        handle = open(os.path.join(self.dir, self.index_file))
-        for line in handle:
-            id, offset, length = line.split()
-            yield (id, offset, length)
+    #### Hash fetching machinery -- does caching to avoid multiple fetches
+    #### during sync
+    def get_root_hash(self):
+        if self.root_hash is None:
+            data = self.fs.fetch([self.hashes_prefix])
+            self.root_hash = RootHash()
+            if data[self.hashes_prefix] is not None:
+                self.root_hash.load(data[self.hashes_prefix])
+        return self.root_hash

-    # returns an iterator over data chunks
-    def chunks(self):
-        self.flush()
-        handle = open(os.path.join(self.dir, self.data_file))
-        curr_offset = 0
-        for id, offset, length in self.chunk_locations():
-            assert curr_offset == offset
-            cdata = self.data_file.read(length)
-            curr_offset += length
-            yield zlib.decompress(cdata)
+    def set_root_hash(self, obj):
+        self.need_lock()
+        self.root_hash = obj
+        self.fs.put({self.hashes_prefix: obj.export()})
+
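[Editorial note: a self-contained sketch, not part of the patch, of the two-level scheme the surrounding methods implement: a chunk id is binned by its first two hex digits, each bin lives in a HASHES_<prefix> file, and the root file records one sha1 per bin.]

    import sha, zlib

    # Illustrative only: where a chunk id lands and what the entries look like.
    chunk_id = "deadbeef" * 5            # a 40-hex-digit chunk id
    prefix = chunk_id[:2]                # "de": one of 256 bins
    child_file = "HASHES_" + prefix      # the bin's own hash file

    # a child file is zlib-compressed "chunk <id> <offset> <length>" lines
    child_data = zlib.compress("chunk %s %s %s\n" % (chunk_id, 0, 1234))
    # the root file then carries one "subtree <prefix> <hash>" line per bin
    root_entry = "subtree %s %s\n" % (prefix, sha.new(child_data).hexdigest())
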
+    # pass an iterable of prefixes
+    # returns a dict {prefix -> ChildHash object}
+    def get_child_hashes(self, prefixes):
+        child_hashes = {}
+        needed = []
+        for prefix in prefixes:
+            if self.hashes.has_key(prefix):
+                child_hashes[prefix] = self.hashes[prefix]
+            else:
+                needed.append(prefix)
+        if needed:
+            datas = self.fs.fetch([self.hashes_prefix + n for n in needed])
+            for fname, data in datas.items():
+                ch = ChildHash()
+                if data is not None:
+                    ch.load(data)
+                prefix = fname[len(self.hashes_prefix):]
+                self.hashes[prefix] = ch
+                child_hashes[prefix] = ch
+        return child_hashes
+
+    # pass a dict of prefix -> new child hash obj
+    # automatically updates root hash as well
+    def set_child_hashes(self, objs):
+        self.need_lock()
+        root_hash = self.get_root_hash()
+        put_request = {}
+        for prefix, obj in objs.iteritems():
+            self.hashes[prefix] = obj
+            child_data = obj.export()
+            new_child_id = sha.new(child_data).hexdigest()
+            put_request[self.hashes_prefix + prefix] = child_data
+            root_hash.set(prefix, new_child_id)
+        self.fs.put(put_request)
+        self.set_root_hash(root_hash)
+
+    #### Cheap hash updating
+    def bin(self, id_locations):
+        bins = {}
+        for id, location in id_locations:
+            prefix = id[:2]
+            if not bins.has_key(prefix):
+                bins[prefix] = []
+            bins[prefix].append((id, location))
+        return bins
+
+    def add_to_hashes(self, id_locations):
+        self.need_lock()
+        bins = self.bin(id_locations)
+        child_hashes = self.get_child_hashes(bins.iterkeys())
+        for k in bins.iterkeys():
+            for id, location in bins[k]:
+                child_hashes[k].assign(id, location)
+        self.set_child_hashes(child_hashes)
+
+    #### Adding new items
+    def add(self, id, data):
+        raise NotImplementedError

-    def reread_index(self):
-        self.ids = {}
-        for id, offset, length in self.chunk_locations():
-            self.ids[id] = (int(offset), int(length))

+    #### Getting data back out to outside world
+    # returns an iterator over id, offset, length tuples
+    def read_index(self):
+        index_handle = self.fs.open_read(self.index_file)
+        # can't assume that fs handles have line iterators
+        remainder = ""
+        for block in iter(lambda: index_handle.read(1024), ""):
+            all = remainder + block
+            lines = all.split("\n")
+            remainder = lines[-1]
+            for line in lines[:-1]:
+                id, offset, length = line.split()
+                yield (id, int(offset), int(length))
+        assert remainder == ""
+
+    # returns an iterator over chunk texts (FIXME: perhaps should split this
+    # up more; for large chunks (e.g., initial imports) this will load the
+    # entire chunk into memory)
+    def all_chunks(self):
+        data_handle = self.fs.open_read(self.data_file)
+        total_read = 0
+        for id, offset, length in self.read_index():
+            assert total_read == offset
+            yield data_handle.read(length)
+            total_read += length
+
+    #### Fixing things up
+    def rehash_from_scratch(self):
+        self.need_lock()
+        # clear out old hashes
+        child_hashes = {}
+        for a in "0123456789abcdef":
+            for b in "0123456789abcdef":
+                child_hashes[a+b] = ChildHash()
+        self.set_child_hashes(child_hashes)
+        # update from scratch
+        self.add_to_hashes(self.read_index())
+
     def add_to_index(self, id, offset, length):
         assert not self.ids.has_key(id)
         self.ids[id] = (offset, length)
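[Editorial note: the INDEX/DATA layout that read_index() and all_chunks() rely on can also be read without a MerkleDir at all; a standalone sketch, not part of the patch, assuming plain local files.]

    import zlib

    def read_chunks(index_path, data_path):
        # Illustrative only: INDEX holds one "<id> <offset> <length>" record
        # per line; DATA holds the zlib-compressed chunks back to back at
        # those offsets.
        data = open(data_path, "rb")
        for line in open(index_path):
            if not line.strip():
                continue
            id, offset, length = line.split()
            data.seek(int(offset))
            yield (id, zlib.decompress(data.read(int(length))))
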
@@ -75,61 +268,3 @@
         open(os.path.join(self.dir, self.data_length_file), "w").write(str(length))
         self.data_write_handle.flush()
         self.index_write_handle.flush()
-
-    def rehash(self):
-        self.flush()
-        old_hashes = glob.glob(os.path.join(self.dir, self.hashes_prefix + "*"))
-        for old in old_hashes:
-            os.unlink(old)
-        # We only do two levels of merkle hashing; with a branching factor of
-        # 256, this gives 65536 bins for the actual ids, which should give us
-        # a reasonable loading factor even for large repos.
-        self.binned_indexes = {}
-        for id in self.ids.iterkeys():
-            bin = id[:2]
-            if not self.binned_indexes.has_key(bin):
-                self.binned_indexes[bin] = []
-            self.binned_indexes[bin].append(id)
-        bin_hashes = {}
-        for bin, ids in self.binned_indexes.iteritems():
-            handle = HashWriter(os.path.join(self.dir, self.hashes_prefix + bin))
-            ids.sort()
-            for id in ids:
-                handle.write("chunk %s %s %s\n" % ((id,) + self.ids[id]))
-            handle.close()
-            bin_hashes[bin] = handle.hash()
-        root_hashes = ""
-        for bin, hash in bin_hashes.iteritems():
-            root_hashes += "subtree %s %s %s" % (bin, hash, self.hashes_prefix + bin)
-        open(os.path.join(self.dir, self.hashes_prefix), "wb").write(zlib.compress(root_hashes))
-
-class HashWriter:
-    def __init__(self, filename):
-        self.sha = sha.new()
-        self.file = open(filename, "wb")
-        self.compressor = zlib.compressobj()
-    def write(self, data):
-        self.sha.update(data)
-        self.file.write(self.compressor.compress(data))
-    def close(self):
-        self.file.write(self.compressor.flush())
-        self.file.close()
-    def hash(self):
-        return self.sha.hexdigest()
-
-class MerkleAdder:
-    def __init__(self, store, id):
-        self.store = store
-        self.offset = self.store.data_write_handle.tell()
-        self.id = id
-        self.compressor = zlib.compressobj()
-    def write(self, data):
-        compressed_data = self.compressor.compress(data)
-        self.store.data_write_handle.write(compressed_data)
-    def close(self):
-        last_data = self.compressor.flush()
-        self.store.data_write_handle.write(last_data)
-        length = self.store.data_write_handle.tell() - self.offset
-        self.store.add_to_index(self.id, self.offset, length)
-        self.store.add_open = 0
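
[Editorial note: to make the hash-file comparison concrete, a small round-trip sketch, not part of the patch, using the new HashFile subclasses from merkle_dir.py; it assumes the module is importable under that name.]

    from merkle_dir import RootHash

    local = RootHash()
    local.set("de", "1111111111111111111111111111111111111111")
    local.set("ad", "2222222222222222222222222222222222222222")

    remote = RootHash()
    remote.set("de", "1111111111111111111111111111111111111111")

    # prefixes whose child hash file is missing or different on the remote
    # side; push() walks exactly this set of bins
    stale = list(local.new_or_different_in_me(remote))
    assert stale == ["ad"]

    # export()/load() round-trip through the zlib-compressed on-disk format
    copy = RootHash()
    copy.load(local.export())
    assert sorted(copy.items) == ["ad", "de"]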