# # patch "dumb.py" # from [2764df7b55b2fd152e85bc9dc14b80ca7de9d17e] # to [86cc3fff44c25f7d941346c8b0ab44259d69b20d] # # patch "fs.py" # from [c40dfe07942db6809afe07f1e8aabe54ab0ddc99] # to [5fad094cfdf0ff7c95f6a247bf0d375dc6c48e3e] # # patch "merkle_dir.py" # from [e16ef97eb84040670c8e2f0ac52a05f65fdffff9] # to [1b847afd1eaec69dc1d8c1145f0d2833ddf3609c] # ======================================================================== --- dumb.py 2764df7b55b2fd152e85bc9dc14b80ca7de9d17e +++ dumb.py 86cc3fff44c25f7d941346c8b0ab44259d69b20d @@ -58,6 +58,10 @@ # similar). which means that it would be better to not have to regenerate all # hash files from scratch. various options... move them out of the way, don't # delete them until everything is committed, perhaps. +# +# hmm... even if we don't have truncate, we should be able to restore original +# hash files, at which point, we have random junk in DATA and INDEX but it +# doesn't matter too much... INDEX will become corrupted, of course. # How a pull works: # -- pull VERSION, see if it matches ours. (note down what it says for ======================================================================== --- fs.py c40dfe07942db6809afe07f1e8aabe54ab0ddc99 +++ fs.py 5fad094cfdf0ff7c95f6a247bf0d375dc6c48e3e @@ -49,6 +49,9 @@ def rmdir(self, filename): raise NotImplementedError + def truncate(self, filename, length): + raise NotImplementedError + class LocalReadableFS(ReadableFS): def __init__(self, dir): self.dir = dir @@ -103,3 +106,6 @@ def rmdir(self, filename): os.rmdir(self._fname(filename)) + + def truncate(self, filename, length): + open(self._fname(filename), "ab").truncate(length) ======================================================================== --- merkle_dir.py e16ef97eb84040670c8e2f0ac52a05f65fdffff9 +++ merkle_dir.py 1b847afd1eaec69dc1d8c1145f0d2833ddf3609c @@ -19,11 +19,14 @@ # FIXME: horribly inefficient, need to coalesce adjacent chunks into a # single read request # FIXME: we clobber abstraction here - for location, data in from.fs.fetch_bytes("DATA", locations.iterkeys()): - to.add(locations[location], data) + ids_to_add = [] + for from_location, data in from.fs.fetch_bytes("DATA", locations.iterkeys()): + id = locations[from_location] + to_location = to.add(id, data) + ids_to_add.append((id, to_location)) if new_chunk_callback is not None: new_chunk_callback(data) - to.add_to_hashes(zip(locations.values(), locations.keys())) + to.add_to_hashes(ids_to_add) to.unlock() # calls callback for each chunk added to 'to' @@ -106,9 +109,10 @@ class MerkleDir: data_file = "DATA" index_file = "INDEX" - data_length_file = "DATA_LENGTH" + lengths_file = "LENGTHS" + hashes_prefix = "HASHES_" lock_file = "_lock" - hashes_prefix = "HASHES_" + rollback_file = "_rollback" def __init__(self, fs): self.fs = fs @@ -116,27 +120,73 @@ self.add_open = 0 self.added_ids = Set() self.root_hash = None + self.data_handle = None + self.index_handle = None + self.curr_data_length = None + self.curr_index_length = None self.hashes = {} #### Locking def need_lock(self): assert self.locked - def lock(self): + def begin(self): if not self.locked: if not self.fs.mkdir(self.lock_file): raise LockError + # okay, we succeeded in getting the write lock. Let's open things + # up. + lengths_data = self.fs.fetch([self.lengths_file])[self.lengths_file] + self.curr_data_length, self.curr_index_length = lengths_data.strip().split() + self.fs.put({self.rollback_file: lengths_data}) + self.data_handle = self.fs.open_append(self.data_file) + self.index_handle = self.fs.open_append(self.index_file) self.locked += 1 - - def unlock(self): + + def commit(self): assert self.locked self.locked -= 1 if not self.locked: + self.data_handle.close() + self.data_handle = None + self.index_handle.close() + self.index_handle = None + self.fs.put({self.lengths_file: + "%s %s\n" % (self.curr_data_length, + self.curr_index_length)}) + # if we've gotten this far, we're in a fully consistent state + self.fs.delete(self.rollback_file) + # FIXME: cleanup backup hash files here self.fs.rmdir(self.lock_file) + # This can be called either with or without the lock held (either to + # cleanup after ourself, or after someone else) + def rollback(self): + self.locked = 0 + if self.fs.mkdir(self.lock_file): + # no cleanup to do anyway... + self.fs.rmdir(self.lock_file) + return + old_lengths_data = self.fs.fetch([self.rollback_file])[self.rollback_file] + if old_lengths_data is None: + # we got so far in rolling back before that _all_ we have left is + # the lockdir. so we can just delete it and be done. + self.fs.rmdir(self.lock_file) + return + old_data_length, old_index_length = old_lengths_data.strip().split() + self.fs.put({self.lengths_file: old_lengths_data}) + try: + self.fs.truncate(self.index_file, old_index_length) + self.fs.truncate(self.data_file, old_data_length) + except NotImplementedError: + raise NotImplementedError, "rollback not supported on this backend" + self.rehash_from_scratch() + self.delete(self.rollback_file) + self.fs.rmdir(self.lock_file) + def __del__(self): if self.locked: - self.unlock() + self.rollback() #### Hash fetching machinery -- does caching to avoid multiple fetches #### during sync @@ -210,7 +260,17 @@ #### Adding new items def add(self, id, data): - + self.need_lock() + assert None not in (self.data_handle, + self.index_handle, + self.curr_data_length, + self.curr_index_length) + length = len(data) + index_text = "%s %s %s\n" % (id, self.curr_data_length, length) + self.data_handle.write(data) + self.index_handle.write(index_text) + self.curr_index_length += len(index_text) + self.curr_data_length += length #### Getting data back out to outside world # returns an iterator over id, offset, length tuples