# # # patch "ChangeLog" # from [5d9e0acf1f11effa5a3ac2f85b5c78c13f8bd0c8] # to [a49da6c846f0588ab2f9a672118f2b2d30825544] # # patch "contrib/color-logs.sh" # from [4e04daeee4ee8ce8bf6ce45f074ba16faf50ccf9] # to [20148e22f20bf0249432a69990add9b054016b78] # # patch "enumerator.cc" # from [9cd3afa7ef779cb7b0c53a053a9567b57b59f660] # to [43edc729be97fabc35880f79fbea7640d109bea0] # # patch "enumerator.hh" # from [6e6ecbf59d43a8e4bf4de07008a8795bd18cd960] # to [4f3ce29808f2bf2c0f65701e64ae379e369ab9fb] # # patch "netsync.cc" # from [838ea8e645fbc5694a6ac0e81ee0e5c8dde36ede] # to [9e577332850f154ed42b8a3104b57d0c1406f788] # # patch "refiner.hh" # from [9f724877248fc986aed3c2de8fa39d5a27f934d1] # to [785a738e8476f63f8f8b0c5ecfee085dbd764ade] # ============================================================ --- ChangeLog 5d9e0acf1f11effa5a3ac2f85b5c78c13f8bd0c8 +++ ChangeLog a49da6c846f0588ab2f9a672118f2b2d30825544 @@ -1,3 +1,10 @@ +2006-04-28 Graydon Hoare + + * contrib/color-logs.sh: Update to 'mtn'. + * enumerator.{cc,hh}: Expose parent map, cache certs. + * netsync.cc: Use caches in enumerator and refiners. + * refiner.hh (local_item_exists): New helper. + 2006-04-28 Matthew Gregan * database.cc (query::query): Allow query to be constructed ============================================================ --- contrib/color-logs.sh 4e04daeee4ee8ce8bf6ce45f074ba16faf50ccf9 +++ contrib/color-logs.sh 20148e22f20bf0249432a69990add9b054016b78 @@ -1,5 +1,5 @@ #!/bin/sh -./monotone log --diffs --no-merges $@ \ +./mtn log --diffs --no-merges $@ \ | ./contrib/colorize -c contrib/color-logs.conf \ | less -r -p ----------------------------------------------------------------- ============================================================ --- enumerator.cc 9cd3afa7ef779cb7b0c53a053a9567b57b59f660 +++ enumerator.cc 43edc729be97fabc35880f79fbea7640d109bea0 @@ -53,6 +53,23 @@ } } +void +revision_enumerator::get_revision_parents(revision_id const & child, + vector & parents) +{ + parents.clear(); + typedef multimap::const_iterator ci; + pair range = inverse_graph.equal_range(child); + for (ci i = range.first; i != range.second; ++i) + { + if (i->first == child) + { + parents.push_back(i->second); + } + } +} + + bool revision_enumerator::all_parents_enumerated(revision_id const & child) { @@ -158,7 +175,35 @@ } } +void +revision_enumerator::note_cert(revision_id const & rid, + hexenc const & cert_hash) +{ + revision_certs.insert(std::make_pair(rid, cert_hash)); +} + + void +revision_enumerator::get_revision_certs(revision_id const & rid, + std::vector > & hashes) +{ + hashes.clear(); + bool found_one = false; + typedef multimap >::const_iterator ci; + pair range = revision_certs.equal_range(rid); + for (ci i = range.first; i != range.second; ++i) + { + found_one = true; + if (i->first == rid) + hashes.push_back(i->second); + } + if (!found_one) + { + app.db.get_revision_certs(rid, hashes); + } +} + +void revision_enumerator::step() { while (!done()) @@ -185,9 +230,12 @@ pair range = graph.equal_range(r); for (ci i = range.first; i != range.second; ++i) { + // We push_front here rather than push_back in order + // to improve database cache performance. It avoids + // skipping back and forth beween parallel lineages. if (i->first == r) if (enumerated_nodes.find(i->first) == enumerated_nodes.end()) - revs.push_back(i->second); + revs.push_front(i->second); } } @@ -244,7 +292,7 @@ // Queue up some or all of the rev's certs vector > hashes; - app.db.get_revision_certs(r, hashes); + get_revision_certs(r, hashes); for (vector >::const_iterator i = hashes.begin(); i != hashes.end(); ++i) { ============================================================ --- enumerator.hh 6e6ecbf59d43a8e4bf4de07008a8795bd18cd960 +++ enumerator.hh 4f3ce29808f2bf2c0f65701e64ae379e369ab9fb @@ -9,6 +9,7 @@ #include #include #include +#include #include "app_state.hh" #include "vocab.hh" @@ -54,7 +55,15 @@ std::deque items; std::multimap graph; std::multimap inverse_graph; + std::multimap > revision_certs; + void note_cert(revision_id const & rid, + hexenc const & cert_hash); + void get_revision_certs(revision_id const & rid, + std::vector > & certs); + void get_revision_parents(revision_id const & rid, + std::vector & parents); + revision_enumerator(enumerator_callbacks & cb, app_state & app, std::set const & initial, ============================================================ --- netsync.cc 838ea8e645fbc5694a6ac0e81ee0e5c8dde36ede +++ netsync.cc 9e577332850f154ed42b8a3104b57d0c1406f788 @@ -433,6 +433,12 @@ // Various helpers. void respond_to_confirm_cmd(); + bool data_exists(netcmd_item_type type, + id const & item); + void load_data(netcmd_item_type type, + id const & item, + string & out); + void rebuild_merkle_trees(app_state & app, set const & branches); @@ -1686,110 +1692,91 @@ epoch_refiner.begin_refinement(); } -static bool -data_exists(netcmd_item_type type, - id const & item, - app_state & app) +bool +session::data_exists(netcmd_item_type type, + id const & item) { hexenc hitem; encode_hexenc(item, hitem); switch (type) { case key_item: - return app.db.public_key_exists(hitem); + return key_refiner.local_item_exists(item) + || app.db.public_key_exists(hitem); case file_item: return app.db.file_version_exists(file_id(hitem)); case revision_item: - return app.db.revision_exists(revision_id(hitem)); + return rev_refiner.local_item_exists(item) + || app.db.revision_exists(revision_id(hitem)); case cert_item: - return app.db.revision_cert_exists(hitem); + return cert_refiner.local_item_exists(item) + || app.db.revision_cert_exists(hitem); case epoch_item: - return app.db.epoch_exists(epoch_id(hitem)); + return epoch_refiner.local_item_exists(item) + || app.db.epoch_exists(epoch_id(hitem)); } return false; } -static void -load_data(netcmd_item_type type, - id const & item, - app_state & app, - string & out) +void +session::load_data(netcmd_item_type type, + id const & item, + string & out) { string typestr; netcmd_item_type_to_string(type, typestr); hexenc hitem; encode_hexenc(item, hitem); + + if (!data_exists(type, item)) + throw bad_decode(F("%s with hash '%s' does not exist in our database") + % typestr % hitem); + switch (type) { case epoch_item: - if (app.db.epoch_exists(epoch_id(hitem))) { cert_value branch; epoch_data epoch; app.db.get_epoch(epoch_id(hitem), branch, epoch); write_epoch(branch, epoch, out); } - else - { - throw bad_decode(F("epoch with hash '%s' does not exist in our database") - % hitem); - } break; case key_item: - if (app.db.public_key_exists(hitem)) - { - rsa_keypair_id keyid; - base64 pub_encoded; - app.db.get_pubkey(hitem, keyid, pub_encoded); - L(FL("public key '%s' is also called '%s'\n") % hitem % keyid); - write_pubkey(keyid, pub_encoded, out); - } - else - { - throw bad_decode(F("no public key '%s' found in database") % hitem); - } + { + rsa_keypair_id keyid; + base64 pub_encoded; + app.db.get_pubkey(hitem, keyid, pub_encoded); + L(FL("public key '%s' is also called '%s'\n") % hitem % keyid); + write_pubkey(keyid, pub_encoded, out); + } break; case revision_item: - if (app.db.revision_exists(revision_id(hitem))) - { - revision_data mdat; - data dat; - app.db.get_revision(revision_id(hitem), mdat); - out = mdat.inner()(); - } - else - { - throw bad_decode(F("revision '%s' does not exist in our database") % hitem); - } + { + revision_data mdat; + data dat; + app.db.get_revision(revision_id(hitem), mdat); + out = mdat.inner()(); + } break; - + case file_item: - if (app.db.file_version_exists(file_id(hitem))) - { - file_data fdat; - data dat; - app.db.get_file_version(file_id(hitem), fdat); - out = fdat.inner()(); - } - else - { - throw bad_decode(F("file '%s' does not exist in our database") % hitem); - } + { + file_data fdat; + data dat; + app.db.get_file_version(file_id(hitem), fdat); + out = fdat.inner()(); + } break; case cert_item: - if (app.db.revision_cert_exists(hitem)) - { - revision c; - app.db.get_revision_cert(hitem, c); - string tmp; - write_cert(c.inner(), out); - } - else - { - throw bad_decode(F("cert '%s' does not exist in our database") % hitem); - } + { + revision c; + app.db.get_revision_cert(hitem, c); + string tmp; + write_cert(c.inner(), out); + } break; } } @@ -1802,113 +1789,97 @@ hexenc hitem; encode_hexenc(item, hitem); + string typestr; + netcmd_item_type_to_string(type, typestr); + note_item_arrived(type, item); + if (data_exists(type, item)) + { + L(FL("%s '%s' already exists in our database\n") % typestr % hitem); + return true; + } switch (type) { case epoch_item: - if (this->app.db.epoch_exists(epoch_id(hitem))) - { - L(FL("epoch '%s' already exists in our database\n") % hitem); - } - else - { - cert_value branch; - epoch_data epoch; - read_epoch(dat, branch, epoch); - L(FL("received epoch %s for branch %s\n") % epoch % branch); - std::map epochs; - app.db.get_epochs(epochs); - std::map::const_iterator i; - i = epochs.find(branch); - if (i == epochs.end()) - { - L(FL("branch %s has no epoch; setting epoch to %s\n") % branch % epoch); - app.db.set_epoch(branch, epoch); - } - else - { - L(FL("branch %s already has an epoch; checking\n") % branch); - // If we get here, then we know that the epoch must be - // different, because if it were the same then the - // if (epoch_exists()) branch up above would have been taken. - // if somehow this is wrong, then we have broken epoch - // hashing or something, which is very dangerous, so play it - // safe... - I(!(i->second == epoch)); - - // It is safe to call 'error' here, because if we get here, - // then the current netcmd packet cannot possibly have - // written anything to the database. - error((F("Mismatched epoch on branch %s." - " Server has '%s', client has '%s'.") - % branch - % (voice == server_voice ? i->second : epoch) - % (voice == server_voice ? epoch : i->second)).str()); - } - } + { + cert_value branch; + epoch_data epoch; + read_epoch(dat, branch, epoch); + L(FL("received epoch %s for branch %s\n") % epoch % branch); + std::map epochs; + app.db.get_epochs(epochs); + std::map::const_iterator i; + i = epochs.find(branch); + if (i == epochs.end()) + { + L(FL("branch %s has no epoch; setting epoch to %s\n") % branch % epoch); + app.db.set_epoch(branch, epoch); + } + else + { + L(FL("branch %s already has an epoch; checking\n") % branch); + // If we get here, then we know that the epoch must be + // different, because if it were the same then the + // if (epoch_exists()) branch up above would have been taken. + // if somehow this is wrong, then we have broken epoch + // hashing or something, which is very dangerous, so play it + // safe... + I(!(i->second == epoch)); + + // It is safe to call 'error' here, because if we get here, + // then the current netcmd packet cannot possibly have + // written anything to the database. + error((F("Mismatched epoch on branch %s." + " Server has '%s', client has '%s'.") + % branch + % (voice == server_voice ? i->second : epoch) + % (voice == server_voice ? epoch : i->second)).str()); + } + } maybe_note_epochs_finished(); break; case key_item: - if (this->app.db.public_key_exists(hitem)) - L(FL("public key '%s' already exists in our database\n") % hitem); - else - { - rsa_keypair_id keyid; - base64 pub; - read_pubkey(dat, keyid, pub); - hexenc tmp; - key_hash_code(keyid, pub, tmp); - if (! (tmp == hitem)) - throw bad_decode(F("hash check failed for public key '%s' (%s);" - " wanted '%s' got '%s'") - % hitem % keyid % hitem % tmp); - this->dbw.consume_public_key(keyid, pub); - } + { + rsa_keypair_id keyid; + base64 pub; + read_pubkey(dat, keyid, pub); + hexenc tmp; + key_hash_code(keyid, pub, tmp); + if (! (tmp == hitem)) + throw bad_decode(F("hash check failed for public key '%s' (%s);" + " wanted '%s' got '%s'") + % hitem % keyid % hitem % tmp); + this->dbw.consume_public_key(keyid, pub); + } break; case cert_item: - if (this->app.db.revision_cert_exists(hitem)) - L(FL("cert '%s' already exists in our database\n") % hitem); - else - { - cert c; - read_cert(dat, c); - hexenc tmp; - cert_hash_code(c, tmp); - if (! (tmp == hitem)) - throw bad_decode(F("hash check failed for revision cert '%s'") % hitem); - this->dbw.consume_revision_cert(revision(c)); - } + { + cert c; + read_cert(dat, c); + hexenc tmp; + cert_hash_code(c, tmp); + if (! (tmp == hitem)) + throw bad_decode(F("hash check failed for revision cert '%s'") % hitem); + this->dbw.consume_revision_cert(revision(c)); + } break; case revision_item: { - revision_id rid(hitem); - if (this->app.db.revision_exists(rid)) - L(FL("revision '%s' already exists in our database\n") % hitem); - else - { - L(FL("received revision '%s'\n") % hitem); - this->dbw.consume_revision_data(rid, revision_data(dat)); - } + L(FL("received revision '%s'\n") % hitem); + this->dbw.consume_revision_data(revision_id(hitem), revision_data(dat)); } break; - + case file_item: { - file_id fid(hitem); - if (this->app.db.file_version_exists(fid)) - L(FL("file version '%s' already exists in our database\n") % hitem); - else - { - L(FL("received file '%s'\n") % hitem); - this->dbw.consume_file_data(fid, file_data(dat)); - } + L(FL("received file '%s'\n") % hitem); + this->dbw.consume_file_data(file_id(hitem), file_data(dat)); } break; - } return true; } @@ -1980,10 +1951,10 @@ hexenc hitem; encode_hexenc(*i, hitem); - if (data_exists(ty, *i, this->app)) + if (data_exists(ty, *i)) { string out; - load_data(ty, *i, this->app, out); + load_data(ty, *i, out); queue_data_cmd(ty, *i, out); } } @@ -2767,6 +2738,7 @@ void insert_with_parents(revision_id rev, refiner & ref, + revision_enumerator & rev_enumerator, set & revs, app_state & app, ticker & revisions_ticker) @@ -2785,9 +2757,9 @@ id rev_item; decode_hexenc(rid.inner(), rev_item); ref.note_local_item(rev_item); - std::set parents; - app.db.get_revision_parents(rid, parents); - for (std::set::const_iterator i = parents.begin(); + std::vector parents; + rev_enumerator.get_revision_parents(rid, parents); + for (std::vector::const_iterator i = parents.begin(); i != parents.end(); ++i) { work.push_back(*i); @@ -2831,14 +2803,16 @@ for (vector< revision >::const_iterator j = certs.begin(); j != certs.end(); j++) { - insert_with_parents(revision_id(j->inner().ident), - rev_refiner, revision_ids, app, revisions_ticker); + revision_id rid(j->inner().ident); + insert_with_parents(rid, rev_refiner, rev_enumerator, + revision_ids, app, revisions_ticker); // Granch certs go in here, others later on. hexenc tmp; id item; cert_hash_code(j->inner(), tmp); decode_hexenc(tmp, item); cert_refiner.note_local_item(item); + rev_enumerator.note_cert(rid, tmp); if (inserted_keys.find(j->inner().key) == inserted_keys.end()) inserted_keys.insert(j->inner().key); } @@ -2883,7 +2857,7 @@ cert_idx idx; app.db.get_revision_cert_nobranch_index(idx); - + // Insert all non-branch certs reachable via these revisions // (branch certs were inserted earlier). @@ -2892,6 +2866,8 @@ hexenc const & hash = i->first; revision_id const & ident = i->second.first; rsa_keypair_id const & key = i->second.second; + + rev_enumerator.note_cert(ident, hash); if (revision_ids.find(ident) == revision_ids.end()) continue; ============================================================ --- refiner.hh 9f724877248fc986aed3c2de8fa39d5a27f934d1 +++ refiner.hh 785a738e8476f63f8f8b0c5ecfee085dbd764ade @@ -78,7 +78,12 @@ void begin_refinement(); void process_done_command(size_t n_items); void process_refinement_command(refinement_type ty, merkle_node const & their_node); + bool local_item_exists(id const & ident) + { + return local_items.find(ident) != local_items.end(); + } + // These are populated as the 'done' packets arrive. bool done; std::set items_to_send;