#
# patch "netsync.cc"
# from [1a08b80949f7458e7868d09b0647dbdfd2f5d645]
#   to [2158f0f1f9e8f5e83aa4a5dfefa05d104bdb0a0e]
#
# patch "packet.cc"
# from [407e15cf1c485e09665a372588be2f99491a17d7]
#   to [0867d1963018a7de23a7ca07be1e637817aaeb50]
#
# patch "packet.hh"
# from [ec7178c2332473305c0aa7d00c727e338fc7810d]
#   to [c2d6f03ffa901f98ad6623090b687982d27db552]
#
========================================================================
--- netsync.cc 1a08b80949f7458e7868d09b0647dbdfd2f5d645
+++ netsync.cc 2158f0f1f9e8f5e83aa4a5dfefa05d104bdb0a0e
@@ -8,6 +8,7 @@
 #include
 #include
 #include
+#include
 #include
@@ -359,15 +360,6 @@
 
   void get_heads_and_consume_certs(set<revision_id> & heads);
 
-  void analyze_attachment(revision_id const & i,
-                          set<revision_id> & visited,
-                          map<revision_id, bool> & attached);
-  void request_rev_revisions(revision_id const & init,
-                             map<revision_id, bool> attached,
-                             set<revision_id> visited);
-  void request_fwd_revisions(revision_id const & i,
-                             map<revision_id, bool> attached,
-                             set<revision_id> & visited);
   void analyze_ancestry_graph();
   void analyze_manifest(manifest_map const & man);
@@ -469,26 +461,23 @@
 {
   session & sess;
 
-  set< revision_id> seen_revs;
   // map children to parents
-  map< file_id, set<file_id> > rev_file_deltas;
+  multimap< file_id, file_id > rev_file_deltas;
+  multimap< manifest_id, manifest_id > rev_manifest_deltas;
   // map an ancestor to a child
-  map< file_id, file_id > fwd_file_deltas;
-  set< file_id > full_files;
-  map< manifest_id, set<manifest_id> > rev_manifest_deltas;
-  map< manifest_id, manifest_id > fwd_manifest_deltas;
-  set< manifest_id > full_manifests;
+  multimap< file_id, file_id > fwd_file_deltas;
+  multimap< manifest_id, manifest_id > fwd_manifest_deltas;
+
   set< file_id > seen_files;
-  set< file_id > anchor_files;
-  set< manifest_id > anchor_manifests;
 
   ancestry_fetcher(session & s);
-  void traverse_files(change_set const & cset,
-                      map< file_id, file_id > & fwd_jump_deltas);
+  // analysing the ancestry graph
+  void traverse_files(change_set const & cset);
   void traverse_manifest(manifest_id const & child_man,
-                         manifest_id const & parent_man,
-                         manifest_id & fwd_anc);
-  void traverse_ancestry(revision_id const & head);
+                         manifest_id const & parent_man);
+  void traverse_ancestry(set<revision_id> const & heads);
+
+  // requesting the data
   void request_rev_file_deltas(file_id const & start);
   void request_files();
   void request_rev_manifest_deltas(manifest_id const & start);
@@ -926,67 +915,6 @@
     }
 }
 
-static bool
-is_attached(revision_id const & i,
-            map<revision_id, bool> const & attach_map)
-{
-  map<revision_id, bool>::const_iterator j = attach_map.find(i);
-  I(j != attach_map.end());
-  return j->second;
-}
-
-// this tells us whether a particular revision is "attached" -- meaning
-// either our database contains the underlying manifest or else one of our
-// parents (recursively, and only in the current ancestry graph we're
-// requesting) is attached.
if it's detached we will request it using a -// different (more efficient and less failure-prone) algorithm - -void -session::analyze_attachment(revision_id const & i, - set & visited, - map & attached) -{ - typedef map > > ancestryT; - - if (visited.find(i) != visited.end()) - return; - - visited.insert(i); - - bool curr_attached = false; - - if (app.db.revision_exists(i)) - { - L(F("revision %s is attached via database\n") % i); - curr_attached = true; - } - else - { - L(F("checking attachment of %s in ancestry\n") % i); - ancestryT::const_iterator j = ancestry.find(i); - if (j != ancestry.end()) - { - for (edge_map::const_iterator k = j->second->second.edges.begin(); - k != j->second->second.edges.end(); ++k) - { - L(F("checking attachment of %s in parent %s\n") % i % edge_old_revision(k)); - analyze_attachment(edge_old_revision(k), visited, attached); - if (is_attached(edge_old_revision(k), attached)) - { - L(F("revision %s is attached via parent %s\n") % i % edge_old_revision(k)); - curr_attached = true; - } - } - } - } - if (curr_attached) - L(F("decided that revision '%s' is attached\n") % i); - else - L(F("decided that revision '%s' is not attached\n") % i); - - attached[i] = curr_attached; -} - inline static id plain_id(manifest_id const & i) { @@ -1005,221 +933,6 @@ return tmp; } -void -session::request_rev_revisions(revision_id const & init, - map attached, - set visited) -{ - typedef map > > ancestryT; - - set seen_manifests; - set seen_files; - - set frontier; - frontier.insert(init); - while(!frontier.empty()) - { - set next_frontier; - for (set::const_iterator i = frontier.begin(); - i != frontier.end(); ++i) - { - if (is_attached(*i, attached)) - continue; - - if (visited.find(*i) != visited.end()) - continue; - - visited.insert(*i); - - ancestryT::const_iterator j = ancestry.find(*i); - if (j != ancestry.end()) - { - - for (edge_map::const_iterator k = j->second->second.edges.begin(); - k != j->second->second.edges.end(); ++k) - { - - next_frontier.insert(edge_old_revision(k)); - - // check out the manifest delta edge - manifest_id parent_manifest = edge_old_manifest(k); - manifest_id child_manifest = j->second->second.new_manifest; - - // first, if we have a child we've never seen before we will need - // to request it in its entrety. 
- if (seen_manifests.find(child_manifest) == seen_manifests.end()) - { - if (this->app.db.manifest_version_exists(child_manifest)) - L(F("not requesting (in reverse) initial manifest %s as we already have it\n") % child_manifest); - else - { - L(F("requesting (in reverse) initial manifest data %s\n") % child_manifest); - queue_send_data_cmd(manifest_item, plain_id(child_manifest)); - } - seen_manifests.insert(child_manifest); - } - - // second, if the parent is nonempty, we want to ask for an edge to it - if (!parent_manifest.inner()().empty()) - { - if (this->app.db.manifest_version_exists(parent_manifest)) - L(F("not requesting (in reverse) manifest delta to %s as we already have it\n") % parent_manifest); - else - { - L(F("requesting (in reverse) manifest delta %s -> %s\n") - % child_manifest % parent_manifest); - reverse_delta_requests.insert(make_pair(plain_id(child_manifest), - plain_id(parent_manifest))); - queue_send_delta_cmd(manifest_item, - plain_id(child_manifest), - plain_id(parent_manifest)); - } - seen_manifests.insert(parent_manifest); - } - - - - // check out each file delta edge - change_set const & cset = edge_changes(k); - for (change_set::delta_map::const_iterator d = cset.deltas.begin(); - d != cset.deltas.end(); ++d) - { - file_id parent_file (delta_entry_src(d)); - file_id child_file (delta_entry_dst(d)); - - - // first, if we have a child we've never seen before we will need - // to request it in its entrety. - if (seen_files.find(child_file) == seen_files.end()) - { - if (this->app.db.file_version_exists(child_file)) - L(F("not requesting (in reverse) initial file %s as we already have it\n") % child_file); - else - { - L(F("requesting (in reverse) initial file data %s\n") % child_file); - queue_send_data_cmd(file_item, plain_id(child_file)); - } - seen_files.insert(child_file); - } - - // second, if the parent is nonempty, we want to ask for an edge to it - if (!parent_file.inner()().empty()) - { - if (this->app.db.file_version_exists(parent_file)) - L(F("not requesting (in reverse) file delta to %s as we already have it\n") % parent_file); - else - { - L(F("requesting (in reverse) file delta %s -> %s on %s\n") - % child_file % parent_file % delta_entry_path(d)); - reverse_delta_requests.insert(make_pair(plain_id(child_file), - plain_id(parent_file))); - queue_send_delta_cmd(file_item, - plain_id(child_file), - plain_id(parent_file)); - } - seen_files.insert(parent_file); - } - } - } - - // now actually consume the data packet, which will wait on the - // arrival of its prerequisites in the packet_db_writer - this->dbw.consume_revision_data(j->first, j->second->first); - } - } - frontier = next_frontier; - } -} - -void -session::request_fwd_revisions(revision_id const & i, - map attached, - set & visited) -{ - if (visited.find(i) != visited.end()) - return; - - visited.insert(i); - - L(F("visiting revision '%s' for forward deltas\n") % i); - - typedef map > > ancestryT; - - ancestryT::const_iterator j = ancestry.find(i); - if (j != ancestry.end()) - { - edge_map::const_iterator an_attached_edge = j->second->second.edges.end(); - - // first make sure we've requested enough to get to here by - // calling ourselves recursively. this is the forward path after all. 
- - for (edge_map::const_iterator k = j->second->second.edges.begin(); - k != j->second->second.edges.end(); ++k) - { - if (is_attached(edge_old_revision(k), attached)) - { - request_fwd_revisions(edge_old_revision(k), attached, visited); - an_attached_edge = k; - } - } - - I(an_attached_edge != j->second->second.edges.end()); - - // check out the manifest delta edge - manifest_id parent_manifest = edge_old_manifest(an_attached_edge); - manifest_id child_manifest = j->second->second.new_manifest; - if (this->app.db.manifest_version_exists(child_manifest)) - L(F("not requesting forward manifest delta to '%s' as we already have it\n") - % child_manifest); - else - { - if (parent_manifest.inner()().empty()) - { - L(F("requesting full manifest data %s\n") % child_manifest); - queue_send_data_cmd(manifest_item, plain_id(child_manifest)); - } - else - { - L(F("requesting forward manifest delta %s -> %s\n") - % parent_manifest % child_manifest); - queue_send_delta_cmd(manifest_item, - plain_id(parent_manifest), - plain_id(child_manifest)); - } - } - - // check out each file delta edge - change_set const & an_attached_cset = edge_changes(an_attached_edge); - for (change_set::delta_map::const_iterator k = an_attached_cset.deltas.begin(); - k != an_attached_cset.deltas.end(); ++k) - { - if (this->app.db.file_version_exists(delta_entry_dst(k))) - L(F("not requesting forward delta %s -> %s on file %s as we already have it\n") - % delta_entry_src(k) % delta_entry_dst(k) % delta_entry_path(k)); - else - { - if (delta_entry_src(k).inner()().empty()) - { - L(F("requesting full file data %s\n") % delta_entry_dst(k)); - queue_send_data_cmd(file_item, plain_id(delta_entry_dst(k))); - } - else - { - - L(F("requesting forward delta %s -> %s on file %s\n") - % delta_entry_src(k) % delta_entry_dst(k) % delta_entry_path(k)); - queue_send_delta_cmd(file_item, - plain_id(delta_entry_src(k)), - plain_id(delta_entry_dst(k))); - } - } - } - // now actually consume the data packet, which will wait on the - // arrival of its prerequisites in the packet_db_writer - this->dbw.consume_revision_data(j->first, j->second->first); - } -} - void session::get_heads_and_consume_certs( set & heads ) { @@ -1360,7 +1073,7 @@ return; } - L(F("analyze_ancestry_graph doing a fetch")); + L(F("analyze_ancestry_graph fetching")); ancestry_fetcher fetch(*this); @@ -2949,7 +2662,15 @@ case manifest_item: { manifest_id src_manifest(hbase), dst_manifest(hident); - if (reverse_delta_requests.find(id_pair) + if (full_delta_items[manifest_item]->find(ident) + != full_delta_items[manifest_item]->end()) + { + this->dbw.consume_manifest_delta(src_manifest, + dst_manifest, + manifest_delta(del), + true); + } + else if (reverse_delta_requests.find(id_pair) != reverse_delta_requests.end()) { reverse_delta_requests.erase(id_pair); @@ -2968,13 +2689,13 @@ case file_item: { file_id src_file(hbase), dst_file(hident); - if (full_delta_items[file_item].find(dst_file) - != full_delta_items[file_item].end()) + if (full_delta_items[file_item]->find(ident) + != full_delta_items[file_item]->end()) { this->dbw.consume_file_delta(src_file, - dst_file, - file_delta(del), - true); + dst_file, + file_delta(del), + true); } else if (reverse_delta_requests.find(id_pair) != reverse_delta_requests.end()) @@ -3872,82 +3593,106 @@ end_platform_netsync(); } + +// Notes: +// +// - it is cheaper for both the source and for the sink to receive +// reverse deltas (as opposed to forward deltas), since that's what +// gets stored in the db. 
Hence, we try to always use reverse deltas. +// If we have the (manifest) ancestry +// A -> B -> C -> D +// where A is existing, {B,C,D} are new, then we will request deltas +// A->D (fwd) +// D->C (rev) +// C->B (rev) +// This may result in slightly larger deltas than using all forward +// deltas, however it is much more efficient. +// +// - in order to keep a good hit ratio with the reconstructed version +// cache in database, we'll request deltas for a single file/manifest +// all at once, rather than requesting deltas per-revision. This +// requires a bit more memory usage, though it will be less memory +// than would be required to store all the outgoing delta requests +// anyway. +// +// Steps: +// 1) get new heads, consume valid branch certs etc. +// +// 2) foreach new head, traverse up the revision ancestry building +// a set of mainly reverse deltas, with forward deltas to files or +// manifests which exist in the head revision. +// +// 3) remove any deltas which already exist, and make requests +// for full files in the case where we don't already have the source +// for a full delta. +// +// 4) For each file/manifest in head, traverse up the set of reverse +// deltas requesting those deltas. +// ancestry_fetcher::ancestry_fetcher(session & s) : sess(s) { - - // Get the heads. set new_heads; sess.get_heads_and_consume_certs( new_heads ); + L(F("ancestry_fetcher: got %d heads") % new_heads.size()); - // for each new head, we traverse up the ancestry until we reach an existing - // revision, an already-requested revision, or a root revision. - for (set::const_iterator h = new_heads.begin(); - h != new_heads.end(); h++) - { - L(F("traverse head %s") % *h); - traverse_ancestry(*h); - } + traverse_ancestry(new_heads); request_files(); request_manifests(); } void -ancestry_fetcher::traverse_files(change_set const & cset, - map< file_id, file_id > & fwd_jump_deltas) +ancestry_fetcher::traverse_files(change_set const & cset) { for (change_set::delta_map::const_iterator d = cset.deltas.begin(); d != cset.deltas.end(); ++d) { - file_id parent_file (delta_entry_src(d)); file_id child_file (delta_entry_dst(d)); + L(F("traverse_files parent %s child %s") + % parent_file % child_file); - // surely we can assume this I(!(parent_file == child_file)); - - // when changeset format is altered to have ->"" deltas on deletion, - // this needs revisiting. + // XXX when changeset format is altered to have [...]->[] deltas on deletion, + // this assertion needs revisiting I(!null_id(child_file)); - L(F("traverse_files parent %s child %s") - % parent_file % child_file); + // request the reverse delta + if (!null_id(parent_file)) + { + L(F("inserting file rev_deltas")); + rev_file_deltas.insert(make_pair(child_file, parent_file)); + } - // if this is the first time we've seen the child file_id, - // add it to the fwd-delta list. + // add any new forward deltas if (seen_files.find(child_file) == seen_files.end()) { - L(F("unseen file %s, adding to anchor_files") % child_file); - anchor_files.insert(child_file); - if (!sess.app.db.file_version_exists(child_file)) - { - L(F("inserting fwd_jump_deltas")); - fwd_jump_deltas.insert( make_pair( parent_file, child_file ) ); - } + L(F("inserting fwd_jump_deltas")); + fwd_file_deltas.insert( make_pair( parent_file, child_file ) ); } - else + + // update any forward deltas. no point updating if it already + // points to something we have. + if (!null_id(parent_file) + && fwd_file_deltas.find(child_file) != fwd_file_deltas.end()) { - // ... 
and update any fwd_jump_deltas. - // no point updating if it already references something we have. - if (fwd_jump_deltas.find(child_file) != fwd_jump_deltas.end() - && !sess.app.db.file_version_exists(child_file)) + // We're traversing with child->parent of A->B. + // Update any forward deltas with a parent of B to + // have A as a parent, ie B->C becomes A->C. + for (multimap::iterator d = + fwd_file_deltas.lower_bound(child_file); + d != fwd_file_deltas.upper_bound(child_file); + d++) { - L(F("updating fwd_jump_deltas for head file %s") - % fwd_jump_deltas[child_file]); - fwd_jump_deltas[parent_file] = fwd_jump_deltas[child_file]; - fwd_jump_deltas.erase(child_file); + fwd_file_deltas.insert(make_pair(parent_file, d->second)); } - } - // request the reverse delta - if (!sess.app.db.file_version_exists(parent_file) - && !null_id(parent_file)) - { - L(F("inserting file rev_deltas")); - rev_file_deltas[child_file].insert(parent_file); + fwd_file_deltas.erase(fwd_file_deltas.lower_bound(child_file), + fwd_file_deltas.upper_bound(child_file)); } + seen_files.insert(child_file); seen_files.insert(parent_file); } @@ -3955,152 +3700,132 @@ void ancestry_fetcher::traverse_manifest(manifest_id const & child_man, - manifest_id const & parent_man, - manifest_id & fwd_anc) + manifest_id const & parent_man) { - - L(F("traverse_manifest parent %s child %s") % parent_man % child_man); + I(!null_id(child_man)); // add reverse deltas - if (!sess.app.db.manifest_version_exists(parent_man) - && !null_id(parent_man)) + if (!null_id(parent_man)) { L(F("inserting manifest rev_deltas")); - rev_manifest_deltas[child_man].insert(parent_man); + rev_manifest_deltas.insert(make_pair(child_man, parent_man)); } - // handle the manifest forward-delta for this head - if (child_man == fwd_anc - && !sess.app.db.manifest_version_exists(child_man)) + // handle the manifest forward-deltas + if (!null_id(parent_man) + && fwd_manifest_deltas.find(child_man) != fwd_manifest_deltas.end()) { - L(F("set connected_manifest to %s (was %s)") - % parent_man % fwd_anc); - fwd_anc = parent_man; - } + // We're traversing with child->parent of A->B. + // Update any forward deltas with a parent of B to + // have A as a parent, ie B->C becomes A->C. + L(F("inserting manifest rev_deltas")); + for (multimap::iterator d = + fwd_manifest_deltas.lower_bound(child_man); + d != fwd_manifest_deltas.upper_bound(child_man); + d++) + { + L(F("here %s,%s") % d->first % d->second); + } + for (multimap::iterator d = + fwd_manifest_deltas.lower_bound(child_man); + d != fwd_manifest_deltas.upper_bound(child_man); + d++) + { + L(F("size %d\n") % fwd_manifest_deltas.size()); + L(F("inserting %s->%s") % parent_man % d->second); + fwd_manifest_deltas.insert(make_pair(parent_man, d->second)); + L(F("erasing %s,%s") % d->first % d->second); + } + fwd_manifest_deltas.erase(fwd_manifest_deltas.lower_bound(child_man), + fwd_manifest_deltas.upper_bound(child_man)); + } } void -ancestry_fetcher::traverse_ancestry(revision_id const & head) +ancestry_fetcher::traverse_ancestry(set const & heads) { - I(seen_revs.find(head) == seen_revs.end()); - I(sess.ancestry.find(head) != sess.ancestry.end()); - deque frontier; - frontier.push_back(head); + set seen_revs; - // this map will get built up so that upon traversing to a known revision, - // we get pairs of ( existing_file, head_file ). along the way, it will - // contain pairs ( ancestor_file, head_file ), updated as the changeset - // is traversed. 
- map< file_id, file_id > fwd_jump_deltas; - // similarly for the manifest, we'll request it entirely - manifest_id m = sess.ancestry[head]->second.new_manifest; - manifest_id head_manifest; - if (!sess.app.db.manifest_version_exists(m)) + for (set::const_iterator h = heads.begin(); + h != heads.end(); h++) { - head_manifest = m; + L(F("inserting head %s") % *h); + frontier.push_back(*h); + seen_revs.insert(*h); + manifest_id const & m = sess.ancestry[*h]->second.new_manifest; + fwd_manifest_deltas.insert(make_pair(m,m)); } - L(F("head_manifest %s, new_manifest %s") % head_manifest % m); - manifest_id connected_manifest = head_manifest; // breadth first up the ancestry while (!frontier.empty()) { - revision_id rev = frontier.front(); - L(F("ancestry frontier front %s") % rev); + revision_id const & rev = frontier.front(); + + L(F("frontier %s") % rev); I(sess.ancestry.find(rev) != sess.ancestry.end()); - frontier.pop_front(); - for (edge_map::const_iterator e = sess.ancestry[rev]->second.edges.begin(); e != sess.ancestry[rev]->second.edges.end(); e++) { - revision_id const & par = e->first; + revision_id const & par = edge_old_revision(e); if (seen_revs.find(par) == seen_revs.end()) { if (sess.ancestry.find(par) != sess.ancestry.end()) { - L(F("ancestry frontier push_back %s") % par); + L(F("push_back to frontier %s") % par); frontier.push_back(par); } - else - { - L(F("ancestry frontier not adding %s, isn't in ancestry") % par); - } seen_revs.insert(par); } - traverse_manifest(sess.ancestry[rev]->second.new_manifest, edge_old_manifest(e), - connected_manifest); - traverse_files(edge_changes(e), fwd_jump_deltas); + traverse_manifest(sess.ancestry[rev]->second.new_manifest, + edge_old_manifest(e)); + traverse_files(edge_changes(e)); - sess.dbw.consume_revision_data(rev, sess.ancestry[rev]->first); } - } - for (map::const_iterator i = fwd_jump_deltas.begin(); - i != fwd_jump_deltas.end(); i++) - { - if (null_id(i->first)) - { - L(F("requesting full head file %s") % i->second); - full_files.insert(i->second); - } - else - { - fwd_file_deltas[i->first] = i->second; - } + sess.dbw.consume_revision_data(rev, sess.ancestry[rev]->first); + frontier.pop_front(); } - - if (!null_id(head_manifest)) - { - if (null_id(connected_manifest)) - { - L(F("requesting full manifest %s") % head_manifest); - full_manifests.insert(head_manifest); - } - else - { - L(F("requesting manifest fwd delta %s -> %s") - % connected_manifest % head_manifest); - fwd_manifest_deltas[connected_manifest] = head_manifest; - } - } } void ancestry_fetcher::request_rev_file_deltas(file_id const & start) { - vector< pair > frontier; - frontier.push_back(make_pair(file_id(), start)); + stack< file_id > frontier; + frontier.push(start); - L(F("request_rev_file_deltas: start %s, %d rev_file_deltas") - % start % rev_file_deltas.size()); - // depth first while (!frontier.empty()) { - file_id child = frontier.back().first; - file_id parent = frontier.back().second; + file_id const child = frontier.top(); + I(!null_id(child)); + frontier.pop(); - // request it - if (!null_id(child)) + for (multimap< file_id, file_id>::const_iterator + d = rev_file_deltas.lower_bound(child); + d != rev_file_deltas.upper_bound(child); + d++) { - L(F("requesting rev file delta child %s -> parent %s") - % child % parent); - sess.queue_send_delta_cmd(file_item, - plain_id(child), plain_id(parent)); - } - frontier.pop_back(); - - if (rev_file_deltas.find(parent) != rev_file_deltas.end()) - { - for (set::const_iterator f = rev_file_deltas[parent].begin(); - f != 
rev_file_deltas[parent].end(); f++) + file_id const & parent = d->second; + I(!null_id(parent)); + if (!sess.app.db.file_version_exists(parent)) { - frontier.push_back( make_pair(parent, *f) ); + L(F("requesting reverse file delta %s->%s") + % child % parent); + sess.queue_send_delta_cmd(file_item, + plain_id(child), plain_id(parent)); + sess.reverse_delta_requests.insert(make_pair(plain_id(child), + plain_id(parent))); } + else + { + L(F("already have file %s, not requesting rev delta") + % parent); + } + frontier.push(parent); } } } @@ -4108,87 +3833,73 @@ void ancestry_fetcher::request_files() { - - L(F("request_files: %d full, %d fwd_deltas") - % full_files.size() % fwd_file_deltas.size()); - - L(F("rev files deltas are:")); - for (map >::const_iterator d = rev_file_deltas.begin(); - d != rev_file_deltas.end(); d++) + for (multimap::const_iterator d = fwd_file_deltas.begin(); + d != fwd_file_deltas.end(); d++) { - L(F("child %s") % d->first); - for (set::const_iterator p = d->second.begin(); - p != d->second.end(); p++) + file_id const & anc = d->first; + file_id const & child = d->second; + if (!sess.app.db.file_version_exists(child)) { - L(F("\tpar %s") % *p); + if (null_id(anc) + || !sess.app.db.file_version_exists(anc)) + { + L(F("requesting full file %s") % child); + sess.queue_send_data_cmd(file_item, plain_id(child)); + } + else + { + L(F("requesting forward delta %s->%s") + % anc % child); + sess.queue_send_delta_cmd(file_item, + plain_id(anc), plain_id(child)); + sess.note_item_full_delta(file_item, plain_id(child)); + } } - } - - // we start with the full files, and work - // our way back. - for (set::const_iterator f = full_files.begin(); - f != full_files.end(); f++) - { - L(F("requesting full file data %s") % *f); - sess.queue_send_data_cmd(file_item, plain_id(*f)); - request_rev_file_deltas(*f); - anchor_files.erase(*f); - } - full_files.clear(); + else + { + L(F("not requesting fwd delta %s->%s, already have dst") + % anc % child); + } - // and now the full files we'll get from fwd deltas - for (map::const_iterator d = fwd_file_deltas.begin(); - d != fwd_file_deltas.end(); d++) - { - L(F("requesting fwd file delta %s->%s") % d->first % d->second); - sess.queue_send_delta_cmd(file_item, plain_id(d->first), plain_id(d->second)); - sess.note_item_full_delta(file_item, plain_id(d->second)); - request_rev_file_deltas(d->second); - anchor_files.erase(d->second); + // traverse up the reverse deltas + request_rev_file_deltas(child); } - - // and finally all the remaining anchor files - for (set::const_iterator f = anchor_files.begin(); - f != anchor_files.end(); f++) - { - L(F("rev deltas from anchor file %s") % *f); - request_rev_file_deltas(*f); - } - anchor_files.clear(); } void ancestry_fetcher::request_rev_manifest_deltas(manifest_id const & start) { - vector< pair > frontier; - frontier.push_back(make_pair(manifest_id(), start)); + stack< manifest_id > frontier; + frontier.push(start); - L(F("request_rev_manifest_deltas: start %s, %d rev_manifest_deltas") - % start % rev_manifest_deltas.size()); - - // depth first while (!frontier.empty()) { - manifest_id child = frontier.back().first; - manifest_id parent = frontier.back().second; + manifest_id const child = frontier.top(); + I(!null_id(child)); + frontier.pop(); - // request it - if (!null_id(child)) + for (multimap< manifest_id, manifest_id>::const_iterator + d = rev_manifest_deltas.lower_bound(child); + d != rev_manifest_deltas.upper_bound(child); + d++) { - L(F("requesting rev manifest delta child %s -> parent %s") - 
% child % parent); - sess.queue_send_delta_cmd(manifest_item, - plain_id(child), plain_id(parent)); - } - frontier.pop_back(); - - if (rev_manifest_deltas.find(parent) != rev_manifest_deltas.end()) - { - for (set::const_iterator m = rev_manifest_deltas[parent].begin(); - m != rev_manifest_deltas[parent].end(); m++) + manifest_id const & parent = d->second; + I(!null_id(parent)); + if (!sess.app.db.manifest_version_exists(parent)) { - frontier.push_back( make_pair(parent, *m) ); + L(F("requesting reverse manifest delta %s->%s") + % child % parent); + sess.queue_send_delta_cmd(manifest_item, + plain_id(child), plain_id(parent)); + sess.reverse_delta_requests.insert(make_pair(plain_id(child), + plain_id(parent))); } + else + { + L(F("already have manifest %s, not requesting rev delta") + % parent); + } + frontier.push(parent); } } } @@ -4196,28 +3907,35 @@ void ancestry_fetcher::request_manifests() { - L(F("request_manifests: %d full, %d fwd_deltas") - % full_manifests.size() % fwd_manifest_deltas.size()); - - // we start with the full manifests, and work - // our way back. - for (set::const_iterator f = full_manifests.begin(); - f != full_manifests.end(); f++) + for (multimap::const_iterator d = fwd_manifest_deltas.begin(); + d != fwd_manifest_deltas.end(); d++) { - L(F("requesting full manifest data %s") % *f); - sess.queue_send_data_cmd(manifest_item, plain_id(*f)); - request_rev_manifest_deltas(*f); - } - full_manifests.clear(); + manifest_id const & anc = d->first; + manifest_id const & child = d->second; + if (!sess.app.db.manifest_version_exists(child)) + { + if (null_id(anc) + || !sess.app.db.manifest_version_exists(anc)) + { + L(F("requesting full manifest %s") % child); + sess.queue_send_data_cmd(manifest_item, plain_id(child)); + } + else + { + L(F("requesting forward delta %s->%s") + % anc % child); + sess.queue_send_delta_cmd(manifest_item, + plain_id(anc), plain_id(child)); + sess.note_item_full_delta(manifest_item, plain_id(child)); + } + } + else + { + L(F("not requesting fwd delta %s->%s, already have dst") + % anc % child); + } - // and now the full manifests we'll get from fwd deltas - for (map::const_iterator d = fwd_manifest_deltas.begin(); - d != fwd_manifest_deltas.end(); d++) - { - L(F("requesting fwd manifest delta %s->%s") % d->first % d->second); - sess.queue_send_delta_cmd(manifest_item, plain_id(d->first), plain_id(d->second)); - sess.note_item_full_delta(manifest_item, plain_id(d->second)); - request_rev_manifest_deltas(d->second); + // traverse up the reverse deltas + request_rev_manifest_deltas(child); } } - ======================================================================== --- packet.cc 407e15cf1c485e09665a372588be2f99491a17d7 +++ packet.cc 0867d1963018a7de23a7ca07be1e637817aaeb50 @@ -246,12 +246,14 @@ file_id new_id; file_delta del; bool forward_delta; + bool write_full; public: delayed_file_delta_packet(file_id const & oi, file_id const & ni, file_delta const & md, - bool fwd) - : old_id(oi), new_id(ni), del(md), forward_delta(fwd) + bool fwd, + bool full = false) + : old_id(oi), new_id(ni), del(md), forward_delta(fwd), write_full(full) {} virtual void apply_delayed_packet(packet_db_writer & pw); virtual ~delayed_file_delta_packet(); @@ -265,12 +267,14 @@ manifest_id new_id; manifest_delta del; bool forward_delta; + bool write_full; public: delayed_manifest_delta_packet(manifest_id const & oi, manifest_id const & ni, manifest_delta const & md, - bool fwd) - : old_id(oi), new_id(ni), del(md), forward_delta(fwd) + bool fwd, + bool full = false) + : 
old_id(oi), new_id(ni), del(md), forward_delta(fwd), write_full(full)
   {}
   virtual void apply_delayed_packet(packet_db_writer & pw);
   virtual ~delayed_manifest_delta_packet();
@@ -361,12 +365,13 @@
 void
 delayed_manifest_delta_packet::apply_delayed_packet(packet_db_writer & pw)
 {
-  L(F("writing delayed manifest %s packet for %s -> %s\n")
+  L(F("writing delayed manifest %s packet for %s -> %s%s\n")
     % (forward_delta ? "delta" : "reverse delta")
    % (forward_delta ? old_id : new_id)
-    % (forward_delta ? new_id : old_id));
+    % (forward_delta ? new_id : old_id)
+    % (write_full ? " (writing in full)" : ""));
   if (forward_delta)
-    pw.consume_manifest_delta(old_id, new_id, del);
+    pw.consume_manifest_delta(old_id, new_id, del, write_full);
   else
     pw.consume_manifest_reverse_delta(new_id, old_id, del);
 }
@@ -381,12 +386,13 @@
 void
 delayed_file_delta_packet::apply_delayed_packet(packet_db_writer & pw)
 {
-  L(F("writing delayed file %s packet for %s -> %s\n")
+  L(F("writing delayed file %s packet for %s -> %s%s\n")
     % (forward_delta ? "delta" : "reverse delta")
     % (forward_delta ? old_id : new_id)
-    % (forward_delta ? new_id : old_id));
+    % (forward_delta ? new_id : old_id)
+    % (write_full ? " (writing in full)" : ""));
   if (forward_delta)
-    pw.consume_file_delta(old_id, new_id, del);
+    pw.consume_file_delta(old_id, new_id, del, write_full);
   else
     pw.consume_file_reverse_delta(new_id, old_id, del);
 }
@@ -656,6 +662,15 @@
                                 file_id const & new_id,
                                 file_delta const & del)
 {
+  consume_file_delta(old_id, new_id, del, false);
+}
+
+void
+packet_db_writer::consume_file_delta(file_id const & old_id,
+                                     file_id const & new_id,
+                                     file_delta const & del,
+                                     bool write_full)
+{
   transaction_guard guard(pimpl->app.db);
   if (! pimpl->file_version_exists_in_db(new_id))
     {
@@ -669,7 +684,10 @@
       calculate_ident(file_data(new_dat), confirm);
       if (confirm == new_id)
         {
-          pimpl->app.db.put_file_version(old_id, new_id, del);
+          if (!write_full)
+            pimpl->app.db.put_file_version(old_id, new_id, del);
+          else
+            pimpl->app.db.put_file(new_id, file_data(new_dat));
           pimpl->accepted_file(new_id, *this);
         }
       else
@@ -682,7 +700,7 @@
     {
       L(F("delaying file delta %s -> %s for preimage\n") % old_id % new_id);
       shared_ptr<delayed_packet> dp;
-      dp = shared_ptr<delayed_packet>(new delayed_file_delta_packet(old_id, new_id, del, true));
+      dp = shared_ptr<delayed_packet>(new delayed_file_delta_packet(old_id, new_id, del, true, write_full));
       shared_ptr<prerequisite> fp;
       pimpl->get_file_prereq(old_id, fp);
       dp->add_prerequisite(fp);
@@ -761,6 +779,15 @@
                                     manifest_id const & new_id,
                                     manifest_delta const & del)
 {
+  consume_manifest_delta(old_id, new_id, del, false);
+}
+
+void
+packet_db_writer::consume_manifest_delta(manifest_id const & old_id,
+                                         manifest_id const & new_id,
+                                         manifest_delta const & del,
+                                         bool write_full)
+{
   transaction_guard guard(pimpl->app.db);
   if (!
pimpl->manifest_version_exists_in_db(new_id)) { @@ -774,7 +801,11 @@ calculate_ident(manifest_data(new_dat), confirm); if (confirm == new_id) { - pimpl->app.db.put_manifest_version(old_id, new_id, del); + if (!write_full) + pimpl->app.db.put_manifest_version(old_id, new_id, del); + else + pimpl->app.db.put_manifest(new_id, manifest_data(new_dat)); + pimpl->accepted_manifest(new_id, *this); } else @@ -787,7 +818,7 @@ { L(F("delaying manifest delta %s -> %s for preimage\n") % old_id % new_id); shared_ptr dp; - dp = shared_ptr(new delayed_manifest_delta_packet(old_id, new_id, del, true)); + dp = shared_ptr(new delayed_manifest_delta_packet(old_id, new_id, del, true, write_full)); shared_ptr fp; pimpl->get_manifest_prereq(old_id, fp); dp->add_prerequisite(fp); @@ -1108,6 +1139,15 @@ } void +packet_db_valve::consume_file_delta(file_id const & id_old, + file_id const & id_new, + file_delta const & del, + bool write_full) +{ + DOIT(delayed_file_delta_packet(id_old, id_new, del, true, write_full)); +} + +void packet_db_valve::consume_file_reverse_delta(file_id const & id_new, file_id const & id_old, file_delta const & del) @@ -1131,6 +1171,15 @@ } void +packet_db_valve::consume_manifest_delta(manifest_id const & id_old, + manifest_id const & id_new, + manifest_delta const & del, + bool write_full) +{ + DOIT(delayed_manifest_delta_packet(id_old, id_new, del, true, write_full)); +} + +void packet_db_valve::consume_manifest_reverse_delta(manifest_id const & id_new, manifest_id const & id_old, manifest_delta const & del) ======================================================================== --- packet.hh ec7178c2332473305c0aa7d00c727e338fc7810d +++ packet.hh c2d6f03ffa901f98ad6623090b687982d27db552 @@ -136,6 +136,10 @@ virtual void consume_file_delta(file_id const & id_old, file_id const & id_new, file_delta const & del); + virtual void consume_file_delta(file_id const & id_old, + file_id const & id_new, + file_delta const & del, + bool write_full); virtual void consume_file_reverse_delta(file_id const & id_new, file_id const & id_old, file_delta const & del); @@ -145,6 +149,10 @@ virtual void consume_manifest_delta(manifest_id const & id_old, manifest_id const & id_new, manifest_delta const & del); + virtual void consume_manifest_delta(manifest_id const & id_old, + manifest_id const & id_new, + manifest_delta const & del, + bool write_full); virtual void consume_manifest_reverse_delta(manifest_id const & id_new, manifest_id const & id_old, manifest_delta const & del); @@ -185,6 +193,10 @@ virtual void consume_file_delta(file_id const & id_old, file_id const & id_new, file_delta const & del); + virtual void consume_file_delta(file_id const & id_old, + file_id const & id_new, + file_delta const & del, + bool write_full); virtual void consume_file_reverse_delta(file_id const & id_new, file_id const & id_old, file_delta const & del); @@ -194,6 +206,10 @@ virtual void consume_manifest_delta(manifest_id const & id_old, manifest_id const & id_new, manifest_delta const & del); + virtual void consume_manifest_delta(manifest_id const & id_old, + manifest_id const & id_new, + manifest_delta const & del, + bool write_full); virtual void consume_manifest_reverse_delta(manifest_id const & id_new, manifest_id const & id_old, manifest_delta const & del);
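The Notes comment added to netsync.cc above describes the intended request pattern: one forward delta from the newest ancestor we already have up to the head, then reverse deltas walked back down from the head, since reverse deltas are what the database stores. The following standalone sketch is illustrative only and is not part of the patch; it uses plain string ids and a hypothetical have() predicate in place of the database existence checks, and prints the requests that the Notes example expects for the linear history A -> B -> C -> D when only A exists locally.

// Illustrative sketch only -- not part of the patch. It mimics the request
// pattern described in the Notes for a single linear manifest history
// A -> B -> C -> D, using plain strings for ids and a stubbed have()
// predicate instead of the real database checks.
#include <iostream>
#include <map>
#include <stack>
#include <string>

typedef std::string id;

// pretend A already exists locally, while B, C and D are new
static bool have(id const & i) { return i == "A"; }

int main()
{
  // child -> parent edges gathered while walking the ancestry (newest first)
  std::multimap<id, id> rev_deltas;
  rev_deltas.insert(std::make_pair("D", "C"));
  rev_deltas.insert(std::make_pair("C", "B"));
  rev_deltas.insert(std::make_pair("B", "A"));

  // forward delta for the head: oldest known ancestor -> head
  id anc = "A", head = "D";
  if (!have(head))
    {
      if (have(anc))
        std::cout << "request forward delta " << anc << " -> " << head << "\n";
      else
        std::cout << "request full data " << head << "\n";
    }

  // then walk back down the reverse deltas from the head
  std::stack<id> frontier;
  frontier.push(head);
  while (!frontier.empty())
    {
      id child = frontier.top();
      frontier.pop();
      for (std::multimap<id, id>::const_iterator d = rev_deltas.lower_bound(child);
           d != rev_deltas.upper_bound(child); ++d)
        {
          if (!have(d->second))
            std::cout << "request reverse delta " << child << " -> " << d->second << "\n";
          frontier.push(d->second);
        }
    }
  return 0;
}

Run as-is, the sketch prints the forward request A -> D followed by the reverse requests D -> C and C -> B, matching the example in the Notes; no request is made for B -> A because A already exists locally.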