#
#
# patch "dumb.py"
#  from [2a39acfe3ae5540b6d5994d5de0664898f2889b6]
#    to [5dac8e4020f3489b57563ecb96ad2509e9f45901]
#
# patch "monotone.py"
#  from [3fb788b372946354426762e6a39e4a901bd78c05]
#    to [8ff7bf5d80cb703e31d6ee3e494ae979231ad4f5]
#
============================================================
--- dumb.py	2a39acfe3ae5540b6d5994d5de0664898f2889b6
+++ dumb.py	5dac8e4020f3489b57563ecb96ad2509e9f45901
@@ -11,10 +11,12 @@ import os.path
 import os
 import sys
 import os.path
+import zlib
+
 from cStringIO import StringIO
 from merkle_dir import MerkleDir, MemoryMerkleDir, LockError
 from fs import readable_fs_for_url, writeable_fs_for_url
-from monotone import Monotone, find_stanza_entry, decode_cert_packet_info
+from monotone import Monotone, MonotoneError, find_stanza_entry, decode_cert_packet_info, validate_packet
 
 #
 # DelegateFunctor
@@ -207,8 +209,13 @@ class Dumbtone:
             self.added = 0
             self.feeder = feeder
         def callback(self, id, data, total):
-            self.added += 1
-            self.feeder.write(data)
+            uncdata = zlib.decompress(data)
+            self.added += 1
+            try:
+                validate_packet(uncdata)
+            except MonotoneError,e:
+                raise MonotoneError(str(e) + "\npacket %s is broken, you must fix destination repository" % id)
+            self.feeder.write(uncdata)
 
     def __prepare_local_md(self, branch_pattern):
         callback = Dumbtone.PushCallback("finding items to synchronize")
@@ -220,8 +227,11 @@ class Dumbtone:
         memory_md = self.__prepare_local_md(branch_pattern)
         target_md = MerkleDir(writeable_fs_for_url(target_url, **kwargs))
 
-        callback = Dumbtone.CounterCallback("pushing packets")
-        memory_md.push(target_md, callback)
+        try:
+            callback = Dumbtone.CounterCallback("pushing packets")
+            memory_md.push(target_md, callback)
+        finally:
+            callback.close()
 
         print "Pushed %s packets to %s" % (callback.added, target_url)
 
@@ -230,23 +240,30 @@ class Dumbtone:
         memory_md = self.__prepare_local_md(branch_pattern)
         source_md = MerkleDir(readable_fs_for_url(source_url, **kwargs))
 
-        self.monotone.ensure_db()
-        feeder = self.monotone.feeder(self.verbosity)
+        self.monotone.ensure_db()
+        try:
+            feeder = self.monotone.feeder(self.verbosity)
 
-        fc = Dumbtone.FeederCallback(feeder, "pulling packets")
-        memory_md.pull(source_md, fc)
-        feeder.close()
+            fc = Dumbtone.FeederCallback(feeder, "pulling packets")
+            memory_md.pull(source_md, fc)
+        finally:
+            fc.finish()
+            feeder.close()
 
         print "Pulled and imported %s packets from %s" % (fc.added, source_url)
 
     def do_sync(self, other_url, branch_pattern, **kwargs):
         print "Synchronizing database and %s" % (other_url,)
         memory_md = self.__prepare_local_md(branch_pattern)
         other_md = MerkleDir(writeable_fs_for_url(other_url, **kwargs))
-        feeder = self.monotone.feeder(self.verbosity)
-        pull_fc = Dumbtone.FeederCallback(feeder, "pulling packets")
-        push_c = Dumbtone.CounterCallback("pushing packets")
-        memory_md.sync(other_md, pull_fc, push_c)
-        feeder.close()
+        try:
+            feeder = self.monotone.feeder(self.verbosity)
+            pull_fc = Dumbtone.FeederCallback(feeder, "pulling packets")
+            push_c = Dumbtone.CounterCallback("pushing packets")
+            memory_md.sync(other_md, pull_fc, push_c)
+        finally:
+            pull_fc.finish()
+            push_c.finish()
+            feeder.close()
         print "Pulled and imported %s packets from %s" % (pull_fc.added, other_url)
         print "Pushed %s packets to %s" % (push_c.added, other_url)
============================================================
--- monotone.py	3fb788b372946354426762e6a39e4a901bd78c05
+++ monotone.py	8ff7bf5d80cb703e31d6ee3e494ae979231ad4f5
@@ -30,7 +30,7 @@ class Feeder:
     # this is technically broken; we might deadlock.
    # subprocess.Popen.communicate uses threads to do this; that'd be
     # better.
-    def _write(self, data):
+    def _write(self, data):
         if self.process is None:
             self.process = subprocess.Popen(self.args,
                                             stdin=subprocess.PIPE,
@@ -40,16 +40,20 @@ class Feeder:
         if self.verbosity>1:
             # processing every single call with a new process
             # to give immediate error reporting
-            stdout, stderr = self.process.communicate()
-            print "writing: >>>",data,"<<<\n",stdout,stderr
+            print "writing: >>>\n",data,"\n<<<\n"
+            stdout = self.process.communicate()
+
             if self.process.returncode:
-                raise MonotoneError, stderr
+                raise MonotoneError, "monotone rejected packets"
             self.process = None
 
     # uncompresses and writes the data
-    def write(self, data):
+    def write(self, uncdata):
         # first, uncompress data
-        uncdata = zlib.decompress(data)
+        # feeder mustn't care about mtndumb communication details
+        # compression is done at dumb/merkle_dir level
+        # uncdata = zlib.decompress(data)
+
 
         if self.verbosity > 1:
             # verbose op, splits the chunk in the individual packets,
@@ -117,16 +121,16 @@ class Monotone:
         return self.automate("get_revision", rid)
 
     def get_pubkey_packet(self, keyid):
-        return self.run_monotone(["pubkey", keyid])
+        return check_packet("pubkey", self.run_monotone(["pubkey", keyid]) )
 
     def get_revision_packet(self, rid):
-        return self.automate("packet_for_rdata", rid)
+        return check_packet("rdata", self.automate("packet_for_rdata", rid))
 
     def get_file_packet(self, fid):
-        return self.automate("packet_for_fdata", fid)
+        return check_packet("fdata", self.automate("packet_for_fdata", fid) )
 
     def get_file_delta_packet(self, old_fid, new_fid):
-        return self.automate("packet_for_fdelta", old_fid, new_fid)
+        return check_packet("fdelta", self.automate("packet_for_fdelta", old_fid, new_fid))
 
     def get_cert_packets(self, rid):
         output = self.automate("packets_for_certs", rid)
@@ -135,7 +139,7 @@ class Monotone:
         for line in output.strip().split("\n"):
             curr_packet += line + "\n"
             if line == "[end]":
-                packets.append(curr_packet)
+                packets.append(check_packet("rcert",curr_packet))
                 curr_packet = ""
         assert not curr_packet
         return packets
@@ -333,6 +337,27 @@ cert_packet_info_re = re.compile(r'^\[rc
 cert_packet_info_re = re.compile(r'^\[rcert ([0-9a-f]+)\r?\n\s+(\S+)\s*\r?\n\s+(.*)\r?\n')
 
+def check_packet(type, data):
+    validate_packet(data, type)
+    return data
+
+valid_packet_end = re.compile(r'\[end\](\r\n )+$')
+
+def validate_packet(data, type = None):
+    valid = True
+    if data.startswith("error: "):
+        valid = False
+    if valid_packet_end.match(data):
+        valid = False
+    if valid and type is not None and not data.startswith("[%s " % type):
+        valid = False
+    if not valid:
+        firstline = data.splitlines()[0]
+        if type is not None:
+            raise MonotoneError("unexpected or bad packet (wanted %s), got: %s" % (type, firstline))
+        else:
+            raise MonotoneError("unknown packet starting with: %s" % firstline)
+
 def decode_cert_packet_info(cert_packet):
     m = cert_packet_info_re.match(cert_packet)
     if not m:
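
Illustration only, not part of the patch: a minimal sketch of how the check_packet/validate_packet helpers added to monotone.py above are meant to be driven, assuming the patched monotone.py is importable. The packet text and key id below are made up for the example.

    # illustration only -- hypothetical packet text, not taken from the patch
    from monotone import MonotoneError, check_packet, validate_packet

    pkt = "[pubkey tester@example.com]\nMIGdMA0GCSqGSIb3...\n[end]\n"
    # a well-formed packet of the expected type is returned unchanged
    assert check_packet("pubkey", pkt) == pkt

    try:
        # monotone error output is rejected instead of being stored or fed back
        validate_packet("error: no such key", "pubkey")
    except MonotoneError, e:   # Python 2 syntax, matching the codebase
        print "rejected:", e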