# # patch "change_set.hh" # from [c9d0a8fa88d57ae3e6d479d2d62a1e5ea0016ed3] # to [a060f0c256cd5fb1c1156b130eaf3842976012fe] # # patch "netsync.cc" # from [b3c340c4ed39777283351959cc5d0068dd60abd4] # to [4bdd084e4dd25d0a85c5287e99850c30c4f6cc66] # # patch "packet.cc" # from [407e15cf1c485e09665a372588be2f99491a17d7] # to [0867d1963018a7de23a7ca07be1e637817aaeb50] # # patch "packet.hh" # from [ec7178c2332473305c0aa7d00c727e338fc7810d] # to [c2d6f03ffa901f98ad6623090b687982d27db552] # # patch "vocab.cc" # from [cebf734fb6a83a66665786e2c1486d4934137066] # to [81220896b3d16dfa324aae78d7e5bdae045c4d83] # # patch "vocab.hh" # from [22382ac1bdffec21170a88ff2580fe39b508243f] # to [511e1ef3052189b3868e5b3932b12c198eab396f] # ======================================================================== --- change_set.hh c9d0a8fa88d57ae3e6d479d2d62a1e5ea0016ed3 +++ change_set.hh a060f0c256cd5fb1c1156b130eaf3842976012fe @@ -77,24 +77,6 @@ return p.empty(); } -inline bool -null_id(file_id const & i) -{ - return i.inner()().empty(); -} - -inline bool -null_id(manifest_id const & i) -{ - return i.inner()().empty(); -} - -inline bool -null_id(revision_id const & i) -{ - return i.inner()().empty(); -} - inline file_path const & delta_entry_path(change_set::delta_map::const_iterator i) { ======================================================================== --- netsync.cc b3c340c4ed39777283351959cc5d0068dd60abd4 +++ netsync.cc 4bdd084e4dd25d0a85c5287e99850c30c4f6cc66 @@ -8,6 +8,7 @@ #include #include #include +#include #include @@ -306,6 +307,7 @@ map done_refinements; map > > requested_items; map > > received_items; + map > > full_delta_items; map > > ancestry; map > > received_certs; set< pair > reverse_delta_requests; @@ -345,6 +347,7 @@ bool all_requested_revisions_received(); void note_item_requested(netcmd_item_type ty, id const & i); + void note_item_full_delta(netcmd_item_type ty, id const & ident); bool item_already_requested(netcmd_item_type ty, id const & i); void note_item_arrived(netcmd_item_type ty, id const & i); @@ -355,15 +358,8 @@ bool got_all_data(); void maybe_say_goodbye(); - void analyze_attachment(revision_id const & i, - set & visited, - map & attached); - void request_rev_revisions(revision_id const & init, - map attached, - set visited); - void request_fwd_revisions(revision_id const & i, - map attached, - set & visited); + void get_heads_and_consume_certs(set & heads); + void analyze_ancestry_graph(); void analyze_manifest(manifest_map const & man); @@ -461,7 +457,39 @@ bool process(); }; +struct +ancestry_fetcher +{ + session & sess; + // map children to parents + multimap< file_id, file_id > rev_file_deltas; + multimap< manifest_id, manifest_id > rev_manifest_deltas; + // map an ancestor to a child + multimap< file_id, file_id > fwd_file_deltas; + multimap< manifest_id, manifest_id > fwd_manifest_deltas; + + set< file_id > seen_files; + + ancestry_fetcher(session & s); + // analysing the ancestry graph + void traverse_files(change_set const & cset); + void traverse_manifest(manifest_id const & child_man, + manifest_id const & parent_man); + void traverse_ancestry(set const & heads); + + // requesting the data + void request_rev_file_deltas(file_id const & start, + set & done_files); + void request_files(); + void request_rev_manifest_deltas(manifest_id const & start, + set & done_manifests); + void request_manifests(); + + +}; + + struct root_prefix { @@ -483,6 +511,7 @@ return ROOT_PREFIX; } +static file_id null_ident; session::session(protocol_role role, protocol_voice voice, @@ -549,6 +578,9 @@ received_items.insert(make_pair(manifest_item, boost::shared_ptr< set >(new set()))); received_items.insert(make_pair(file_item, boost::shared_ptr< set >(new set()))); received_items.insert(make_pair(epoch_item, boost::shared_ptr< set >(new set()))); + + full_delta_items.insert(make_pair(manifest_item, boost::shared_ptr< set >(new set()))); + full_delta_items.insert(make_pair(file_item, boost::shared_ptr< set >(new set()))); } session::~session() @@ -760,6 +792,15 @@ } void +session::note_item_full_delta(netcmd_item_type ty, id const & ident) +{ + map > >::const_iterator + i = full_delta_items.find(ty); + I(i != full_delta_items.end()); + i->second->insert(ident); +} + +void session::note_item_arrived(netcmd_item_type ty, id const & ident) { map > >::const_iterator @@ -879,67 +920,6 @@ } } -static bool -is_attached(revision_id const & i, - map const & attach_map) -{ - map::const_iterator j = attach_map.find(i); - I(j != attach_map.end()); - return j->second; -} - -// this tells us whether a particular revision is "attached" -- meaning -// either our database contains the underlying manifest or else one of our -// parents (recursively, and only in the current ancestry graph we're -// requesting) is attached. if it's detached we will request it using a -// different (more efficient and less failure-prone) algorithm - -void -session::analyze_attachment(revision_id const & i, - set & visited, - map & attached) -{ - typedef map > > ancestryT; - - if (visited.find(i) != visited.end()) - return; - - visited.insert(i); - - bool curr_attached = false; - - if (app.db.revision_exists(i)) - { - L(F("revision %s is attached via database\n") % i); - curr_attached = true; - } - else - { - L(F("checking attachment of %s in ancestry\n") % i); - ancestryT::const_iterator j = ancestry.find(i); - if (j != ancestry.end()) - { - for (edge_map::const_iterator k = j->second->second.edges.begin(); - k != j->second->second.edges.end(); ++k) - { - L(F("checking attachment of %s in parent %s\n") % i % edge_old_revision(k)); - analyze_attachment(edge_old_revision(k), visited, attached); - if (is_attached(edge_old_revision(k), attached)) - { - L(F("revision %s is attached via parent %s\n") % i % edge_old_revision(k)); - curr_attached = true; - } - } - } - } - if (curr_attached) - L(F("decided that revision '%s' is attached\n") % i); - else - L(F("decided that revision '%s' is not attached\n") % i); - - attached[i] = curr_attached; -} - inline static id plain_id(manifest_id const & i) { @@ -958,333 +938,114 @@ return tmp; } -void -session::request_rev_revisions(revision_id const & init, - map attached, - set visited) +void +session::get_heads_and_consume_certs( set & heads ) { typedef map > > ancestryT; + typedef map > cert_map; - set seen_manifests; - set seen_files; + set nodes, parents; + map chld_num; + L(F("analyzing %d ancestry edges\n") % ancestry.size()); - set frontier; - frontier.insert(init); - while(!frontier.empty()) + for (ancestryT::const_iterator i = ancestry.begin(); i != ancestry.end(); ++i) { - set next_frontier; - for (set::const_iterator i = frontier.begin(); - i != frontier.end(); ++i) + nodes.insert(i->first); + for (edge_map::const_iterator j = i->second->second.edges.begin(); + j != i->second->second.edges.end(); ++j) { - if (is_attached(*i, attached)) - continue; - - if (visited.find(*i) != visited.end()) - continue; - - visited.insert(*i); - - ancestryT::const_iterator j = ancestry.find(*i); - if (j != ancestry.end()) - { - - for (edge_map::const_iterator k = j->second->second.edges.begin(); - k != j->second->second.edges.end(); ++k) - { - - next_frontier.insert(edge_old_revision(k)); - - // check out the manifest delta edge - manifest_id parent_manifest = edge_old_manifest(k); - manifest_id child_manifest = j->second->second.new_manifest; - - // first, if we have a child we've never seen before we will need - // to request it in its entrety. - if (seen_manifests.find(child_manifest) == seen_manifests.end()) - { - if (this->app.db.manifest_version_exists(child_manifest)) - L(F("not requesting (in reverse) initial manifest %s as we already have it\n") % child_manifest); - else - { - L(F("requesting (in reverse) initial manifest data %s\n") % child_manifest); - queue_send_data_cmd(manifest_item, plain_id(child_manifest)); - } - seen_manifests.insert(child_manifest); - } - - // second, if the parent is nonempty, we want to ask for an edge to it - if (!parent_manifest.inner()().empty()) - { - if (this->app.db.manifest_version_exists(parent_manifest)) - L(F("not requesting (in reverse) manifest delta to %s as we already have it\n") % parent_manifest); - else - { - L(F("requesting (in reverse) manifest delta %s -> %s\n") - % child_manifest % parent_manifest); - reverse_delta_requests.insert(make_pair(plain_id(child_manifest), - plain_id(parent_manifest))); - queue_send_delta_cmd(manifest_item, - plain_id(child_manifest), - plain_id(parent_manifest)); - } - seen_manifests.insert(parent_manifest); - } - - - - // check out each file delta edge - change_set const & cset = edge_changes(k); - for (change_set::delta_map::const_iterator d = cset.deltas.begin(); - d != cset.deltas.end(); ++d) - { - file_id parent_file (delta_entry_src(d)); - file_id child_file (delta_entry_dst(d)); - - - // first, if we have a child we've never seen before we will need - // to request it in its entrety. - if (seen_files.find(child_file) == seen_files.end()) - { - if (this->app.db.file_version_exists(child_file)) - L(F("not requesting (in reverse) initial file %s as we already have it\n") % child_file); - else - { - L(F("requesting (in reverse) initial file data %s\n") % child_file); - queue_send_data_cmd(file_item, plain_id(child_file)); - } - seen_files.insert(child_file); - } - - // second, if the parent is nonempty, we want to ask for an edge to it - if (!parent_file.inner()().empty()) - { - if (this->app.db.file_version_exists(parent_file)) - L(F("not requesting (in reverse) file delta to %s as we already have it\n") % parent_file); - else - { - L(F("requesting (in reverse) file delta %s -> %s on %s\n") - % child_file % parent_file % delta_entry_path(d)); - reverse_delta_requests.insert(make_pair(plain_id(child_file), - plain_id(parent_file))); - queue_send_delta_cmd(file_item, - plain_id(child_file), - plain_id(parent_file)); - } - seen_files.insert(parent_file); - } - } - } - - // now actually consume the data packet, which will wait on the - // arrival of its prerequisites in the packet_db_writer - this->dbw.consume_revision_data(j->first, j->second->first); - } + parents.insert(edge_old_revision(j)); + map::iterator n; + n = chld_num.find(edge_old_revision(j)); + if (n == chld_num.end()) + chld_num.insert(make_pair(edge_old_revision(j), 1)); + else + ++(n->second); } - frontier = next_frontier; } -} - -void -session::request_fwd_revisions(revision_id const & i, - map attached, - set & visited) -{ - if (visited.find(i) != visited.end()) - return; - visited.insert(i); - - L(F("visiting revision '%s' for forward deltas\n") % i); + set_difference(nodes.begin(), nodes.end(), + parents.begin(), parents.end(), + inserter(heads, heads.begin())); - typedef map > > ancestryT; - - ancestryT::const_iterator j = ancestry.find(i); - if (j != ancestry.end()) - { - edge_map::const_iterator an_attached_edge = j->second->second.edges.end(); + L(F("intermediate set_difference heads size %d") % heads.size()); - // first make sure we've requested enough to get to here by - // calling ourselves recursively. this is the forward path after all. + // Write permissions checking: + // remove heads w/o proper certs, add their children to heads + // 1) remove unwanted branch certs from consideration + // 2) remove heads w/o a branch tag, process new exposed heads + // 3) repeat 2 until no change - for (edge_map::const_iterator k = j->second->second.edges.begin(); - k != j->second->second.edges.end(); ++k) + //1 + set ok_branches, bad_branches; + cert_name bcert_name(branch_cert_name); + cert_name tcert_name(tag_cert_name); + for (map::iterator i = received_certs.begin(); + i != received_certs.end(); ++i) + { + //branches + vector & bcerts(i->second[bcert_name]); + vector keeping; + for (vector::iterator j = bcerts.begin(); j != bcerts.end(); ++j) { - if (is_attached(edge_old_revision(k), attached)) - { - request_fwd_revisions(edge_old_revision(k), attached, visited); - an_attached_edge = k; - } - } - - I(an_attached_edge != j->second->second.edges.end()); - - // check out the manifest delta edge - manifest_id parent_manifest = edge_old_manifest(an_attached_edge); - manifest_id child_manifest = j->second->second.new_manifest; - if (this->app.db.manifest_version_exists(child_manifest)) - L(F("not requesting forward manifest delta to '%s' as we already have it\n") - % child_manifest); - else - { - if (parent_manifest.inner()().empty()) - { - L(F("requesting full manifest data %s\n") % child_manifest); - queue_send_data_cmd(manifest_item, plain_id(child_manifest)); - } + cert_value name; + decode_base64(j->value, name); + if (ok_branches.find(name()) != ok_branches.end()) + keeping.push_back(*j); + else if (bad_branches.find(name()) != bad_branches.end()) + ; else { - L(F("requesting forward manifest delta %s -> %s\n") - % parent_manifest % child_manifest); - queue_send_delta_cmd(manifest_item, - plain_id(parent_manifest), - plain_id(child_manifest)); + if (our_matcher(name())) + { + ok_branches.insert(name()); + keeping.push_back(*j); + } + else + { + bad_branches.insert(name()); + W(F("Dropping branch certs for unwanted branch %s") + % name); + } } } + bcerts = keeping; + } + //2 + list tmp; + for (set::iterator i = heads.begin(); i != heads.end(); ++i) + { + if (!received_certs[*i][bcert_name].size()) + tmp.push_back(*i); + } + for (list::iterator i = tmp.begin(); i != tmp.end(); ++i) + heads.erase(*i); - // check out each file delta edge - change_set const & an_attached_cset = edge_changes(an_attached_edge); - for (change_set::delta_map::const_iterator k = an_attached_cset.deltas.begin(); - k != an_attached_cset.deltas.end(); ++k) + L(F("after step 2, heads size %d") % heads.size()); + //3 + while (tmp.size()) + { + ancestryT::const_iterator i = ancestry.find(tmp.front()); + if (i != ancestry.end()) { - if (this->app.db.file_version_exists(delta_entry_dst(k))) - L(F("not requesting forward delta %s -> %s on file %s as we already have it\n") - % delta_entry_src(k) % delta_entry_dst(k) % delta_entry_path(k)); - else + for (edge_map::const_iterator j = i->second->second.edges.begin(); + j != i->second->second.edges.end(); ++j) { - if (delta_entry_src(k).inner()().empty()) + if (!--chld_num[edge_old_revision(j)]) { - L(F("requesting full file data %s\n") % delta_entry_dst(k)); - queue_send_data_cmd(file_item, plain_id(delta_entry_dst(k))); + if (received_certs[i->first][bcert_name].size()) + heads.insert(i->first); + else + tmp.push_back(edge_old_revision(j)); } - else - { - - L(F("requesting forward delta %s -> %s on file %s\n") - % delta_entry_src(k) % delta_entry_dst(k) % delta_entry_path(k)); - queue_send_delta_cmd(file_item, - plain_id(delta_entry_src(k)), - plain_id(delta_entry_dst(k))); - } } + // since we don't want this rev, we don't want it's certs either + received_certs[tmp.front()] = cert_map(); } - // now actually consume the data packet, which will wait on the - // arrival of its prerequisites in the packet_db_writer - this->dbw.consume_revision_data(j->first, j->second->first); + tmp.pop_front(); } -} -void -session::analyze_ancestry_graph() -{ - typedef map > > ancestryT; - typedef map > cert_map; - - if (! (all_requested_revisions_received() && cert_refinement_done())) - return; - - if (analyzed_ancestry) - return; - - set heads; - { - set nodes, parents; - map chld_num; - L(F("analyzing %d ancestry edges\n") % ancestry.size()); - - for (ancestryT::const_iterator i = ancestry.begin(); i != ancestry.end(); ++i) - { - nodes.insert(i->first); - for (edge_map::const_iterator j = i->second->second.edges.begin(); - j != i->second->second.edges.end(); ++j) - { - parents.insert(edge_old_revision(j)); - map::iterator n; - n = chld_num.find(edge_old_revision(j)); - if (n == chld_num.end()) - chld_num.insert(make_pair(edge_old_revision(j), 1)); - else - ++(n->second); - } - } - - set_difference(nodes.begin(), nodes.end(), - parents.begin(), parents.end(), - inserter(heads, heads.begin())); - - // Write permissions checking: - // remove heads w/o proper certs, add their children to heads - // 1) remove unwanted branch certs from consideration - // 2) remove heads w/o a branch tag, process new exposed heads - // 3) repeat 2 until no change - - //1 - set ok_branches, bad_branches; - cert_name bcert_name(branch_cert_name); - cert_name tcert_name(tag_cert_name); - for (map::iterator i = received_certs.begin(); - i != received_certs.end(); ++i) - { - //branches - vector & bcerts(i->second[bcert_name]); - vector keeping; - for (vector::iterator j = bcerts.begin(); j != bcerts.end(); ++j) - { - cert_value name; - decode_base64(j->value, name); - if (ok_branches.find(name()) != ok_branches.end()) - keeping.push_back(*j); - else if (bad_branches.find(name()) != bad_branches.end()) - ; - else - { - if (our_matcher(name())) - { - ok_branches.insert(name()); - keeping.push_back(*j); - } - else - { - bad_branches.insert(name()); - W(F("Dropping branch certs for unwanted branch %s") - % name); - } - } - } - bcerts = keeping; - } - //2 - list tmp; - for (set::iterator i = heads.begin(); i != heads.end(); ++i) - { - if (!received_certs[*i][bcert_name].size()) - tmp.push_back(*i); - } - for (list::iterator i = tmp.begin(); i != tmp.end(); ++i) - heads.erase(*i); - //3 - while (tmp.size()) - { - ancestryT::const_iterator i = ancestry.find(tmp.front()); - if (i != ancestry.end()) - { - for (edge_map::const_iterator j = i->second->second.edges.begin(); - j != i->second->second.edges.end(); ++j) - { - if (!--chld_num[edge_old_revision(j)]) - { - if (received_certs[i->first][bcert_name].size()) - heads.insert(i->first); - else - tmp.push_back(edge_old_revision(j)); - } - } - // since we don't want this rev, we don't want it's certs either - received_certs[tmp.front()] = cert_map(); - } - tmp.pop_front(); - } - } - + L(F("after step 3, heads size %d") % heads.size()); // We've reduced the certs to those we want now, send them to dbw. for (map::iterator i = received_certs.begin(); i != received_certs.end(); ++i) @@ -1299,43 +1060,28 @@ } } } +} - L(F("isolated %d heads\n") % heads.size()); +void +session::analyze_ancestry_graph() +{ + L(F("analyze_ancestry_graph")); + if (! (all_requested_revisions_received() && cert_refinement_done())) + { + L(F("not all done in analyze_ancestry_graph")); + return; + } - // first we determine the "attachment status" of each node in our ancestry - // graph. + if (analyzed_ancestry) + { + L(F("already analyzed_ancestry in analyze_ancestry_graph")); + return; + } - map attached; - set visited; - for (set::const_iterator i = heads.begin(); - i != heads.end(); ++i) - analyze_attachment(*i, visited, attached); + L(F("analyze_ancestry_graph fetching")); - // then we walk the graph upwards, recursively, starting from each of the - // heads. we either walk requesting forward deltas or reverse deltas, - // depending on whether we are walking an attached or detached subgraph, - // respectively. the forward walk ignores detached nodes, the backward walk - // ignores attached nodes. + ancestry_fetcher fetch(*this); - set fwd_visited, rev_visited; - - for (set::const_iterator i = heads.begin(); - i != heads.end(); ++i) - { - map::const_iterator k = attached.find(*i); - I(k != attached.end()); - - if (k->second) - { - L(F("requesting attached ancestry of revision '%s'\n") % *i); - request_fwd_revisions(*i, attached, fwd_visited); - } - else - { - L(F("requesting detached ancestry of revision '%s'\n") % *i); - request_rev_revisions(*i, attached, rev_visited); - } - } analyzed_ancestry = true; } @@ -2921,7 +2667,15 @@ case manifest_item: { manifest_id src_manifest(hbase), dst_manifest(hident); - if (reverse_delta_requests.find(id_pair) + if (full_delta_items[manifest_item]->find(ident) + != full_delta_items[manifest_item]->end()) + { + this->dbw.consume_manifest_delta(src_manifest, + dst_manifest, + manifest_delta(del), + true); + } + else if (reverse_delta_requests.find(id_pair) != reverse_delta_requests.end()) { reverse_delta_requests.erase(id_pair); @@ -2940,7 +2694,15 @@ case file_item: { file_id src_file(hbase), dst_file(hident); - if (reverse_delta_requests.find(id_pair) + if (full_delta_items[file_item]->find(ident) + != full_delta_items[file_item]->end()) + { + this->dbw.consume_file_delta(src_file, + dst_file, + file_delta(del), + true); + } + else if (reverse_delta_requests.find(id_pair) != reverse_delta_requests.end()) { reverse_delta_requests.erase(id_pair); @@ -3863,3 +3625,362 @@ end_platform_netsync(); } + +// Steps for determining files/manifests to request, from +// a given revision ancestry: +// +// 1) find the new heads, consume valid branch certs etc. +// +// 2) foreach new head, traverse up the revision ancestry, building +// a set of reverse file/manifest deltas (we stop when we hit an +// already-seen or existing-in-db rev). At the same time, build a +// smaller set of forward deltas to files/manifests which exist in the +// head revisions. +// +// 3) For each file/manifest in head, first request the forward delta +// (or full data if there is no path back to existing data). Then +// traverse up the set of reverse deltas, daisychaining our way until +// we get to existing revisions. +// +// Notes: +// +// - The database stores reverse deltas, so preferring these allows +// a server to send pre-computed deltas straight from the database +// (this isn't done yet). In order to bootstrap the tip-most data, +// forward deltas from a close(est?)-ancestor are used, or full data +// is requested if there is no existing ancestor. +// +// eg, if we have the (manifest) ancestry +// A -> B -> C -> D +// where A is existing, {B,C,D} are new, then we will request deltas +// A->D (fwd) +// D->C (rev) +// C->B (rev) +// This may result in slightly larger deltas than using all forward +// deltas, however it should be more efficient. +// +// - in order to keep a good hit ratio with the reconstructed version +// cache in database, we'll request deltas for a single file/manifest +// all at once, rather than requesting deltas per-revision. This +// requires a bit more memory usage, though it will be less memory +// than would be required to store all the outgoing delta requests +// anyway. +ancestry_fetcher::ancestry_fetcher(session & s) + : sess(s) +{ + set new_heads; + sess.get_heads_and_consume_certs( new_heads ); + + L(F("ancestry_fetcher: got %d heads") % new_heads.size()); + + traverse_ancestry(new_heads); + + request_files(); + request_manifests(); +} + +void +ancestry_fetcher::traverse_files(change_set const & cset) +{ + for (change_set::delta_map::const_iterator d = cset.deltas.begin(); + d != cset.deltas.end(); ++d) + { + file_id parent_file (delta_entry_src(d)); + file_id child_file (delta_entry_dst(d)); + L(F("traverse_files parent %s child %s") + % parent_file % child_file); + + I(!(parent_file == child_file)); + // XXX when changeset format is altered to have [...]->[] deltas on deletion, + // this assertion needs revisiting + I(!null_id(child_file)); + + // request the reverse delta + if (!null_id(parent_file)) + { + L(F("inserting file rev_deltas")); + rev_file_deltas.insert(make_pair(child_file, parent_file)); + } + + // add any new forward deltas + if (seen_files.find(child_file) == seen_files.end()) + { + L(F("inserting fwd_jump_deltas")); + fwd_file_deltas.insert( make_pair( parent_file, child_file ) ); + } + + // update any forward deltas. no point updating if it already + // points to something we have. + if (!null_id(parent_file) + && fwd_file_deltas.find(child_file) != fwd_file_deltas.end()) + { + // We're traversing with child->parent of A->B. + // Update any forward deltas with a parent of B to + // have A as a parent, ie B->C becomes A->C. + for (multimap::iterator d = + fwd_file_deltas.lower_bound(child_file); + d != fwd_file_deltas.upper_bound(child_file); + d++) + { + fwd_file_deltas.insert(make_pair(parent_file, d->second)); + } + + fwd_file_deltas.erase(fwd_file_deltas.lower_bound(child_file), + fwd_file_deltas.upper_bound(child_file)); + } + + seen_files.insert(child_file); + seen_files.insert(parent_file); + } +} + +void +ancestry_fetcher::traverse_manifest(manifest_id const & child_man, + manifest_id const & parent_man) +{ + L(F("traverse_manifest parent %s child %s") + % parent_man % child_man); + I(!null_id(child_man)); + // add reverse deltas + if (!null_id(parent_man)) + { + L(F("inserting manifest rev_deltas")); + rev_manifest_deltas.insert(make_pair(child_man, parent_man)); + } + + // handle the manifest forward-deltas + if (!null_id(parent_man) + // don't update child to itself, it makes the loop iterate infinitely. + && !(parent_man == child_man) + && fwd_manifest_deltas.find(child_man) != fwd_manifest_deltas.end()) + { + // We're traversing with child->parent of A->B. + // Update any forward deltas with a parent of B to + // have A as a parent, ie B->C becomes A->C. + for (multimap::iterator d = + fwd_manifest_deltas.lower_bound(child_man); + d != fwd_manifest_deltas.upper_bound(child_man); + d++) + { + L(F("size %d\n") % fwd_manifest_deltas.size()); + L(F("inserting %s->%s") % parent_man % d->second); + fwd_manifest_deltas.insert(make_pair(parent_man, d->second)); + } + + fwd_manifest_deltas.erase(fwd_manifest_deltas.lower_bound(child_man), + fwd_manifest_deltas.upper_bound(child_man)); + } +} + +void +ancestry_fetcher::traverse_ancestry(set const & heads) +{ + deque frontier; + set seen_revs; + + for (set::const_iterator h = heads.begin(); + h != heads.end(); h++) + { + L(F("inserting head %s") % *h); + frontier.push_back(*h); + seen_revs.insert(*h); + manifest_id const & m = sess.ancestry[*h]->second.new_manifest; + fwd_manifest_deltas.insert(make_pair(m,m)); + } + + // breadth first up the ancestry + while (!frontier.empty()) + { + revision_id const & rev = frontier.front(); + + L(F("frontier %s") % rev); + I(sess.ancestry.find(rev) != sess.ancestry.end()); + + for (edge_map::const_iterator e = sess.ancestry[rev]->second.edges.begin(); + e != sess.ancestry[rev]->second.edges.end(); e++) + { + revision_id const & par = edge_old_revision(e); + if (seen_revs.find(par) == seen_revs.end()) + { + if (sess.ancestry.find(par) != sess.ancestry.end()) + { + L(F("push_back to frontier %s") % par); + frontier.push_back(par); + } + seen_revs.insert(par); + } + + traverse_manifest(sess.ancestry[rev]->second.new_manifest, + edge_old_manifest(e)); + traverse_files(edge_changes(e)); + + } + + sess.dbw.consume_revision_data(rev, sess.ancestry[rev]->first); + frontier.pop_front(); + } +} + +void +ancestry_fetcher::request_rev_file_deltas(file_id const & start, + set & done_files) +{ + stack< file_id > frontier; + frontier.push(start); + + while (!frontier.empty()) + { + file_id const child = frontier.top(); + I(!null_id(child)); + frontier.pop(); + + for (multimap< file_id, file_id>::const_iterator + d = rev_file_deltas.lower_bound(child); + d != rev_file_deltas.upper_bound(child); + d++) + { + file_id const & parent = d->second; + I(!null_id(parent)); + if (done_files.find(parent) == done_files.end()) + { + done_files.insert(parent); + if (!sess.app.db.file_version_exists(parent)) + { + L(F("requesting reverse file delta %s->%s") + % child % parent); + sess.queue_send_delta_cmd(file_item, + plain_id(child), plain_id(parent)); + sess.reverse_delta_requests.insert(make_pair(plain_id(child), + plain_id(parent))); + } + else + { + L(F("file %s exists, not requesting rev delta") + % parent); + } + frontier.push(parent); + } + } + } +} + +void +ancestry_fetcher::request_files() +{ + // just a cache to avoid checking db.foo_version_exists() too much + set done_files; + + for (multimap::const_iterator d = fwd_file_deltas.begin(); + d != fwd_file_deltas.end(); d++) + { + file_id const & anc = d->first; + file_id const & child = d->second; + if (!sess.app.db.file_version_exists(child)) + { + if (null_id(anc) + || !sess.app.db.file_version_exists(anc)) + { + L(F("requesting full file %s") % child); + sess.queue_send_data_cmd(file_item, plain_id(child)); + } + else + { + L(F("requesting forward delta %s->%s") + % anc % child); + sess.queue_send_delta_cmd(file_item, + plain_id(anc), plain_id(child)); + sess.note_item_full_delta(file_item, plain_id(child)); + } + } + else + { + L(F("not requesting fwd delta %s->%s, already have dst") + % anc % child); + } + + // traverse up the reverse deltas + request_rev_file_deltas(child, done_files); + } +} + +void +ancestry_fetcher::request_rev_manifest_deltas(manifest_id const & start, + set & done_manifests) +{ + stack< manifest_id > frontier; + frontier.push(start); + + while (!frontier.empty()) + { + manifest_id const child = frontier.top(); + I(!null_id(child)); + frontier.pop(); + + for (multimap< manifest_id, manifest_id>::const_iterator + d = rev_manifest_deltas.lower_bound(child); + d != rev_manifest_deltas.upper_bound(child); + d++) + { + manifest_id const & parent = d->second; + I(!null_id(parent)); + if (done_manifests.find(parent) == done_manifests.end()) + { + done_manifests.insert(parent); + if (!sess.app.db.manifest_version_exists(parent)) + { + L(F("requesting reverse manifest delta %s->%s") + % child % parent); + sess.queue_send_delta_cmd(manifest_item, + plain_id(child), plain_id(parent)); + sess.reverse_delta_requests.insert(make_pair(plain_id(child), + plain_id(parent))); + } + else + { + L(F("manifest %s exists, not requesting rev delta") + % parent); + } + frontier.push(parent); + } + } + } +} + +void +ancestry_fetcher::request_manifests() +{ + // just a cache to avoid checking db.foo_version_exists() too much + set done_manifests; + + for (multimap::const_iterator d = fwd_manifest_deltas.begin(); + d != fwd_manifest_deltas.end(); d++) + { + manifest_id const & anc = d->first; + manifest_id const & child = d->second; + if (!sess.app.db.manifest_version_exists(child)) + { + if (null_id(anc) + || !sess.app.db.manifest_version_exists(anc)) + { + L(F("requesting full manifest %s") % child); + sess.queue_send_data_cmd(manifest_item, plain_id(child)); + } + else + { + L(F("requesting forward delta %s->%s") + % anc % child); + sess.queue_send_delta_cmd(manifest_item, + plain_id(anc), plain_id(child)); + sess.note_item_full_delta(manifest_item, plain_id(child)); + } + } + else + { + L(F("not requesting fwd delta %s->%s, already have dst") + % anc % child); + } + + // traverse up the reverse deltas + request_rev_manifest_deltas(child, done_manifests); + } +} ======================================================================== --- packet.cc 407e15cf1c485e09665a372588be2f99491a17d7 +++ packet.cc 0867d1963018a7de23a7ca07be1e637817aaeb50 @@ -246,12 +246,14 @@ file_id new_id; file_delta del; bool forward_delta; + bool write_full; public: delayed_file_delta_packet(file_id const & oi, file_id const & ni, file_delta const & md, - bool fwd) - : old_id(oi), new_id(ni), del(md), forward_delta(fwd) + bool fwd, + bool full = false) + : old_id(oi), new_id(ni), del(md), forward_delta(fwd), write_full(full) {} virtual void apply_delayed_packet(packet_db_writer & pw); virtual ~delayed_file_delta_packet(); @@ -265,12 +267,14 @@ manifest_id new_id; manifest_delta del; bool forward_delta; + bool write_full; public: delayed_manifest_delta_packet(manifest_id const & oi, manifest_id const & ni, manifest_delta const & md, - bool fwd) - : old_id(oi), new_id(ni), del(md), forward_delta(fwd) + bool fwd, + bool full = false) + : old_id(oi), new_id(ni), del(md), forward_delta(fwd), write_full(full) {} virtual void apply_delayed_packet(packet_db_writer & pw); virtual ~delayed_manifest_delta_packet(); @@ -361,12 +365,13 @@ void delayed_manifest_delta_packet::apply_delayed_packet(packet_db_writer & pw) { - L(F("writing delayed manifest %s packet for %s -> %s\n") + L(F("writing delayed manifest %s packet for %s -> %s%s\n") % (forward_delta ? "delta" : "reverse delta") % (forward_delta ? old_id : new_id) - % (forward_delta ? new_id : old_id)); + % (forward_delta ? new_id : old_id) + % (write_full ? " (writing in full)" : "")); if (forward_delta) - pw.consume_manifest_delta(old_id, new_id, del); + pw.consume_manifest_delta(old_id, new_id, del, write_full); else pw.consume_manifest_reverse_delta(new_id, old_id, del); } @@ -381,12 +386,13 @@ void delayed_file_delta_packet::apply_delayed_packet(packet_db_writer & pw) { - L(F("writing delayed file %s packet for %s -> %s\n") + L(F("writing delayed file %s packet for %s -> %s%s\n") % (forward_delta ? "delta" : "reverse delta") % (forward_delta ? old_id : new_id) - % (forward_delta ? new_id : old_id)); + % (forward_delta ? new_id : old_id) + % (write_full ? " (writing in full)" : "")); if (forward_delta) - pw.consume_file_delta(old_id, new_id, del); + pw.consume_file_delta(old_id, new_id, del, write_full); else pw.consume_file_reverse_delta(new_id, old_id, del); } @@ -656,6 +662,15 @@ file_id const & new_id, file_delta const & del) { + consume_file_delta(old_id, new_id, del, false); +} + +void +packet_db_writer::consume_file_delta(file_id const & old_id, + file_id const & new_id, + file_delta const & del, + bool write_full) +{ transaction_guard guard(pimpl->app.db); if (! pimpl->file_version_exists_in_db(new_id)) { @@ -669,7 +684,10 @@ calculate_ident(file_data(new_dat), confirm); if (confirm == new_id) { - pimpl->app.db.put_file_version(old_id, new_id, del); + if (!write_full) + pimpl->app.db.put_file_version(old_id, new_id, del); + else + pimpl->app.db.put_file(new_id, file_data(new_dat)); pimpl->accepted_file(new_id, *this); } else @@ -682,7 +700,7 @@ { L(F("delaying file delta %s -> %s for preimage\n") % old_id % new_id); shared_ptr dp; - dp = shared_ptr(new delayed_file_delta_packet(old_id, new_id, del, true)); + dp = shared_ptr(new delayed_file_delta_packet(old_id, new_id, del, true, write_full)); shared_ptr fp; pimpl->get_file_prereq(old_id, fp); dp->add_prerequisite(fp); @@ -761,6 +779,15 @@ manifest_id const & new_id, manifest_delta const & del) { + consume_manifest_delta(old_id, new_id, del, false); +} + +void +packet_db_writer::consume_manifest_delta(manifest_id const & old_id, + manifest_id const & new_id, + manifest_delta const & del, + bool write_full) +{ transaction_guard guard(pimpl->app.db); if (! pimpl->manifest_version_exists_in_db(new_id)) { @@ -774,7 +801,11 @@ calculate_ident(manifest_data(new_dat), confirm); if (confirm == new_id) { - pimpl->app.db.put_manifest_version(old_id, new_id, del); + if (!write_full) + pimpl->app.db.put_manifest_version(old_id, new_id, del); + else + pimpl->app.db.put_manifest(new_id, manifest_data(new_dat)); + pimpl->accepted_manifest(new_id, *this); } else @@ -787,7 +818,7 @@ { L(F("delaying manifest delta %s -> %s for preimage\n") % old_id % new_id); shared_ptr dp; - dp = shared_ptr(new delayed_manifest_delta_packet(old_id, new_id, del, true)); + dp = shared_ptr(new delayed_manifest_delta_packet(old_id, new_id, del, true, write_full)); shared_ptr fp; pimpl->get_manifest_prereq(old_id, fp); dp->add_prerequisite(fp); @@ -1108,6 +1139,15 @@ } void +packet_db_valve::consume_file_delta(file_id const & id_old, + file_id const & id_new, + file_delta const & del, + bool write_full) +{ + DOIT(delayed_file_delta_packet(id_old, id_new, del, true, write_full)); +} + +void packet_db_valve::consume_file_reverse_delta(file_id const & id_new, file_id const & id_old, file_delta const & del) @@ -1131,6 +1171,15 @@ } void +packet_db_valve::consume_manifest_delta(manifest_id const & id_old, + manifest_id const & id_new, + manifest_delta const & del, + bool write_full) +{ + DOIT(delayed_manifest_delta_packet(id_old, id_new, del, true, write_full)); +} + +void packet_db_valve::consume_manifest_reverse_delta(manifest_id const & id_new, manifest_id const & id_old, manifest_delta const & del) ======================================================================== --- packet.hh ec7178c2332473305c0aa7d00c727e338fc7810d +++ packet.hh c2d6f03ffa901f98ad6623090b687982d27db552 @@ -136,6 +136,10 @@ virtual void consume_file_delta(file_id const & id_old, file_id const & id_new, file_delta const & del); + virtual void consume_file_delta(file_id const & id_old, + file_id const & id_new, + file_delta const & del, + bool write_full); virtual void consume_file_reverse_delta(file_id const & id_new, file_id const & id_old, file_delta const & del); @@ -145,6 +149,10 @@ virtual void consume_manifest_delta(manifest_id const & id_old, manifest_id const & id_new, manifest_delta const & del); + virtual void consume_manifest_delta(manifest_id const & id_old, + manifest_id const & id_new, + manifest_delta const & del, + bool write_full); virtual void consume_manifest_reverse_delta(manifest_id const & id_new, manifest_id const & id_old, manifest_delta const & del); @@ -185,6 +193,10 @@ virtual void consume_file_delta(file_id const & id_old, file_id const & id_new, file_delta const & del); + virtual void consume_file_delta(file_id const & id_old, + file_id const & id_new, + file_delta const & del, + bool write_full); virtual void consume_file_reverse_delta(file_id const & id_new, file_id const & id_old, file_delta const & del); @@ -194,6 +206,10 @@ virtual void consume_manifest_delta(manifest_id const & id_old, manifest_id const & id_new, manifest_delta const & del); + virtual void consume_manifest_delta(manifest_id const & id_old, + manifest_id const & id_new, + manifest_delta const & del, + bool write_full); virtual void consume_manifest_reverse_delta(manifest_id const & id_new, manifest_id const & id_old, manifest_delta const & del); ======================================================================== --- vocab.cc cebf734fb6a83a66665786e2c1486d4934137066 +++ vocab.cc 81220896b3d16dfa324aae78d7e5bdae045c4d83 @@ -228,6 +228,9 @@ template class revision; template class manifest; +template +void dump(revision_id const & r, std::string &); + // the rest is unit tests #ifdef BUILD_UNIT_TESTS ======================================================================== --- vocab.hh 22382ac1bdffec21170a88ff2580fe39b508243f +++ vocab.hh 511e1ef3052189b3868e5b3932b12c198eab396f @@ -166,5 +166,24 @@ external_diff }; +// do these belong here? +inline bool +null_id(file_id const & i) +{ + return i.inner()().empty(); +} +inline bool +null_id(manifest_id const & i) +{ + return i.inner()().empty(); +} + +inline bool +null_id(revision_id const & i) +{ + return i.inner()().empty(); +} + + #endif // __VOCAB_HH__