# # patch "change_set.hh" # from [c9d0a8fa88d57ae3e6d479d2d62a1e5ea0016ed3] # to [a060f0c256cd5fb1c1156b130eaf3842976012fe] # # patch "netsync.cc" # from [7722aed87080027bef960c2730859dc7cd5fa7e2] # to [6e9b9ebd7efd3e8d82507ba860ba7b09f4d14bd0] # # patch "vocab.hh" # from [22382ac1bdffec21170a88ff2580fe39b508243f] # to [511e1ef3052189b3868e5b3932b12c198eab396f] # ======================================================================== --- change_set.hh c9d0a8fa88d57ae3e6d479d2d62a1e5ea0016ed3 +++ change_set.hh a060f0c256cd5fb1c1156b130eaf3842976012fe @@ -77,24 +77,6 @@ return p.empty(); } -inline bool -null_id(file_id const & i) -{ - return i.inner()().empty(); -} - -inline bool -null_id(manifest_id const & i) -{ - return i.inner()().empty(); -} - -inline bool -null_id(revision_id const & i) -{ - return i.inner()().empty(); -} - inline file_path const & delta_entry_path(change_set::delta_map::const_iterator i) { ======================================================================== --- netsync.cc 7722aed87080027bef960c2730859dc7cd5fa7e2 +++ netsync.cc 6e9b9ebd7efd3e8d82507ba860ba7b09f4d14bd0 @@ -306,6 +306,7 @@ map done_refinements; map > > requested_items; map > > received_items; + map > > full_delta_items; map > > ancestry; map > > received_certs; set< pair > reverse_delta_requests; @@ -345,6 +346,7 @@ bool all_requested_revisions_received(); void note_item_requested(netcmd_item_type ty, id const & i); + void note_item_full_delta(netcmd_item_type ty, id const & ident); bool item_already_requested(netcmd_item_type ty, id const & i); void note_item_arrived(netcmd_item_type ty, id const & i); @@ -482,6 +484,7 @@ return ROOT_PREFIX; } +static file_id null_ident; session::session(protocol_role role, protocol_voice voice, @@ -548,6 +551,9 @@ received_items.insert(make_pair(manifest_item, boost::shared_ptr< set >(new set()))); received_items.insert(make_pair(file_item, boost::shared_ptr< set >(new set()))); received_items.insert(make_pair(epoch_item, boost::shared_ptr< set >(new set()))); + + full_delta_items.insert(make_pair(manifest_item, boost::shared_ptr< set >(new set()))); + full_delta_items.insert(make_pair(file_item, boost::shared_ptr< set >(new set()))); } session::~session() @@ -757,6 +763,15 @@ } void +session::note_item_full_delta(netcmd_item_type ty, id const & ident) +{ + map > >::const_iterator + i = full_delta_items.find(ty); + I(i != full_delta_items.end()); + i->second->insert(ident); +} + +void session::note_item_arrived(netcmd_item_type ty, id const & ident) { map > >::const_iterator @@ -1170,118 +1185,107 @@ } } -void -session::analyze_ancestry_graph() +void +session::get_heads_and_consume_certs( set heads ) { - typedef map > > ancestryT; - typedef map > cert_map; - if (! 
-
-  if (analyzed_ancestry)
-    return;
+  set<revision_id> nodes, parents;
+  map<revision_id, int> chld_num;
+  L(F("analyzing %d ancestry edges\n") % ancestry.size());
 
-  set<revision_id> heads;
-  {
-    set<revision_id> nodes, parents;
-    map<revision_id, int> chld_num;
-    L(F("analyzing %d ancestry edges\n") % ancestry.size());
-
-    for (ancestryT::const_iterator i = ancestry.begin(); i != ancestry.end(); ++i)
-      {
-        nodes.insert(i->first);
-        for (edge_map::const_iterator j = i->second->second.edges.begin();
-             j != i->second->second.edges.end(); ++j)
-          {
-            parents.insert(edge_old_revision(j));
-            map<revision_id, int>::iterator n;
-            n = chld_num.find(edge_old_revision(j));
-            if (n == chld_num.end())
-              chld_num.insert(make_pair(edge_old_revision(j), 1));
-            else
-              ++(n->second);
-          }
-      }
-
-    set_difference(nodes.begin(), nodes.end(),
-                   parents.begin(), parents.end(),
-                   inserter(heads, heads.begin()));
+  for (ancestryT::const_iterator i = ancestry.begin(); i != ancestry.end(); ++i)
+    {
+      nodes.insert(i->first);
+      for (edge_map::const_iterator j = i->second->second.edges.begin();
+           j != i->second->second.edges.end(); ++j)
+        {
+          parents.insert(edge_old_revision(j));
+          map<revision_id, int>::iterator n;
+          n = chld_num.find(edge_old_revision(j));
+          if (n == chld_num.end())
+            chld_num.insert(make_pair(edge_old_revision(j), 1));
+          else
+            ++(n->second);
+        }
+    }
+
+  set_difference(nodes.begin(), nodes.end(),
+                 parents.begin(), parents.end(),
+                 inserter(heads, heads.begin()));
 
-    // Write permissions checking:
-    // remove heads w/o proper certs, add their children to heads
-    // 1) remove unwanted branch certs from consideration
-    // 2) remove heads w/o a branch tag, process new exposed heads
-    // 3) repeat 2 until no change
+  // Write permissions checking:
+  // remove heads w/o proper certs, add their children to heads
+  // 1) remove unwanted branch certs from consideration
+  // 2) remove heads w/o a branch cert, process newly exposed heads
+  // 3) repeat 2 until no change
+
+  //1
+  set<string> ok_branches, bad_branches;
+  cert_name bcert_name(branch_cert_name);
+  cert_name tcert_name(tag_cert_name);
+  for (map<revision_id, cert_map>::iterator i = received_certs.begin();
+       i != received_certs.end(); ++i)
+    {
+      //branches
+      vector<cert> & bcerts(i->second[bcert_name]);
+      vector<cert> keeping;
+      for (vector<cert>::iterator j = bcerts.begin(); j != bcerts.end(); ++j)
+        {
+          cert_value name;
+          decode_base64(j->value, name);
+          if (ok_branches.find(name()) != ok_branches.end())
+            keeping.push_back(*j);
+          else if (bad_branches.find(name()) != bad_branches.end())
+            ;
+          else
+            {
+              if (our_matcher(name()))
+                {
+                  ok_branches.insert(name());
+                  keeping.push_back(*j);
+                }
+              else
+                {
+                  bad_branches.insert(name());
+                  W(F("Dropping branch certs for unwanted branch %s")
+                    % name);
+                }
+            }
+        }
+      bcerts = keeping;
+    }
+  //2
+  list<revision_id> tmp;
+  for (set<revision_id>::iterator i = heads.begin(); i != heads.end(); ++i)
+    {
+      if (!received_certs[*i][bcert_name].size())
+        tmp.push_back(*i);
+    }
+  for (list<revision_id>::iterator i = tmp.begin(); i != tmp.end(); ++i)
+    heads.erase(*i);
+  //3
+  while (tmp.size())
+    {
+      ancestryT::const_iterator i = ancestry.find(tmp.front());
+      if (i != ancestry.end())
+        {
+          for (edge_map::const_iterator j = i->second->second.edges.begin();
+               j != i->second->second.edges.end(); ++j)
+            {
+              if (!--chld_num[edge_old_revision(j)])
+                {
+                  if (received_certs[i->first][bcert_name].size())
+                    heads.insert(i->first);
+                  else
+                    tmp.push_back(edge_old_revision(j));
+                }
+            }
+          // since we don't want this rev, we don't want its certs either
+          received_certs[tmp.front()] = cert_map();
+        }
+      tmp.pop_front();
+    }
 
-    //1
-    set<string> ok_branches, bad_branches;
-    cert_name bcert_name(branch_cert_name);
-    cert_name tcert_name(tag_cert_name);
-    for (map<revision_id, cert_map>::iterator i = received_certs.begin();
-         i != received_certs.end(); ++i)
-      {
-        //branches
-        vector<cert> & bcerts(i->second[bcert_name]);
-        vector<cert> keeping;
-        for (vector<cert>::iterator j = bcerts.begin(); j != bcerts.end(); ++j)
-          {
-            cert_value name;
-            decode_base64(j->value, name);
-            if (ok_branches.find(name()) != ok_branches.end())
-              keeping.push_back(*j);
-            else if (bad_branches.find(name()) != bad_branches.end())
-              ;
-            else
-              {
-                if (our_matcher(name()))
-                  {
-                    ok_branches.insert(name());
-                    keeping.push_back(*j);
-                  }
-                else
-                  {
-                    bad_branches.insert(name());
-                    W(F("Dropping branch certs for unwanted branch %s")
-                      % name);
-                  }
-              }
-          }
-        bcerts = keeping;
-      }
-    //2
-    list<revision_id> tmp;
-    for (set<revision_id>::iterator i = heads.begin(); i != heads.end(); ++i)
-      {
-        if (!received_certs[*i][bcert_name].size())
-          tmp.push_back(*i);
-      }
-    for (list<revision_id>::iterator i = tmp.begin(); i != tmp.end(); ++i)
-      heads.erase(*i);
-    //3
-    while (tmp.size())
-      {
-        ancestryT::const_iterator i = ancestry.find(tmp.front());
-        if (i != ancestry.end())
-          {
-            for (edge_map::const_iterator j = i->second->second.edges.begin();
-                 j != i->second->second.edges.end(); ++j)
-              {
-                if (!--chld_num[edge_old_revision(j)])
-                  {
-                    if (received_certs[i->first][bcert_name].size())
-                      heads.insert(i->first);
-                    else
-                      tmp.push_back(edge_old_revision(j));
-                  }
-              }
-            // since we don't want this rev, we don't want it's certs either
-            received_certs[tmp.front()] = cert_map();
-          }
-        tmp.pop_front();
-      }
-  }
-
   // We've reduced the certs to those we want now, send them to dbw.
   for (map<revision_id, cert_map>::iterator i = received_certs.begin();
        i != received_certs.end(); ++i)
@@ -1296,10 +1300,34 @@
             }
         }
     }
+}
 
-  L(F("isolated %d heads\n") % heads.size());
+void
+session::analyze_ancestry_graph()
+{
+  if (! (all_requested_revisions_received() && cert_refinement_done()))
+    return;
 
-  // first we determine the "attachment status" of each node in our ancestry
+  if (analyzed_ancestry)
+    return;
+
+  ancestry_fetcher fetch(*this);
+
+  analyzed_ancestry = true;
+}
+
+void
+session::analyze_ancestry_graph()
+{
+  typedef map<revision_id, boost::shared_ptr< pair<revision_data, revision_set> > > ancestryT;
+  typedef map<cert_name, vector<cert> > cert_map;
+
+  if (! (all_requested_revisions_received() && cert_refinement_done()))
+    return;
+
+  if (analyzed_ancestry)
+    return;
+
+  // first we determine the "attachment status" of each node in our ancestry
+  // graph.
 
   map<revision_id, bool> attached;
@@ -3833,3 +3861,308 @@
   end_platform_netsync();
 }
 
+struct
+ancestry_fetcher
+{
+  session & sess;
+
+  set<revision_id> seen_revs;
+  // map children to parents
+  map< file_id, set<file_id> > rev_file_deltas;
+  // map an ancestor to a child
+  map< file_id, file_id > fwd_file_deltas;
+  set< file_id > full_files;
+  map< manifest_id, set<manifest_id> > rev_manifest_deltas;
+  map< manifest_id, manifest_id > fwd_manifest_deltas;
+  set< manifest_id > full_manifests;
+  set< file_id > seen_files;
+
+  ancestry_fetcher(session & s);
+
+  void traverse_files(change_set const & cset,
+                      map< file_id, file_id > & fwd_jump_deltas);
+  void traverse_manifest(manifest_id const & child_man,
+                         manifest_id const & parent_man,
+                         manifest_id & fwd_anc);
+  void traverse_ancestry(revision_id const & head);
+  void request_rev_file_deltas(file_id const & start);
+  void request_files();
+  void request_rev_manifest_deltas(manifest_id const & start);
+  void request_manifests();
+};
+
+void
+ancestry_fetcher::traverse_files(change_set const & cset,
+                                 map< file_id, file_id > & fwd_jump_deltas)
+{
+  for (change_set::delta_map::const_iterator d = cset.deltas.begin();
+       d != cset.deltas.end(); ++d)
+    {
+      file_id parent_file (delta_entry_src(d));
+      file_id child_file (delta_entry_dst(d));
+
+      // surely we can assume this
+      I(parent_file != child_file);
+
+      // when the changeset format is altered, this needs revisiting
+      I(!null_id(child_file));
+
+      L(F("traverse_files parent %s child %s")
+        % parent_file % child_file);
+
+      // if this is the first time we've seen the child file_id,
+      // add it to the fwd-delta list.
+      if (seen_files.find(child_file) == seen_files.end()
+          && !sess.app.db.file_version_exists(child_file))
+        {
+          L(F("inserting fwd_jump_deltas"));
+          fwd_jump_deltas.insert(make_pair(parent_file, child_file));
+        }
+      else
+        {
+          // otherwise, request the reverse delta...
+          if (!sess.app.db.file_version_exists(parent_file)
+              && !null_id(parent_file))
+            {
+              L(F("inserting file rev_deltas"));
+              rev_file_deltas[child_file].insert(parent_file);
+            }
+          // ... and update any fwd_jump_deltas.
+          // no point updating if it already references something we have.
+          if (fwd_jump_deltas.find(child_file) != fwd_jump_deltas.end()
+              && !sess.app.db.file_version_exists(child_file))
+            {
+              L(F("updating fwd_jump_deltas for head file %s")
+                % fwd_jump_deltas[child_file]);
+              fwd_jump_deltas[parent_file] = fwd_jump_deltas[child_file];
+              fwd_jump_deltas.erase(child_file);
+            }
+        }
+      seen_files.insert(child_file);
+      seen_files.insert(parent_file);
+    }
+}
+
+void
+ancestry_fetcher::traverse_manifest(manifest_id const & child_man,
+                                    manifest_id const & parent_man,
+                                    manifest_id & fwd_anc)
+{
+  L(F("traverse_manifest parent %s child %s")
+    % parent_man % child_man);
+  // add reverse deltas
+  if (!sess.app.db.manifest_version_exists(parent_man)
+      && !null_id(parent_man))
+    {
+      L(F("inserting manifest rev_deltas"));
+      rev_manifest_deltas[child_man].insert(parent_man);
+    }
+
+  // handle the manifest forward-delta for this head
+  if (child_man == fwd_anc
+      && !sess.app.db.manifest_version_exists(child_man))
+    {
+      L(F("set connected_manifest to %s (was %s)")
+        % parent_man % fwd_anc);
+      fwd_anc = parent_man;
+    }
+}
+
+void
+ancestry_fetcher::traverse_ancestry(revision_id const & head)
+{
+  I(seen_revs.find(head) == seen_revs.end());
+  I(sess.ancestry.find(head) != sess.ancestry.end());
+
+  deque<revision_id> frontier;
+  frontier.push_back(head);
+  seen_revs.insert(head);
+
+  // this map will get built up so that upon traversing to a known revision,
+  // we get pairs of ( existing_file, head_file ). along the way, it will
+  // contain pairs ( ancestor_file, head_file ), updated as the changeset
+  // is traversed.
+  map< file_id, file_id > fwd_jump_deltas;
+
+  // similarly for the manifest: if we don't have the head's manifest,
+  // we'll request it entirely (or as a fwd delta from an ancestor)
+  manifest_id m = sess.ancestry[head]->second.new_manifest;
+  manifest_id head_manifest;
+  if (!sess.app.db.manifest_version_exists(m))
+    head_manifest = m;
+  manifest_id connected_manifest = head_manifest;
+
+  // breadth first up the ancestry
+  while (!frontier.empty())
+    {
+      revision_id rev = frontier.front();
+      frontier.pop_front();
+
+      L(F("traversing %s") % rev);
+      I(sess.ancestry.find(rev) != sess.ancestry.end());
+
+      for (edge_map::const_iterator e = sess.ancestry[rev]->second.edges.begin();
+           e != sess.ancestry[rev]->second.edges.end(); ++e)
+        {
+          if (seen_revs.find(e->first) == seen_revs.end()
+              && sess.ancestry.find(e->first) != sess.ancestry.end())
+            {
+              frontier.push_back(e->first);
+              seen_revs.insert(e->first);
+            }
+
+          traverse_manifest(sess.ancestry[rev]->second.new_manifest,
+                            edge_old_manifest(e),
+                            connected_manifest);
+          traverse_files(edge_changes(e), fwd_jump_deltas);
+        }
+    }
+
+  for (map<file_id, file_id>::const_iterator i = fwd_jump_deltas.begin();
+       i != fwd_jump_deltas.end(); ++i)
+    {
+      if (null_id(i->first))
+        {
+          L(F("requesting full head file %s") % i->second);
+          full_files.insert(i->second);
+        }
+      else
+        {
+          fwd_file_deltas[i->first] = i->second;
+        }
+    }
+
+  if (!null_id(head_manifest))
+    {
+      if (null_id(connected_manifest))
+        {
+          L(F("requesting full manifest %s") % head_manifest);
+          full_manifests.insert(head_manifest);
+        }
+      else
+        {
+          L(F("requesting manifest fwd delta %s -> %s")
+            % connected_manifest % head_manifest);
+          fwd_manifest_deltas[connected_manifest] = head_manifest;
+        }
+    }
+}
+
+void
+ancestry_fetcher::request_rev_file_deltas(file_id const & start)
+{
+  vector< pair<file_id, file_id> > frontier;
+  frontier.push_back(make_pair(null_ident, start));
+
+  // depth first
+  while (!frontier.empty())
+    {
+      file_id child = frontier.back().first;
+      file_id parent = frontier.back().second;
+      frontier.pop_back();
+
+      // request it
+      if (!null_id(child))
+        {
+          L(F("requesting rev file delta child %s -> parent %s")
+            % child % parent);
+          sess.queue_send_delta_cmd(file_item,
+                                    plain_id(child), plain_id(parent));
+        }
+
+      map< file_id, set<file_id> >::const_iterator i = rev_file_deltas.find(parent);
+      if (i != rev_file_deltas.end())
+        {
+          for (set<file_id>::const_iterator p = i->second.begin();
+               p != i->second.end(); ++p)
+            frontier.push_back(make_pair(parent, *p));
+        }
+    }
+}
+
+void
+ancestry_fetcher::request_files()
+{
+  // we start with the full files, and work
+  // our way back.
+  for (set<file_id>::const_iterator f = full_files.begin();
+       f != full_files.end(); ++f)
+    {
+      L(F("requesting full file data %s") % *f);
+      sess.queue_send_data_cmd(file_item, plain_id(*f));
+      request_rev_file_deltas(*f);
+    }
+  full_files.clear();
+
+  // and now the files we'll get as fwd deltas
+  for (map<file_id, file_id>::const_iterator d = fwd_file_deltas.begin();
+       d != fwd_file_deltas.end(); ++d)
+    {
+      L(F("requesting fwd file delta %s -> %s") % d->first % d->second);
+      sess.queue_send_delta_cmd(file_item, plain_id(d->first), plain_id(d->second));
+      request_rev_file_deltas(d->second);
+      sess.note_item_full_delta(file_item, plain_id(d->second));
+    }
+}
+
+void
+ancestry_fetcher::request_rev_manifest_deltas(manifest_id const & start)
+{
+  vector< pair<manifest_id, manifest_id> > frontier;
+  frontier.push_back(make_pair(manifest_id(), start));
+
+  // depth first
+  while (!frontier.empty())
+    {
+      manifest_id child = frontier.back().first;
+      manifest_id parent = frontier.back().second;
+      frontier.pop_back();
+
+      // request it
+      if (!null_id(child))
+        {
+          L(F("requesting rev manifest delta child %s -> parent %s")
+            % child % parent);
+          sess.queue_send_delta_cmd(manifest_item,
+                                    plain_id(child), plain_id(parent));
+        }
+
+      map< manifest_id, set<manifest_id> >::const_iterator i = rev_manifest_deltas.find(parent);
+      if (i != rev_manifest_deltas.end())
+        {
+          for (set<manifest_id>::const_iterator p = i->second.begin();
+               p != i->second.end(); ++p)
+            frontier.push_back(make_pair(parent, *p));
+        }
+    }
+}
+
+void
+ancestry_fetcher::request_manifests()
+{
+  // we start with the full manifests, and work
+  // our way back.
+  for (set<manifest_id>::const_iterator f = full_manifests.begin();
+       f != full_manifests.end(); ++f)
+    {
+      L(F("requesting full manifest data %s") % *f);
+      sess.queue_send_data_cmd(manifest_item, plain_id(*f));
+      request_rev_manifest_deltas(*f);
+    }
+  full_manifests.clear();
+
+  // and now the manifests we'll get as fwd deltas
+  for (map<manifest_id, manifest_id>::const_iterator d = fwd_manifest_deltas.begin();
+       d != fwd_manifest_deltas.end(); ++d)
+    {
+      L(F("requesting fwd manifest delta %s -> %s") % d->first % d->second);
+      sess.queue_send_delta_cmd(manifest_item, plain_id(d->first), plain_id(d->second));
+      request_rev_manifest_deltas(d->second);
+      sess.note_item_full_delta(manifest_item, plain_id(d->second));
+    }
+}
+
+ancestry_fetcher::ancestry_fetcher(session & s)
+  : sess(s)
+{
+  // Get the heads.
+  set<revision_id> new_heads; // TODO: is this the right datastructure?
+  sess.get_heads_and_consume_certs(new_heads);
+
+  // for each new head, we traverse up the ancestry until we reach an
+  // existing revision, an already-requested revision, or a root revision.
+  for (set<revision_id>::const_iterator h = new_heads.begin();
+       h != new_heads.end(); ++h)
+    {
+      traverse_ancestry(*h);
+    }
+
+  request_files();
+  request_manifests();
+}
========================================================================
--- vocab.hh	22382ac1bdffec21170a88ff2580fe39b508243f
+++ vocab.hh	511e1ef3052189b3868e5b3932b12c198eab396f
@@ -166,5 +166,24 @@
   external_diff
 };
 
+// do these belong here?
+inline bool
+null_id(file_id const & i)
+{
+  return i.inner()().empty();
+}
+
+inline bool
+null_id(manifest_id const & i)
+{
+  return i.inner()().empty();
+}
+
+inline bool
+null_id(revision_id const & i)
+{
+  return i.inner()().empty();
+}
+
 #endif // __VOCAB_HH__
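
An aside on the head-isolation step at the top of get_heads_and_consume_certs: stripped of monotone's vocab types, it is a single set_difference over the received ancestry, where a revision is a head iff it appears as a child of some edge but never as a parent. Below is a minimal standalone sketch of just that step; the toy std::string ids and the hand-built ancestry map are illustrative stand-ins, not monotone's revision_id or session types.

// heads_sketch.cc -- standalone illustration, not monotone code.
// A node is a head iff it occurs as a child but never as a parent.
#include <algorithm>
#include <iostream>
#include <iterator>
#include <map>
#include <set>
#include <string>
#include <vector>
using namespace std;

int main()
{
  // toy ancestry: child -> its parents (edges point child -> parent)
  map< string, vector<string> > ancestry;
  ancestry["b"].push_back("a");
  ancestry["c"].push_back("b");
  ancestry["d"].push_back("b");   // "c" and "d" should both be heads

  set<string> nodes, parents;
  for (map< string, vector<string> >::const_iterator i = ancestry.begin();
       i != ancestry.end(); ++i)
    {
      nodes.insert(i->first);
      for (vector<string>::const_iterator j = i->second.begin();
           j != i->second.end(); ++j)
        parents.insert(*j);
    }

  // heads = nodes - parents
  set<string> heads;
  set_difference(nodes.begin(), nodes.end(),
                 parents.begin(), parents.end(),
                 inserter(heads, heads.begin()));

  // "a" is a parent but never a node: it lies outside the received
  // ancestry, just as roots and already-present revisions do above
  for (set<string>::const_iterator h = heads.begin(); h != heads.end(); ++h)
    cout << *h << "\n";   // prints: c d
  return 0;
}

Note that set_difference requires sorted input ranges, which std::set provides by construction; collecting nodes and parents into sets rather than vectors is what makes the one-call difference valid, in the sketch and in the session code alike.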