# # patch "fs.py" # from [5fad094cfdf0ff7c95f6a247bf0d375dc6c48e3e] # to [d5f1f99911ba72c9b1d4b8dacdd406128b935ca9] # # patch "merkle_dir.py" # from [1b847afd1eaec69dc1d8c1145f0d2833ddf3609c] # to [85669ce90a9c7964717b0dd85d6220c0571eb87a] # ======================================================================== --- fs.py 5fad094cfdf0ff7c95f6a247bf0d375dc6c48e3e +++ fs.py d5f1f99911ba72c9b1d4b8dacdd406128b935ca9 @@ -26,7 +26,10 @@ def exists(self, filename): raise NotImplementedError + def size(self, filename): + raise NotImplementedError + class WriteableFS (ReadableFS): # returns an object that supports write(), flush(), close() with standard # file object semantics @@ -38,7 +41,12 @@ def put(self, files): raise NotImplementedError - def delete(self, filename): + # in case put() cannot be made atomic (ftp, ntfs, maybe other situations), + # puts should still be done in a way that they can be rolled back. + # This function checks for puts that were not completed, and if any are + # found, rolls them back. + # files is an iterable of filenames + def rollback_interrupted_puts(self, filenames): raise NotImplementedError # returns true if mkdir succeeded, false if failed @@ -49,8 +57,6 @@ def rmdir(self, filename): raise NotImplementedError - def truncate(self, filename, length): - raise NotImplementedError class LocalReadableFS(ReadableFS): def __init__(self, dir): @@ -82,6 +88,9 @@ def exists(self, filename): return os.path.exists(self._fname(filename)) + def size(self, filename): + return os.stat(self._fname(filename)).st_size + class LocalWriteableFs(LocalReadableFS, WriteableFS): def open_append(self, filename): return open(self._fname(filename), "ab") @@ -94,8 +103,9 @@ tmph.close() os.rename(tmpname, self._fname(fn)) - def delete(self, filename): - os.unlink(self._fname(filename)) + def rollback_interrupted_puts(self, filenames): + # we have atomic put + pass def mkdir(self, filename): try: @@ -106,6 +116,3 @@ def rmdir(self, filename): os.rmdir(self._fname(filename)) - - def truncate(self, filename, length): - open(self._fname(filename), "ab").truncate(length) ======================================================================== --- merkle_dir.py 1b847afd1eaec69dc1d8c1145f0d2833ddf3609c +++ merkle_dir.py 85669ce90a9c7964717b0dd85d6220c0571eb87a @@ -5,8 +5,7 @@ from sets import Set def push(from, to, new_chunk_callback=None): - # FIXME: transactionalness - to.lock() + to.begin() to_root = to.get_root_hash() from_root = from.get_root_hash() new_stuff = list(from_root.new_or_different_in_me(to_root)) @@ -27,7 +26,7 @@ if new_chunk_callback is not None: new_chunk_callback(data) to.add_to_hashes(ids_to_add) - to.unlock() + to.commit() # calls callback for each chunk added to 'to' def pull(to, from, new_chunk_callback=None): @@ -35,13 +34,12 @@ # calls callback for each chunk added to 'a' def sync(a, b, new_chunk_callback=None): - # FIXME: transactionalness - a.lock() - b.lock() + a.begin() + b.begin() push(b, a, new_chunk_callback=new_chunk_callback) + a.commit() push(a, b) - b.unlock() - a.unlock() + b.commit() class HashFile: prefix = "" @@ -107,140 +105,109 @@ pass class MerkleDir: - data_file = "DATA" - index_file = "INDEX" - lengths_file = "LENGTHS" - hashes_prefix = "HASHES_" - lock_file = "_lock" - rollback_file = "_rollback" + _data_file = "DATA" + _hashes_prefix = "HASHES_" + _lock_file = "_lock" def __init__(self, fs): - self.fs = fs - self.locked = 0 - self.add_open = 0 - self.added_ids = Set() - self.root_hash = None - self.data_handle = None - self.index_handle = None - 
self.curr_data_length = None - self.curr_index_length = None - self.hashes = {} + self._fs = fs + self._locked = 0 + self._root_hash = None + self._data_handle = None + self._curr_data_length = None + self._hashes = {} #### Locking - def need_lock(self): - assert self.locked + def _need_lock(self): + assert self._locked def begin(self): - if not self.locked: - if not self.fs.mkdir(self.lock_file): + if not self._locked: + if not self._fs.mkdir(self._lock_file): raise LockError # okay, we succeeded in getting the write lock. Let's open things # up. - lengths_data = self.fs.fetch([self.lengths_file])[self.lengths_file] - self.curr_data_length, self.curr_index_length = lengths_data.strip().split() - self.fs.put({self.rollback_file: lengths_data}) - self.data_handle = self.fs.open_append(self.data_file) - self.index_handle = self.fs.open_append(self.index_file) - self.locked += 1 + self._curr_data_length = self._fs.size(self._data_file) + self._data_handle = self._fs.open_append(self._data_file) + self._locked += 1 def commit(self): - assert self.locked - self.locked -= 1 - if not self.locked: - self.data_handle.close() - self.data_handle = None - self.index_handle.close() - self.index_handle = None - self.fs.put({self.lengths_file: - "%s %s\n" % (self.curr_data_length, - self.curr_index_length)}) - # if we've gotten this far, we're in a fully consistent state - self.fs.delete(self.rollback_file) - # FIXME: cleanup backup hash files here - self.fs.rmdir(self.lock_file) + assert self._locked + self._locked -= 1 + if not self._locked: + self._data_handle.close() + self._data_handle = None + self._fs.rmdir(self._lock_file) # This can be called either with or without the lock held (either to # cleanup after ourself, or after someone else) def rollback(self): - self.locked = 0 - if self.fs.mkdir(self.lock_file): + self._locked = 0 + if self._fs.mkdir(self._lock_file): # no cleanup to do anyway... - self.fs.rmdir(self.lock_file) + self._fs.rmdir(self._lock_file) return - old_lengths_data = self.fs.fetch([self.rollback_file])[self.rollback_file] - if old_lengths_data is None: - # we got so far in rolling back before that _all_ we have left is - # the lockdir. so we can just delete it and be done. 
- self.fs.rmdir(self.lock_file) - return - old_data_length, old_index_length = old_lengths_data.strip().split() - self.fs.put({self.lengths_file: old_lengths_data}) - try: - self.fs.truncate(self.index_file, old_index_length) - self.fs.truncate(self.data_file, old_data_length) - except NotImplementedError: - raise NotImplementedError, "rollback not supported on this backend" - self.rehash_from_scratch() - self.delete(self.rollback_file) - self.fs.rmdir(self.lock_file) + all_hash_files = [self._hashes_prefix] + for p1 in "012346789abcdef": + for p2 in "012346789abcdef": + all_hash_files.append(self._hashes_prefix + p1 + p2) + self._fs.rollback_interrupted_puts(all_hash_files) + self._fs.rmdir(self._lock_file) - def __del__(self): - if self.locked: - self.rollback() - #### Hash fetching machinery -- does caching to avoid multiple fetches #### during sync - def get_root_hash(self): - if self.root_hash is not None: - return self.root_hash - data = self.fs.fetch([self.hashes_prefix]) - self.root_hash = RootHash() + def _get_root_hash(self): + if self._root_hash is not None: + return self._root_hash + data = self._fs.fetch([self._hashes_prefix]) + self._root_hash = RootHash() if data is not None: - self.root_hash.load(data[self.hashes_prefix]) + self._root_hash.load(data[self._hashes_prefix]) + return self._root_hash - def set_root_hash(self, obj): - self.need_lock() - self.root_hash = obj - self.fs.put({self.hashes_prefix: obj.export()}) + def _set_root_hash(self, obj): + self._need_lock() + self._root_hash = obj + self._fs.put({self._hashes_prefix: obj.export()}) # pass an iterable of prefixes # returns a dict {prefix -> ChildHash object} - def get_child_hashes(self, prefixes): + def _get_child_hashes(self, prefixes): child_hashes = {} needed = [] for prefix in prefixes: - if self.hashes.has_key(prefix): - child_hashes[prefix] = self.hashes[prefix] + if self._hashes.has_key(prefix): + child_hashes[prefix] = self._hashes[prefix] else: needed.append(prefix) if needed: - datas = self.fs.fetch([self.hashes_prefix + n for n in needed]) + datas = self._fs.fetch([self._hashes_prefix + n for n in needed]) for fname, data in datas.items(): ch = ChildHash() if data is not None: ch.load(data) - prefix = fname[len(self.hashes_prefix):] - self.hashes[prefix] = ch + prefix = fname[len(self._hashes_prefix):] + self._hashes[prefix] = ch child_hashes[prefix] = ch return child_hashes # pass a dict of prefix -> new child hash obj # automatically updates root hash as well - def set_child_hashes(self, objs): - self.need_lock() - root_hash = self.get_root_hash() + def _set_child_hashes(self, objs): + self._need_lock() + root_hash = self._get_root_hash() put_request = {} for prefix, obj in objs.iteritems(): - self.hashes[prefix] = obj + self._hashes[prefix] = obj child_data = obj.export() new_child_id = sha.new(child_data).hexdigest() - put_request[self.hashes_prefix + prefix] = child_data + put_request[self._hashes_prefix + prefix] = child_data root_hash.set(prefix, new_child_id) - self.fs.put(put_request) - self.set_root_hash(root_hash) + self._fs.put(put_request) + self._set_root_hash(root_hash) #### Cheap hash updating - def bin(self, id_locations): + def _bin(self, id_locations): bins = {} for id, location in ids: prefix = id[:2] @@ -250,81 +217,40 @@ return bins def add_to_hashes(self, id_locations): - self.need_lock() - bins = self.bin(id_locations) - child_hashes = self.get_child_hashes(bins.iterkeys()) + self._need_lock() + bins = self._bin(id_locations) + child_hashes = 
self._get_child_hashes(bins.iterkeys()) for k in bins.iterkeys(): for id, location in bins[k]: child_hashes[k].assign(id, location) - self.set_child_hashes(child_hashes) + self._set_child_hashes(child_hashes) #### Adding new items def add(self, id, data): - self.need_lock() - assert None not in (self.data_handle, - self.index_handle, - self.curr_data_length, - self.curr_index_length) + self._need_lock() + assert None not in (self._data_handle, + self._curr_data_length) length = len(data) - index_text = "%s %s %s\n" % (id, self.curr_data_length, length) - self.data_handle.write(data) - self.index_handle.write(index_text) - self.curr_index_length += len(index_text) - self.curr_data_length += length + self._data_handle.write(data) + self._curr_data_length += length #### Getting data back out to outside world - # returns an iterator over id, offset, length tuples - def read_index(self): - index_handle = self.fs.open_read(self.index_file) - # can't assume that fs handles have line iterators - remainder = "" - for block in iter(lambda: index_handle.read(1024), None): - all = remainder + block - lines = all.split("\n") - remainder = lines[-1] - for line in lines[:-1]: - id, offset, length = line.split() - yield (id, offset, length) - - # returns an iterator over chunk texts (FIXME: perhaps should split this - # up more; for large chunks (e.g., initial imports) this will load the - # entire chunk into memory) - def all_chunks(self): - data_handle = self.fs.open_read(self.data_file) - total_read = 0 - for id, offset, length in self.read_index(): - assert total_read == offset - yield data_handle.read(length) - total_read += length - assert remainder == "" + # returns an iterator over id, (offset, length) tuples + def _all_chunk_locations(self): + prefixes = [prefix for (prefix, _) in self._get_root_hash()] + all_children = self._get_child_hashes(prefixes) + for child_hashes in all_children.values(): + for id_location in child_hashes: + yield id_location - #### Fixing things up - def rehash_from_scratch(self): - self.need_lock() - # clear out old hashes - child_hashes = {} - for a in "01234567890abcdef": - for b in "01234567890abcdef": - child_hashes[a+b] = ChildHash() - self.set_child_hashes(child_hashes) - # update from scratch - self.add_to_hashes(self.read_index()) + # returns an iterator over (chunk id, chunk text) + # (FIXME: perhaps should split this up more; for large chunks (e.g., + # initial imports) this will load the entire chunk into memory) + def all_chunks(self): + data_handle = self._fs.open_read(self._data_file) + id_to_loc = dict(self._all_chunk_locations()) + for loc, data in self._fs.fetch_bytes(id_to_offset_lens.values()): + yield id_to_loc[loc], data - def add_to_index(self, id, offset, length): - assert not self.ids.has_key(id) - self.ids[id] = (offset, length) - self.index_write_handle.write("%s %s %s\n" % (id, offset, length)) - - def add(self, id): - assert not self.add_open - if self.ids.has_key(id): - return None - else: - self.add_open = 1 - return MerkleAdder(self, id) - def flush(self): - length = self.data_write_handle.tell() - open(os.path.join(self.dir, self.data_length_file), "w").write(str(length)) self.data_write_handle.flush() - self.index_write_handle.flush()
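
A note on the new WriteableFS.rollback_interrupted_puts() hook, since only the
comment in fs.py describes it: a backend whose put() cannot be made atomic is
expected to leave enough state behind during each put() that an unfinished
write can be detected and undone later, and rollback_interrupted_puts() is
handed the filenames to check (MerkleDir.rollback() above passes HASHES_ plus
every two-character HASHES_xx name).  The sketch below only illustrates that
contract and is not part of this patch: the class name, the ".back"/".absent"
suffixes, and the copy-based undo scheme are assumptions.  LocalWriteableFs
itself keeps the atomic tempfile-plus-rename put(), so its
rollback_interrupted_puts() stays a no-op.

import os
import shutil

class FragileWriteableFS(object):
    # Hypothetical backend whose put() cannot be made atomic (a sketch
    # against the WriteableFS contract in fs.py).  Before overwriting a file
    # it records enough state to undo the write, and removes that state only
    # once the write has fully completed.
    def __init__(self, dir):
        self.dir = dir

    def _fname(self, filename):
        return os.path.join(self.dir, filename)

    def put(self, files):
        # files is a dict of {filename: data}, as in WriteableFS.put()
        for fn, data in files.items():
            target = self._fname(fn)
            if os.path.exists(target):
                # keep the old version around until the new one is complete
                shutil.copyfile(target, target + ".back")
            else:
                # remember that there was no previous version at all
                open(target + ".absent", "wb").close()
            handle = open(target, "wb")   # the non-atomic part
            handle.write(data)
            handle.close()
            # the put completed; discard the undo information
            for suffix in (".back", ".absent"):
                if os.path.exists(target + suffix):
                    os.unlink(target + suffix)

    def rollback_interrupted_puts(self, filenames):
        # Leftover undo information means the matching put() never finished:
        # put the old contents back (or remove a half-created file).
        for fn in filenames:
            target = self._fname(fn)
            if os.path.exists(target + ".back"):
                shutil.copyfile(target + ".back", target)
                os.unlink(target + ".back")
            elif os.path.exists(target + ".absent"):
                if os.path.exists(target):
                    os.unlink(target)
                os.unlink(target + ".absent")

Whether a put was interrupted only matters while the _lock directory is still
present, which is why MerkleDir.rollback() only calls this hook after it fails
to take the lock, i.e. when some writer died mid-transaction.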