# # # add_file "enumerator.cc" # # add_file "enumerator.hh" # # add_file "refiner.cc" # # add_file "refiner.hh" # # patch "Makefile.am" # from [b4e54cf6debc0b615c8b1da211aeae2b65431696] # to [d0fcededc8e09cf6d532265e423723d1abbbf6bf] # # patch "database.cc" # from [c8045a26672bb50696bcd9d12748f37399a52768] # to [c70899f5e2e9703b4adaf13e0d4e755a44c6c3fa] # # patch "database.hh" # from [ec17217f4b392f21635c1bb15487d7b1c0146856] # to [4ed94cac1003bf18349465230031fb2b9c2f67f6] # # patch "enumerator.cc" # from [] # to [7a30cef784137174aa75ccdbe6a412b81e58cae6] # # patch "enumerator.hh" # from [] # to [b7e467ace9dd121e55ceeb8ef787ba4c03c4b735] # # patch "merkle_tree.cc" # from [728d4a69e905674600713ccd275a144604a92544] # to [9dac81fddfc596a6a6e55c205a69cd344eccba83] # # patch "merkle_tree.hh" # from [16ba9bbc41f85101dcf153805677cea19ccb9526] # to [cf513c05a9c90f5dbc1b8664175209230d03a36f] # # patch "netcmd.cc" # from [7234a724163ec360b3d77a0a591ba51bf72c8b4c] # to [05d3d94130753605cc408a7369ee9396dd12bf13] # # patch "netcmd.hh" # from [104f746aaddbc57fbf388486c6247c8c48eb6cf8] # to [889ca3e00418abd466236fb58c89bc6e26c160a5] # # patch "netsync.cc" # from [f61f952ea019ec04bb76c5a520a327d2a95edfda] # to [b3e1fe14931d87e44a7542b52dba4445a895377e] # # patch "refiner.cc" # from [] # to [e584a3e6eeeed3c9b009364df62720b9c34ef07c] # # patch "refiner.hh" # from [] # to [1374f8d2f65822995c016ecff3bec86dc06bf1d5] # ============================================================ --- Makefile.am b4e54cf6debc0b615c8b1da211aeae2b65431696 +++ Makefile.am d0fcededc8e09cf6d532265e423723d1abbbf6bf @@ -23,6 +23,8 @@ ui.cc ui.hh \ schema_migration.cc schema_migration.hh \ constants.cc constants.hh \ + refiner.cc refiner.hh \ + enumerator.cc enumerator.hh \ netsync.cc netsync.hh \ netcmd.cc netcmd.hh \ merkle_tree.cc merkle_tree.hh \ @@ -41,18 +43,18 @@ selectors.cc selectors.hh \ annotate.cc annotate.hh \ restrictions.cc restrictions.hh \ - hmac.cc hmac.hh \ - globish.cc globish.hh \ - 
string_queue.cc string_queue.hh \ - paths.cc paths.hh \ - roster_merge.cc roster_merge.hh \ - merge.cc merge.hh \ - \ + hmac.cc hmac.hh \ + globish.cc globish.hh \ + string_queue.cc string_queue.hh \ + paths.cc paths.hh \ + roster_merge.cc roster_merge.hh \ + merge.cc merge.hh \ + \ cleanup.hh unit_tests.hh interner.hh \ cycle_detector.hh randomfile.hh adler32.hh quick_alloc.hh \ netio.hh smap.hh gettext.h \ package_revision.c package_revision.h \ - package_full_revision.c package_full_revision.h options.hh \ + package_full_revision.c package_full_revision.h options.hh \ i18n.h hash_map.hh parallel_iter.hh safe_map.hh NETXX_SOURCES = \ ============================================================ --- database.cc c8045a26672bb50696bcd9d12748f37399a52768 +++ database.cc c70899f5e2e9703b4adaf13e0d4e755a44c6c3fa @@ -1402,7 +1402,6 @@ database::get_revision_children(revision_id const & id, set & children) { - I(!null_id(id)); results res; children.clear(); fetch(res, one_col, any_rows, @@ -2065,6 +2064,22 @@ } void +database::get_revision_certs(revision_id const & ident, + vector< hexenc > & ts) +{ + results res; + vector certs; + fetch(res, one_col, any_rows, + "SELECT hash " + "FROM revision_certs " + "WHERE id = ?", + ident.inner()().c_str()); + ts.clear(); + for (size_t i = 0; i < res.size(); ++i) + ts.push_back(hexenc(res[i][0])); +} + +void database::get_revision_cert(hexenc const & hash, revision & c) { ============================================================ --- database.hh ec17217f4b392f21635c1bb15487d7b1c0146856 +++ database.hh 4ed94cac1003bf18349465230031fb2b9c2f67f6 @@ -360,8 +360,11 @@ void get_revision_certs(revision_id const & id, std::vector< revision > & certs); + void get_revision_certs(revision_id const & id, + std::vector< hexenc > & hashes); + void get_revision_cert(hexenc const & hash, - revision & cert); + revision & c); void get_manifest_certs(manifest_id const & id, std::vector< manifest > & certs); 
============================================================ --- enumerator.cc +++ enumerator.cc 7a30cef784137174aa75ccdbe6a412b81e58cae6 @@ -0,0 +1,145 @@ +// copyright (C) 2005 graydon hoare +// all rights reserved. +// licensed to the public under the terms of the GNU GPL (>= 2) +// see the file COPYING for details + +#include +#include +#include +#include + +#include "cset.hh" +#include "enumerator.hh" +#include "revision.hh" +#include "vocab.hh" + +using std::deque; +using std::map; +using std::set; +using std::vector; + +revision_enumerator::revision_enumerator(enumerator_callbacks & cb, + app_state & app, + set const & initial, + set const & terminal) + : cb(cb), app(app), terminal_nodes(terminal) +{ + for (set::const_iterator i = initial.begin(); + i != initial.end(); ++i) + revs.push_back(*i); +} + +revision_enumerator::revision_enumerator(enumerator_callbacks & cb, + app_state & app) + : cb(cb), app(app) +{ + revision_id root; + set initial; + app.db.get_revision_children(root, initial); + for (set::const_iterator i = initial.begin(); + i != initial.end(); ++i) + revs.push_back(*i); +} + +void +revision_enumerator::step() +{ + // It's ok if this method simply does nothing. 
+ + if (items.empty()) + { + if (!revs.empty()) + { + revision_id r = revs.front(); + revs.pop_front(); + set children; + app.db.get_revision_children(r, children); + for (set::const_iterator i = children.begin(); + i != children.end(); ++i) + revs.push_back(*i); + + terminal_nodes.erase(r); + + if (cb.process_this_rev(r)) + { + + revision_set rs; + app.db.get_revision(r, rs); + for (edge_map::const_iterator i = rs.edges.begin(); + i != rs.edges.end(); ++i) + { + cset const & cs = edge_changes(i); + + // Queue up all the file-adds + for (map::const_iterator fa = cs.files_added.begin(); + fa != cs.files_added.end(); ++fa) + { + enumerator_item item; + item.tag = enumerator_item::fdata; + item.ident_a = fa->second.inner(); + items.push_back(item); + } + + // Queue up all the file-deltas + for (map >::const_iterator fd + = cs.deltas_applied.begin(); + fd != cs.deltas_applied.end(); ++fd) + { + enumerator_item item; + item.tag = enumerator_item::fdelta; + item.ident_a = fd->second.first.inner(); + item.ident_b = fd->second.second.inner(); + items.push_back(item); + } + } + + // Queue up the rev itself + { + enumerator_item item; + item.tag = enumerator_item::rev; + item.ident_a = r.inner(); + items.push_back(item); + } + + // Queue up all the rev's certs + vector > hashes; + app.db.get_revision_certs(r, hashes); + for (vector >::const_iterator i = hashes.begin(); + i != hashes.end(); ++i) + { + enumerator_item item; + item.tag = enumerator_item::cert; + item.ident_a = *i; + items.push_back(item); + } + } + } + } + else + { + enumerator_item i = items.front(); + items.pop_front(); + I(!null_id(i.ident_a)); + + switch (i.tag) + { + case enumerator_item::fdata: + cb.note_file_data(file_id(i.ident_a)); + break; + + case enumerator_item::fdelta: + I(!null_id(i.ident_b)); + cb.note_file_delta(file_id(i.ident_a), + file_id(i.ident_b)); + break; + + case enumerator_item::rev: + cb.note_rev(revision_id(i.ident_a)); + break; + + case enumerator_item::cert: + 
cb.note_cert(i.ident_a); + break; + } + } +} ============================================================ --- enumerator.hh +++ enumerator.hh b7e467ace9dd121e55ceeb8ef787ba4c03c4b735 @@ -0,0 +1,61 @@ +#ifndef __ENUMERATOR_H__ +#define __ENUMERATOR_H__ + +// copyright (C) 2005 graydon hoare +// all rights reserved. +// licensed to the public under the terms of the GNU GPL (>= 2) +// see the file COPYING for details + +#include +#include + +#include "app_state.hh" +#include "vocab.hh" + +// The revision_enumerator struct acts as a cursor which emits files, +// deltas, revisions and certs in dependency-correct order. This is +// used for sending sections of the revision graph through netsync. + +struct +enumerator_callbacks +{ + // Your callback will be asked whether you want the details of each rev, + // in order; you should return true for any rev you want to be notified + // about the contents of. The rev's children will be traversed no matter + // what you return here. + virtual bool process_this_rev(revision_id const & rev) = 0; + + virtual void note_file_data(file_id const & f) = 0; + virtual void note_file_delta(file_id const & src, file_id const & dst) = 0; + virtual void note_rev(revision_id const & rev) = 0; + virtual void note_cert(hexenc const & c) = 0; + virtual ~enumerator_callbacks() {} +}; + +struct +enumerator_item +{ + enum { fdata, fdelta, rev, cert } tag; + hexenc ident_a; + hexenc ident_b; +}; + +struct +revision_enumerator +{ + enumerator_callbacks & cb; + app_state & app; + std::set terminal_nodes; + std::deque revs; + std::deque items; + + revision_enumerator(enumerator_callbacks & cb, + app_state & app, + std::set const & initial, + std::set const & terminal); + revision_enumerator(enumerator_callbacks & cb, + app_state & app); + void step(); +}; + +#endif // __ENUMERATOR_H__ ============================================================ --- merkle_tree.cc 728d4a69e905674600713ccd275a144604a92544 +++ merkle_tree.cc 
9dac81fddfc596a6a6e55c205a69cd344eccba83 @@ -184,14 +184,11 @@ if (bitmap[2*n+1]) return subtree_state; else - return live_leaf_state; + return leaf_state; } else { - if (bitmap[2*n+1]) - return dead_leaf_state; - else - return empty_state; + return empty_state; } } @@ -203,9 +200,9 @@ I(2*n + 1 < bitmap.size()); bitmap.reset(2*n); bitmap.reset(2*n+1); - if (st == subtree_state || st == live_leaf_state) + if (st == subtree_state || st == leaf_state) bitmap.set(2*n); - if (st == subtree_state || st == dead_leaf_state) + if (st == subtree_state) bitmap.set(2*n+1); } @@ -380,20 +377,50 @@ return hash_merkle_node(*node); } +void +collect_items_in_subtree(merkle_table & tab, + prefix const & pref, + size_t level, + set & items) +{ + merkle_table::const_iterator i = tab.find(make_pair(pref, level)); + merkle_ptr node; + prefix ext; + id item; + if (i != tab.end()) + { + node = i->second; + for (size_t slot = 0; slot < constants::merkle_num_slots; ++slot) + { + switch (node->get_slot_state(slot)) + { + case empty_state: + break; + case leaf_state: + node->get_raw_slot(slot, item); + items.insert(item); + break; + + case subtree_state: + node->extended_raw_prefix(slot, ext); + collect_items_in_subtree(tab, ext, level+1, items); + break; + } + } + } +} + + void insert_into_merkle_tree(merkle_table & tab, netcmd_item_type type, - bool live_p, id const & leaf, size_t level) { I(constants::merkle_hash_length_in_bytes == leaf().size()); I(constants::merkle_fanout_bits * (level + 1) <= constants::merkle_hash_length_in_bits); - - //hexenc hleaf; - //encode_hexenc(leaf, hleaf); size_t slotnum; dynamic_bitset pref; @@ -401,16 +428,8 @@ ostringstream oss; to_block_range(pref, ostream_iterator(oss)); - // hexenc hpref; prefix rawpref(oss.str()); - // encode_hexenc(rawpref, hpref); - - // if (level == 0) - // L(F("-- beginning top level insert --\n")); - // L(F("inserting %s leaf %s into slot 0x%x at node with prefix %s, level %d\n") - // % (live_p ? 
"live" : "dead") % hleaf % slotnum % hpref % level); - merkle_table::const_iterator i = tab.find(make_pair(rawpref, level)); merkle_ptr node; @@ -420,38 +439,20 @@ slot_state st = node->get_slot_state(slotnum); switch (st) { - case live_leaf_state: - case dead_leaf_state: + case leaf_state: { id slotval; node->get_raw_slot(slotnum, slotval); if (slotval == leaf) { - // L(F("found existing entry for %s at slot 0x%x of node %s, level %d\n") - // % hleaf % slotnum % hpref % level); - if (st == dead_leaf_state && live_p) - { - // L(F("changing setting from dead to live, for %s at slot 0x%x of node %s, level %d\n") - // % hleaf % slotnum % hpref % level); - node->set_slot_state(slotnum, live_leaf_state); - } - else if (st == live_leaf_state && !live_p) - { - // L(F("changing setting from live to dead, for %s at slot 0x%x of node %s, level %d\n") - // % hleaf % slotnum % hpref % level); - node->set_slot_state(slotnum, dead_leaf_state); - } + // Do nothing, it's already present } else { hexenc existing_hleaf; encode_hexenc(slotval, existing_hleaf); - // L(F("pushing existing leaf %s in slot 0x%x of node %s, level %d into subtree\n") - // % existing_hleaf % slotnum % hpref % level); - insert_into_merkle_tree(tab, type, (st == live_leaf_state ? true : false), slotval, level+1); - insert_into_merkle_tree(tab, type, live_p, leaf, level+1); - // L(F("changing setting to subtree, empty hash at slot 0x%x of node %s, level %d\n") - // % slotnum % hpref % level); + insert_into_merkle_tree(tab, type, slotval, level+1); + insert_into_merkle_tree(tab, type, leaf, level+1); id empty_subtree_hash; node->set_raw_slot(slotnum, empty_subtree_hash); node->set_slot_state(slotnum, subtree_state); @@ -460,21 +461,15 @@ break; case empty_state: - // L(F("placing leaf %s in previously empty slot 0x%x of node %s, level %d\n") - // % hleaf % slotnum % hpref % level); node->total_num_leaves++; - node->set_slot_state(slotnum, (live_p ? 
live_leaf_state : dead_leaf_state)); + node->set_slot_state(slotnum, leaf_state); node->set_raw_slot(slotnum, leaf); break; case subtree_state: { - // L(F("taking %s to subtree in slot 0x%x of node %s, level %d\n") - // % hleaf % slotnum % hpref % level); - insert_into_merkle_tree(tab, type, live_p, leaf, level+1); + insert_into_merkle_tree(tab, type, leaf, level+1); id empty_subtree_hash; - // L(F("updating subtree, setting to empty hash at slot 0x%x of node %s, level %d\n") - // % slotnum % hpref % level); node->set_raw_slot(slotnum, empty_subtree_hash); node->set_slot_state(slotnum, subtree_state); } @@ -483,18 +478,13 @@ } else { - // L(F("creating new node with prefix %s, level %d, holding %s at slot 0x%x\n") - // % hpref % level % hleaf % slotnum); node = merkle_ptr(new merkle_node()); node->type = type; node->level = level; node->pref = pref; node->total_num_leaves = 1; - node->set_slot_state(slotnum, (live_p ? live_leaf_state : dead_leaf_state)); + node->set_slot_state(slotnum, leaf_state); node->set_raw_slot(slotnum, leaf); tab.insert(std::make_pair(std::make_pair(rawpref, level), node)); } - - // if (level == 0) - // L(F("-- finished top level insert --\n")); } ============================================================ --- merkle_tree.hh 16ba9bbc41f85101dcf153805677cea19ccb9526 +++ merkle_tree.hh cf513c05a9c90f5dbc1b8664175209230d03a36f @@ -1,6 +1,6 @@ #ifndef __MERKLE_TREE_HH__ #define __MERKLE_TREE_HH__ -// copyright (C) 2004 graydon hoare +// copyright (C) 2004, 2005 graydon hoare // all rights reserved. // licensed to the public under the terms of the GNU GPL (>= 2) // see the file COPYING for details @@ -15,22 +15,22 @@ #include "vocab.hh" #include "transforms.hh" -// this file contains data structures and functions for managing merkle -// trees. 
a merkle tree is a general construction whereby a range of K data -// elements is divided up into buckets, the buckets are stored on disk, -// then hashed, and the hash values of the buckets are used as data -// elements for another iteration of the process. this terminates when you -// only have 1 bucket left. +// This file contains data structures and functions for managing merkle +// trees. A merkle tree is, conceptually, a general recursive construction +// whereby a range of K data elements is divided up into buckets. Each +// bucket is then hashed, and the hash values of the buckets at level N of +// the tree are used as data elements held in buckets at level N-1. At +// level 0 there is only one bucket. // -// the result is a tree in which each node has N "slots", each of which +// The result is a tree in which each node has J "slots", each of which // summarizes (as a hashcode) the entire subtree beneath it. this makes a // pair of merkle trees amenable to setwise operations such as union or -// difference while only inspecting D*log(K) nodes where D is the number of -// differences between trees. +// difference while only inspecting D*log_base_J(K) nodes where D is the +// number of differences between trees. // -// we build merkle trees over a few collections of objects in our database -// and use these to synchronize with remote hosts. see netsync.{cc,hh} for -// more details. +// We build merkle trees over a few collections of objects in our database +// and use these to synchronize with remote hosts. See netsync.{cc,hh} and +// refiner.{cc,hh} for more details. 
typedef enum { @@ -47,8 +47,7 @@ typedef enum { empty_state, - live_leaf_state, - dead_leaf_state, + leaf_state, subtree_state } slot_state; @@ -93,31 +92,40 @@ std::string raw_sha1(std::string const & in); -void pick_slot_and_prefix_for_value(id const & val, size_t level, - size_t & slotnum, boost::dynamic_bitset & pref); +void +pick_slot_and_prefix_for_value(id const & val, + size_t level, + size_t & slotnum, + boost::dynamic_bitset & pref); -// inserts an item into a tree +// Collect the items inside a subtree. +void +collect_items_in_subtree(merkle_table & tab, + prefix const & pref, + size_t level, + std::set & items); + +// Insert an item into a tree. + void insert_into_merkle_tree(merkle_table & tab, netcmd_item_type type, - bool live_p, id const & leaf, size_t level); inline void insert_into_merkle_tree(merkle_table & tab, netcmd_item_type type, - bool live_p, hexenc const & hex_leaf, size_t level) { id leaf; decode_hexenc(hex_leaf, leaf); - insert_into_merkle_tree(tab, type, live_p, leaf, level); + insert_into_merkle_tree(tab, type, leaf, level); } -// recalculates the hashes in the given tree. must be called after +// Recalculate the hashes in the given tree. Must be called after // insert_into_merkle_tree, and before using tree (but you can batch up // multiple calls to insert_into_merkle_tree and then only call this once). 
============================================================ --- netcmd.cc 7234a724163ec360b3d77a0a591ba51bf72c8b4c +++ netcmd.cc 05d3d94130753605cc408a7369ee9396dd12bf13 @@ -105,11 +105,8 @@ case static_cast(confirm_cmd): case static_cast(refine_cmd): case static_cast(done_cmd): - case static_cast(send_data_cmd): - case static_cast(send_delta_cmd): case static_cast(data_cmd): case static_cast(delta_cmd): - case static_cast(nonexistant_cmd): case static_cast(usher_cmd): cmd_code = static_cast(cmd_byte); break; @@ -351,75 +348,76 @@ } void -netcmd::read_done_cmd(size_t & level, netcmd_item_type & type) const +netcmd::read_note_item_cmd(netcmd_item_type & type, id & item) const { size_t pos = 0; - // syntax is: - level = extract_datum_uleb128(payload, pos, - "done netcmd, level number"); - type = read_netcmd_item_type(payload, pos, "done netcmd, item type"); - assert_end_of_buffer(payload, pos, "done netcmd payload"); + // syntax is: + type = read_netcmd_item_type(payload, pos, "note_item netcmd, item type"); + item = id(extract_substring(payload, pos, + constants::merkle_hash_length_in_bytes, + "note_item netcmd, item identifier")); + assert_end_of_buffer(payload, pos, "note_item netcmd payload"); } void -netcmd::write_done_cmd(size_t level, - netcmd_item_type type) +netcmd::write_note_item_cmd(netcmd_item_type type, id const & item) { - cmd_code = done_cmd; + I(item().size() == constants::merkle_hash_length_in_bytes); + cmd_code = note_item_cmd; payload.clear(); - insert_datum_uleb128(level, payload); - payload += static_cast(type); + payload = static_cast(type); + payload += item(); } void -netcmd::read_send_data_cmd(netcmd_item_type & type, id & item) const +netcmd::read_note_shared_subtree_cmd(netcmd_item_type & type, + prefix & pref, + size_t & level) const { - size_t pos = 0; - // syntax is: - type = read_netcmd_item_type(payload, pos, "send_data netcmd, item type"); - item = id(extract_substring(payload, pos, - constants::merkle_hash_length_in_bytes, - 
"send_data netcmd, item identifier")); - assert_end_of_buffer(payload, pos, "send_data netcmd payload"); + size_t pos = 0; + // syntax is: + type = read_netcmd_item_type(payload, pos, + "note_shared_subtree netcmd, item type"); + string tmp; + extract_variable_length_string(payload, tmp, pos, + "note_shared_subtree netcmd, tree prefix"); + pref = tmp; + level = extract_datum_uleb128(payload, pos, + "note_shared_subtree netcmd, level number"); + assert_end_of_buffer(payload, pos, "note_shared_subtree netcmd payload"); } void -netcmd::write_send_data_cmd(netcmd_item_type type, id const & item) +netcmd::write_note_shared_subtree_cmd(netcmd_item_type type, + prefix const & pref, + size_t level) { - cmd_code = send_data_cmd; - I(item().size() == constants::merkle_hash_length_in_bytes); - payload = static_cast(type); - payload += item(); + payload.clear(); + payload += static_cast(type); + insert_variable_length_string(pref(), payload); + insert_datum_uleb128(level, payload); } + void -netcmd::read_send_delta_cmd(netcmd_item_type & type, - id & base, - id & ident) const +netcmd::read_done_cmd(size_t & level, netcmd_item_type & type) const { size_t pos = 0; - // syntax is: - type = read_netcmd_item_type(payload, pos, "send_delta netcmd, item type"); - base = id(extract_substring(payload, pos, - constants::merkle_hash_length_in_bytes, - "send_delta netcmd, base item identifier")); - ident = id(extract_substring(payload, pos, - constants::merkle_hash_length_in_bytes, - "send_delta netcmd, ident item identifier")); - assert_end_of_buffer(payload, pos, "send_delta netcmd payload"); + // syntax is: + level = extract_datum_uleb128(payload, pos, + "done netcmd, level number"); + type = read_netcmd_item_type(payload, pos, "done netcmd, item type"); + assert_end_of_buffer(payload, pos, "done netcmd payload"); } void -netcmd::write_send_delta_cmd(netcmd_item_type type, - id const & base, - id const & ident) +netcmd::write_done_cmd(size_t level, + netcmd_item_type type) { - cmd_code 
= send_delta_cmd; - I(base().size() == constants::merkle_hash_length_in_bytes); - I(ident().size() == constants::merkle_hash_length_in_bytes); - payload = static_cast(type); - payload += base(); - payload += ident(); + cmd_code = done_cmd; + payload.clear(); + insert_datum_uleb128(level, payload); + payload += static_cast(type); } void @@ -535,29 +533,7 @@ insert_variable_length_string(tmp, payload); } - void -netcmd::read_nonexistant_cmd(netcmd_item_type & type, id & item) const -{ - size_t pos = 0; - // syntax is: - type = read_netcmd_item_type(payload, pos, "nonexistant netcmd, item type"); - item = id(extract_substring(payload, pos, - constants::merkle_hash_length_in_bytes, - "nonexistant netcmd, item identifier")); - assert_end_of_buffer(payload, pos, "nonexistant netcmd payload"); -} - -void -netcmd::write_nonexistant_cmd(netcmd_item_type type, id const & item) -{ - cmd_code = nonexistant_cmd; - I(item().size() == constants::merkle_hash_length_in_bytes); - payload = static_cast(type); - payload += item(); -} - -void netcmd::read_usher_cmd(utf8 & greeting) const { size_t pos = 0; @@ -763,8 +739,7 @@ out_node.set_raw_slot(8, id(raw_sha1("He was arrested for auto theft"))); out_node.set_raw_slot(15, id(raw_sha1("He was whisked away to jail"))); out_node.set_slot_state(0, subtree_state); - out_node.set_slot_state(3, live_leaf_state); - out_node.set_slot_state(8, dead_leaf_state); + out_node.set_slot_state(3, leaf_state); out_node.set_slot_state(15, subtree_state); out_cmd.write_refine_cmd(out_node); @@ -774,54 +749,55 @@ L(boost::format("refine_cmd test done, buffer was %d bytes\n") % buf.size()); } - // done_cmd - { - L(boost::format("checking i/o round trip on done_cmd\n")); + // note_item_cmd + { + L(boost::format("checking i/o round trip on note_item_cmd\n")); netcmd out_cmd, in_cmd; - size_t out_level(12), in_level; - netcmd_item_type out_type(key_item), in_type(revision_item); string buf; + netcmd_item_type out_ty = revision_item, in_ty; + id 
out_id(raw_sha1("gone fishin'")), in_id; - out_cmd.write_done_cmd(out_level, out_type); + out_cmd.write_note_item_cmd(out_ty, out_id); do_netcmd_roundtrip(out_cmd, in_cmd, buf); - in_cmd.read_done_cmd(in_level, in_type); - BOOST_CHECK(in_level == out_level); - BOOST_CHECK(in_type == out_type); - L(boost::format("done_cmd test done, buffer was %d bytes\n") % buf.size()); + in_cmd.read_note_item_cmd(in_ty, in_id); + BOOST_CHECK(in_ty == out_ty); + BOOST_CHECK(in_id == out_id); + L(boost::format("note_item_cmd test done, buffer was %d bytes\n") % buf.size()); } - // send_data_cmd + // note_shared_subtree_cmd { - L(boost::format("checking i/o round trip on send_data_cmd\n")); + L(boost::format("checking i/o round trip on note_shared_subtree_cmd\n")); netcmd out_cmd, in_cmd; - netcmd_item_type out_type(file_item), in_type(key_item); - id out_id(raw_sha1("avocado is the yummiest")), in_id; string buf; + netcmd_item_type out_ty = revision_item, in_ty; + prefix out_pref("f00f"), in_pref; + size_t out_lev=4, in_lev=0; - out_cmd.write_send_data_cmd(out_type, out_id); + out_cmd.write_note_shared_subtree_cmd(out_ty, out_pref, out_lev); do_netcmd_roundtrip(out_cmd, in_cmd, buf); - in_cmd.read_send_data_cmd(in_type, in_id); - BOOST_CHECK(in_type == out_type); - BOOST_CHECK(in_id == out_id); - L(boost::format("send_data_cmd test done, buffer was %d bytes\n") % buf.size()); + in_cmd.read_note_shared_subtree_cmd(in_ty, in_pref, in_lev); + BOOST_CHECK(in_ty == out_ty); + BOOST_CHECK(in_pref == out_pref); + BOOST_CHECK(in_lev == out_lev); + L(boost::format("note_shared_subtree_cmd test done, buffer was %d bytes\n") + % buf.size()); } - // send_delta_cmd + // done_cmd { - L(boost::format("checking i/o round trip on send_delta_cmd\n")); + L(boost::format("checking i/o round trip on done_cmd\n")); netcmd out_cmd, in_cmd; - netcmd_item_type out_type(file_item), in_type(key_item); - id out_head(raw_sha1("when you board an airplane")), in_head; - id out_base(raw_sha1("always check the exit 
locations")), in_base; + size_t out_level(12), in_level; + netcmd_item_type out_type(key_item), in_type(revision_item); string buf; - out_cmd.write_send_delta_cmd(out_type, out_head, out_base); + out_cmd.write_done_cmd(out_level, out_type); do_netcmd_roundtrip(out_cmd, in_cmd, buf); - in_cmd.read_send_delta_cmd(in_type, in_head, in_base); + in_cmd.read_done_cmd(in_level, in_type); + BOOST_CHECK(in_level == out_level); BOOST_CHECK(in_type == out_type); - BOOST_CHECK(in_head == out_head); - BOOST_CHECK(in_base == out_base); - L(boost::format("send_delta_cmd test done, buffer was %d bytes\n") % buf.size()); + L(boost::format("done_cmd test done, buffer was %d bytes\n") % buf.size()); } // data_cmd @@ -860,22 +836,6 @@ L(boost::format("delta_cmd test done, buffer was %d bytes\n") % buf.size()); } - // nonexistant_cmd - { - L(boost::format("checking i/o round trip on nonexistant_cmd\n")); - netcmd out_cmd, in_cmd; - netcmd_item_type out_type(file_item), in_type(key_item); - id out_id(raw_sha1("avocado is the yummiest")), in_id; - string buf; - - out_cmd.write_nonexistant_cmd(out_type, out_id); - do_netcmd_roundtrip(out_cmd, in_cmd, buf); - in_cmd.read_nonexistant_cmd(in_type, in_id); - BOOST_CHECK(in_type == out_type); - BOOST_CHECK(in_id == out_id); - L(boost::format("nonexistant_cmd test done, buffer was %d bytes\n") % buf.size()); - } - } catch (bad_decode & d) { ============================================================ --- netcmd.hh 104f746aaddbc57fbf388486c6247c8c48eb6cf8 +++ netcmd.hh 889ca3e00418abd466236fb58c89bc6e26c160a5 @@ -37,14 +37,13 @@ // refinement commands refine_cmd = 6, - done_cmd = 7, + note_item_cmd = 7, + note_shared_subtree_cmd = 8, + done_cmd = 9, // transmission commands - send_data_cmd = 8, - send_delta_cmd = 9, data_cmd = 10, delta_cmd = 11, - nonexistant_cmd = 12, // usher commands // usher_cmd is sent by a server farm (or anyone else who wants to serve @@ -133,20 +132,18 @@ void read_refine_cmd(merkle_node & node) const; void 
write_refine_cmd(merkle_node const & node); - void read_done_cmd(size_t & level, netcmd_item_type & type) const; - void write_done_cmd(size_t level, netcmd_item_type type); + void read_note_item_cmd(netcmd_item_type & type, id & item) const; + void write_note_item_cmd(netcmd_item_type type, id const & item); - void read_send_data_cmd(netcmd_item_type & type, - id & item) const; - void write_send_data_cmd(netcmd_item_type type, - id const & item); + void read_note_shared_subtree_cmd(netcmd_item_type & type, + prefix & pref, + size_t & level) const; + void write_note_shared_subtree_cmd(netcmd_item_type type, + prefix const & pref, + size_t level); - void read_send_delta_cmd(netcmd_item_type & type, - id & base, - id & ident) const; - void write_send_delta_cmd(netcmd_item_type type, - id const & base, - id const & ident); + void read_done_cmd(size_t & level, netcmd_item_type & type) const; + void write_done_cmd(size_t level, netcmd_item_type type); void read_data_cmd(netcmd_item_type & type, id & item, @@ -162,11 +159,6 @@ id const & base, id const & ident, delta const & del); - void read_nonexistant_cmd(netcmd_item_type & type, - id & item) const; - void write_nonexistant_cmd(netcmd_item_type type, - id const & item); - void read_usher_cmd(utf8 & greeting) const; void write_usher_reply_cmd(utf8 const & server, utf8 const & pattern); ============================================================ --- netsync.cc f61f952ea019ec04bb76c5a520a327d2a95edfda +++ netsync.cc b3e1fe14931d87e44a7542b52dba4445a895377e @@ -21,6 +21,7 @@ #include "app_state.hh" #include "cert.hh" #include "constants.hh" +#include "enumerator.hh" #include "keys.hh" #include "merkle_tree.hh" #include "netcmd.hh" @@ -28,6 +29,7 @@ #include "netsync.hh" #include "numeric_vocab.hh" #include "packet.hh" +#include "refiner.hh" #include "sanity.hh" #include "transforms.hh" #include "ui.hh" @@ -211,10 +213,6 @@ // commands for a given merkle tree arrive with no interveining refinements, // the entire merkle 
tree is considered complete. // -// any "send_data" command received prompts a "data" command in response, -// if the requested item exists. if an item does not exist, a "nonexistant" -// response command is sent. -// // once a response is received for each requested key and revision cert // (either data or nonexistant) the requesting party walks the graph of // received revision certs and transmits send_data or send_delta commands @@ -243,17 +241,6 @@ throw bad_decode(F("check of '%s' failed") % context); } -struct -done_marker -{ - bool current_level_had_refinements; - bool tree_is_done; - done_marker() : - current_level_had_refinements(false), - tree_is_done(false) - {} -}; - struct netsync_error { string msg; @@ -261,7 +248,9 @@ }; struct -session +session: + public refiner_callbacks, + public enumerator_callbacks { protocol_role role; protocol_voice const voice; @@ -306,23 +295,30 @@ vector written_keys; vector written_certs; - map > merkle_tables; - - map done_refinements; - map > > requested_items; - map > > received_items; - map > > full_delta_items; - map > > ancestry; - map > > received_certs; - set< pair > reverse_delta_requests; - bool analyzed_ancestry; - id saved_nonce; bool received_goodbye; bool sent_goodbye; packet_db_valve dbw; + bool encountered_error; + + // Interface to refinement. + refiner epoch_refiner; + refiner key_refiner; + refiner cert_refiner; + refiner rev_refiner; + + // Interface to ancestry grovelling. + revision_enumerator rev_enumerator; + + // enumerator_callbacks methods. 
+ bool process_this_rev(revision_id const & rev); + void note_file_data(file_id const & f); + void note_file_delta(file_id const & src, file_id const & dst); + void note_rev(revision_id const & rev); + void note_cert(hexenc const & c); + session(protocol_role role, protocol_voice voice, utf8 const & our_include_pattern, @@ -345,35 +341,23 @@ void set_session_key(rsa_oaep_sha_data const & key_encrypted); void setup_client_tickers(); - bool done_all_refinements(); - bool cert_refinement_done(); - bool all_requested_revisions_received(); + bool got_all_data(); + void maybe_say_goodbye(); - void note_item_requested(netcmd_item_type ty, id const & i); - void note_item_full_delta(netcmd_item_type ty, id const & ident); - bool item_already_requested(netcmd_item_type ty, id const & i); void note_item_arrived(netcmd_item_type ty, id const & i); - void maybe_note_epochs_finished(); - void note_item_sent(netcmd_item_type ty, id const & i); - bool got_all_data(); - void maybe_say_goodbye(); - - void get_heads_and_consume_certs(set & heads); - - void analyze_ancestry_graph(); - Netxx::Probe::ready_type which_events() const; bool read_some(); bool write_some(); - bool encountered_error; void error(string const & errmsg); void write_netcmd_and_try_flush(netcmd const & cmd); + + // Outgoing queue-writers. 
void queue_bye_cmd(); void queue_error_cmd(string const & errmsg); void queue_done_cmd(size_t level, netcmd_item_type type); @@ -395,11 +379,10 @@ base64 server_key_encoded); void queue_confirm_cmd(); void queue_refine_cmd(merkle_node const & node); - void queue_send_data_cmd(netcmd_item_type type, - id const & item); - void queue_send_delta_cmd(netcmd_item_type type, - id const & base, - id const & ident); + void queue_note_item_cmd(netcmd_item_type ty, id item); + void queue_note_shared_subtree_cmd(netcmd_item_type ty, + prefix const & pref, + size_t level); void queue_data_cmd(netcmd_item_type type, id const & item, string const & dat); @@ -407,12 +390,10 @@ id const & base, id const & ident, delta const & del); - void queue_nonexistant_cmd(netcmd_item_type type, - id const & item); + // Incoming dispatch-called methods. bool process_bye_cmd(); bool process_error_cmd(string const & errmsg); - bool process_done_cmd(size_t level, netcmd_item_type type); bool process_hello_cmd(rsa_keypair_id const & server_keyname, rsa_pub_key const & server_key, id const & nonce); @@ -426,13 +407,13 @@ id const & nonce1, string const & signature); bool process_confirm_cmd(string const & signature); - void respond_to_confirm_cmd(); bool process_refine_cmd(merkle_node const & node); - bool process_send_data_cmd(netcmd_item_type type, + bool process_done_cmd(size_t level, netcmd_item_type type); + bool process_note_item_cmd(netcmd_item_type ty, id const & item); - bool process_send_delta_cmd(netcmd_item_type type, - id const & base, - id const & ident); + bool process_note_shared_subtree_cmd(netcmd_item_type ty, + prefix const & pref, + size_t lev); bool process_data_cmd(netcmd_item_type type, id const & item, string const & dat); @@ -440,73 +421,21 @@ id const & base, id const & ident, delta const & del); - bool process_nonexistant_cmd(netcmd_item_type type, - id const & item); bool process_usher_cmd(utf8 const & msg); - bool merkle_node_exists(netcmd_item_type type, - size_t level, 
- prefix const & pref); + // The incoming dispatcher. + bool dispatch_payload(netcmd const & cmd); - void load_merkle_node(netcmd_item_type type, - size_t level, - prefix const & pref, - merkle_ptr & node); - + // Various helpers. + void respond_to_confirm_cmd(); void rebuild_merkle_trees(app_state & app, set const & branches); - bool dispatch_payload(netcmd const & cmd); + void send_all_data(netcmd_item_type ty, set const & items); void begin_service(); bool process(); }; -struct -ancestry_fetcher -{ - session & sess; - - // map children to parents - multimap< file_id, file_id > rev_file_deltas; - // map an ancestor to a child - multimap< file_id, file_id > fwd_file_deltas; - - set< file_id > seen_files; - - ancestry_fetcher(session & s); - // analysing the ancestry graph - void traverse_files(cset const & cs); - void traverse_ancestry(set const & heads); - - // requesting the data - void request_rev_file_deltas(file_id const & start, - set & done_files); - void request_files(); -}; - - -struct -root_prefix -{ - prefix val; - root_prefix() : val("") - {} -}; - -static root_prefix const & -get_root_prefix() -{ - // this is not a static variable for a bizarre reason: mac OSX runs - // static initializers in the "wrong" order (application before - // libraries), so the initializer for a static string in cryptopp runs - // after the initializer for a static variable outside a function - // here. therefore encode_hexenc() fails in the static initializer here - // and the program crashes. curious, eh? 
- static root_prefix ROOT_PREFIX; - return ROOT_PREFIX; -} - -static file_id null_ident; session::session(protocol_role role, protocol_voice voice, @@ -542,37 +471,23 @@ revision_in_ticker(NULL), revision_out_ticker(NULL), revision_checked_ticker(NULL), - analyzed_ancestry(false), saved_nonce(""), received_goodbye(false), sent_goodbye(false), dbw(app, true), - encountered_error(false) + encountered_error(false), + epoch_refiner(epoch_item, *this), + key_refiner(key_item, *this), + cert_refiner(cert_item, *this), + rev_refiner(revision_item, *this), + rev_enumerator(*this, app) { dbw.set_on_revision_written(boost::bind(&session::rev_written_callback, this, _1)); dbw.set_on_cert_written(boost::bind(&session::cert_written_callback, - this, _1)); + this, _1)); dbw.set_on_pubkey_written(boost::bind(&session::key_written_callback, - this, _1)); - - done_refinements.insert(make_pair(cert_item, done_marker())); - done_refinements.insert(make_pair(key_item, done_marker())); - done_refinements.insert(make_pair(epoch_item, done_marker())); - - requested_items.insert(make_pair(cert_item, boost::shared_ptr< set >(new set()))); - requested_items.insert(make_pair(key_item, boost::shared_ptr< set >(new set()))); - requested_items.insert(make_pair(revision_item, boost::shared_ptr< set >(new set()))); - requested_items.insert(make_pair(file_item, boost::shared_ptr< set >(new set()))); - requested_items.insert(make_pair(epoch_item, boost::shared_ptr< set >(new set()))); - - received_items.insert(make_pair(cert_item, boost::shared_ptr< set >(new set()))); - received_items.insert(make_pair(key_item, boost::shared_ptr< set >(new set()))); - received_items.insert(make_pair(revision_item, boost::shared_ptr< set >(new set()))); - received_items.insert(make_pair(file_item, boost::shared_ptr< set >(new set()))); - received_items.insert(make_pair(epoch_item, boost::shared_ptr< set >(new set()))); - - full_delta_items.insert(make_pair(file_item, boost::shared_ptr< set >(new set()))); + this, 
_1)); } session::~session() @@ -599,6 +514,7 @@ { app.lua.hook_note_netsync_pubkey_received(*i); } + //Revisions for (vector::iterator i = written_revisions.begin(); i != written_revisions.end(); ++i) @@ -616,6 +532,7 @@ app.db.get_revision(*i, rdat); app.lua.hook_note_netsync_revision_received(*i, rdat, certs); } + //Certs (not attached to a new revision) for (vector::iterator i = unattached_certs.begin(); i != unattached_certs.end(); ++i) @@ -628,6 +545,64 @@ } } +bool +session::process_this_rev(revision_id const & rev) +{ + id item; + decode_hexenc(rev.inner(), item); + return (rev_refiner.items_to_send.find(item) + != rev_refiner.items_to_send.end()); +} + +void +session::note_file_data(file_id const & f) +{ + file_data fd; + id item; + decode_hexenc(f.inner(), item); + app.db.get_file_version(f, fd); + queue_data_cmd(file_item, item, fd.inner()()); +} + +void +session::note_file_delta(file_id const & src, file_id const & dst) +{ + file_data fd1, fd2; + delta del; + id fid1, fid2; + decode_hexenc(src.inner(), fid1); + decode_hexenc(dst.inner(), fid2); + app.db.get_file_version(src, fd1); + app.db.get_file_version(dst, fd2); + diff(fd1.inner(), fd2.inner(), del); + queue_delta_cmd(file_item, fid1, fid2, del); +} + +void +session::note_rev(revision_id const & rev) +{ + revision_set rs; + id item; + decode_hexenc(rev.inner(), item); + app.db.get_revision(rev, rs); + data tmp; + write_revision_set(rs, tmp); + queue_data_cmd(revision_item, item, tmp()); +} + +void +session::note_cert(hexenc const & c) +{ + id item; + decode_hexenc(c, item); + revision cert; + string str; + app.db.get_revision_cert(c, cert); + write_cert(cert.inner(), str); + queue_data_cmd(cert_item, item, str); +} + + void session::rev_written_callback(revision_id rid) { if (revision_checked_ticker.get()) @@ -720,122 +695,67 @@ bool session::done_all_refinements() { - bool all = true; - for (map::const_iterator j = - done_refinements.begin(); j != done_refinements.end(); ++j) - { - if 
(j->second.tree_is_done == false) - all = false; - } - return all; + return rev_refiner.done() + && cert_refiner.done() + && key_refiner.done() + && epoch_refiner.done(); } -bool -session::cert_refinement_done() -{ - return done_refinements[cert_item].tree_is_done; -} bool session::got_all_data() { - for (map > >::const_iterator i = - requested_items.begin(); i != requested_items.end(); ++i) - { - if (! i->second->empty()) - return false; - } - return true; + return rev_refiner.items_to_receive.empty() + && cert_refiner.items_to_receive.empty() + && key_refiner.items_to_receive.empty() + && epoch_refiner.items_to_receive.empty(); } -bool -session::all_requested_revisions_received() -{ - map > >::const_iterator - i = requested_items.find(revision_item); - I(i != requested_items.end()); - return i->second->empty(); -} void session::maybe_note_epochs_finished() { - map > >::const_iterator - i = requested_items.find(epoch_item); - I(i != requested_items.end()); // Maybe there are outstanding epoch requests. - if (!i->second->empty()) + if (!epoch_refiner.items_to_receive.empty()) return; + // And maybe we haven't even finished the refinement. - if (!done_refinements[epoch_item].tree_is_done) + if (!epoch_refiner.done()) return; + // But otherwise, we're ready to go! 
L(F("all epochs processed, opening database valve\n")); this->dbw.open_valve(); } void -session::note_item_requested(netcmd_item_type ty, id const & ident) -{ - map > >::const_iterator - i = requested_items.find(ty); - I(i != requested_items.end()); - i->second->insert(ident); -} - -void -session::note_item_full_delta(netcmd_item_type ty, id const & ident) -{ - map > >::const_iterator - i = full_delta_items.find(ty); - I(i != full_delta_items.end()); - i->second->insert(ident); -} - -void session::note_item_arrived(netcmd_item_type ty, id const & ident) { - map > >::const_iterator - i = requested_items.find(ty); - I(i != requested_items.end()); - i->second->erase(ident); - map > >::const_iterator - j = received_items.find(ty); - I(j != received_items.end()); - j->second->insert(ident); - - switch (ty) { case cert_item: + cert_refiner.items_to_receive.erase(ident); if (cert_in_ticker.get() != NULL) ++(*cert_in_ticker); break; case revision_item: + rev_refiner.items_to_receive.erase(ident); if (revision_in_ticker.get() != NULL) ++(*revision_in_ticker); break; + case key_item: + key_refiner.items_to_receive.erase(ident); + break; + case epoch_item: + epoch_refiner.items_to_receive.erase(ident); + break; default: // No ticker for other things. 
break; } } -bool -session::item_already_requested(netcmd_item_type ty, id const & ident) -{ - map > >::const_iterator i; - i = requested_items.find(ty); - I(i != requested_items.end()); - if (i->second->find(ident) != i->second->end()) - return true; - i = received_items.find(ty); - I(i != received_items.end()); - if (i->second->find(ident) != i->second->end()) - return true; - return false; -} void @@ -844,13 +764,21 @@ switch (ty) { case cert_item: + cert_refiner.items_to_send.erase(ident); if (cert_out_ticker.get() != NULL) ++(*cert_out_ticker); break; case revision_item: + rev_refiner.items_to_send.erase(ident); if (revision_out_ticker.get() != NULL) ++(*revision_out_ticker); break; + case key_item: + key_refiner.items_to_send.erase(ident); + break; + case epoch_item: + epoch_refiner.items_to_send.erase(ident); + break; default: // No ticker for other things. break; @@ -887,171 +815,6 @@ } -inline static id -plain_id(manifest_id const & i) -{ - id tmp; - hexenc htmp(i.inner()); - decode_hexenc(htmp, tmp); - return tmp; -} - -inline static id -plain_id(file_id const & i) -{ - id tmp; - hexenc htmp(i.inner()); - decode_hexenc(htmp, tmp); - return tmp; -} - -void -session::get_heads_and_consume_certs( set & heads ) -{ - typedef map > > ancestryT; - typedef map > cert_map; - - set nodes, parents; - map chld_num; - L(F("analyzing %d ancestry edges\n") % ancestry.size()); - - for (ancestryT::const_iterator i = ancestry.begin(); i != ancestry.end(); ++i) - { - nodes.insert(i->first); - for (edge_map::const_iterator j = i->second->second.edges.begin(); - j != i->second->second.edges.end(); ++j) - { - parents.insert(edge_old_revision(j)); - map::iterator n; - n = chld_num.find(edge_old_revision(j)); - if (n == chld_num.end()) - chld_num.insert(make_pair(edge_old_revision(j), 1)); - else - ++(n->second); - } - } - - set_difference(nodes.begin(), nodes.end(), - parents.begin(), parents.end(), - inserter(heads, heads.begin())); - - L(F("intermediate set_difference heads 
size %d") % heads.size()); - - // Write permissions checking: - // remove heads w/o proper certs, add their children to heads - // 1) remove unwanted branch certs from consideration - // 2) remove heads w/o a branch tag, process new exposed heads - // 3) repeat 2 until no change - - //1 - set ok_branches, bad_branches; - cert_name bcert_name(branch_cert_name); - cert_name tcert_name(tag_cert_name); - for (map::iterator i = received_certs.begin(); - i != received_certs.end(); ++i) - { - //branches - vector & bcerts(i->second[bcert_name]); - vector keeping; - for (vector::iterator j = bcerts.begin(); j != bcerts.end(); ++j) - { - cert_value name; - decode_base64(j->value, name); - if (ok_branches.find(name()) != ok_branches.end()) - keeping.push_back(*j); - else if (bad_branches.find(name()) != bad_branches.end()) - ; - else - { - if (our_matcher(name())) - { - ok_branches.insert(name()); - keeping.push_back(*j); - } - else - { - bad_branches.insert(name()); - W(F("Dropping branch certs for unwanted branch %s") - % name); - } - } - } - bcerts = keeping; - } - //2 - list tmp; - for (set::iterator i = heads.begin(); i != heads.end(); ++i) - { - if (!received_certs[*i][bcert_name].size()) - tmp.push_back(*i); - } - for (list::iterator i = tmp.begin(); i != tmp.end(); ++i) - heads.erase(*i); - - L(F("after step 2, heads size %d") % heads.size()); - //3 - while (tmp.size()) - { - ancestryT::const_iterator i = ancestry.find(tmp.front()); - if (i != ancestry.end()) - { - for (edge_map::const_iterator j = i->second->second.edges.begin(); - j != i->second->second.edges.end(); ++j) - { - if (!--chld_num[edge_old_revision(j)]) - { - if (received_certs[i->first][bcert_name].size()) - heads.insert(i->first); - else - tmp.push_back(edge_old_revision(j)); - } - } - // since we don't want this rev, we don't want it's certs either - received_certs[tmp.front()] = cert_map(); - } - tmp.pop_front(); - } - - L(F("after step 3, heads size %d") % heads.size()); - // We've reduced the certs 
to those we want now, send them to dbw. - for (map::iterator i = received_certs.begin(); - i != received_certs.end(); ++i) - { - for (cert_map::iterator j = i->second.begin(); - j != i->second.end(); ++j) - { - for (vector::iterator k = j->second.begin(); - k != j->second.end(); ++k) - { - this->dbw.consume_revision_cert(revision(*k)); - } - } - } -} - -void -session::analyze_ancestry_graph() -{ - L(F("analyze_ancestry_graph")); - if (! (all_requested_revisions_received() && cert_refinement_done())) - { - L(F("not all done in analyze_ancestry_graph")); - return; - } - - if (analyzed_ancestry) - { - L(F("already analyzed_ancestry in analyze_ancestry_graph")); - return; - } - - L(F("analyze_ancestry_graph fetching")); - - ancestry_fetcher fetch(*this); - - analyzed_ancestry = true; -} - Netxx::Probe::ready_type session::which_events() const { @@ -1237,70 +1000,30 @@ } void -session::queue_send_data_cmd(netcmd_item_type type, - id const & item) +session::queue_note_item_cmd(netcmd_item_type ty, id item) { string typestr; - netcmd_item_type_to_string(type, typestr); - hexenc hid; - encode_hexenc(item, hid); - - if (role == source_role) - { - L(F("not queueing request for %s '%s' as we are in pure source role\n") - % typestr % hid); - return; - } - - if (item_already_requested(type, item)) - { - L(F("not queueing request for %s '%s' as we already requested it\n") - % typestr % hid); - return; - } - - L(F("queueing request for data of %s item '%s'\n") - % typestr % hid); + hexenc hitem; + encode_hexenc(item, hitem); + netcmd_item_type_to_string(ty, typestr); + L(F("queueing note about %s item '%s'") % typestr % hitem); netcmd cmd; - cmd.write_send_data_cmd(type, item); - write_netcmd_and_try_flush(cmd); - note_item_requested(type, item); + cmd.write_note_item_cmd(ty, item); + write_netcmd_and_try_flush(cmd); } - + void -session::queue_send_delta_cmd(netcmd_item_type type, - id const & base, - id const & ident) +session::queue_note_shared_subtree_cmd(netcmd_item_type ty, 
+ prefix const & pref, + size_t level) { - I(type == file_item); - string typestr; - netcmd_item_type_to_string(type, typestr); - hexenc base_hid; - encode_hexenc(base, base_hid); - hexenc ident_hid; - encode_hexenc(ident, ident_hid); - - if (role == source_role) - { - L(F("not queueing request for %s delta '%s' -> '%s' as we are in pure source role\n") - % typestr % base_hid % ident_hid); - return; - } - - if (item_already_requested(type, ident)) - { - L(F("not queueing request for %s delta '%s' -> '%s' as we already requested the target\n") - % typestr % base_hid % ident_hid); - return; - } - - L(F("queueing request for contents of %s delta '%s' -> '%s'\n") - % typestr % base_hid % ident_hid); + netcmd_item_type_to_string(ty, typestr); + L(F("queueing note about shared %s subtree at level %d") + % typestr % level); netcmd cmd; - cmd.write_send_delta_cmd(type, base, ident); - write_netcmd_and_try_flush(cmd); - note_item_requested(type, ident); + cmd.write_note_shared_subtree_cmd(ty, pref, level); + write_netcmd_and_try_flush(cmd); } void @@ -1367,28 +1090,7 @@ note_item_sent(type, ident); } -void -session::queue_nonexistant_cmd(netcmd_item_type type, - id const & item) -{ - string typestr; - netcmd_item_type_to_string(type, typestr); - hexenc hid; - encode_hexenc(item, hid); - if (role == sink_role) - { - L(F("not queueing note of nonexistence of %s item '%s' as we are in pure sink role\n") - % typestr % hid); - return; - } - L(F("queueing note of nonexistance of %s item '%s'\n") - % typestr % hid); - netcmd cmd; - cmd.write_nonexistant_cmd(type, item); - write_netcmd_and_try_flush(cmd); -} - // processors bool @@ -1405,54 +1107,6 @@ throw bad_decode(F("received network error: %s") % errmsg); } -bool -session::process_done_cmd(size_t level, netcmd_item_type type) -{ - - map< netcmd_item_type, done_marker>::iterator i = done_refinements.find(type); - I(i != done_refinements.end()); - - string typestr; - netcmd_item_type_to_string(type, typestr); - - if ((! 
i->second.current_level_had_refinements) || (level >= 0xff)) - { - // we received *no* refinements on this level -- or we ran out of - // levels -- so refinement for this type is finished. - L(F("received 'done' for empty %s level %d, marking as complete\n") - % typestr % static_cast(level)); - - // possibly echo it back one last time, for shutdown purposes - if (!i->second.tree_is_done) - queue_done_cmd(level + 1, type); - - // tombstone it - i->second.current_level_had_refinements = false; - i->second.tree_is_done = true; - - if (all_requested_revisions_received()) - analyze_ancestry_graph(); - - maybe_note_epochs_finished(); - } - - else if (i->second.current_level_had_refinements - && (! i->second.tree_is_done)) - { - // we *did* receive some refinements on this level, reset to zero and - // queue an echo of the 'done' marker. - L(F("received 'done' for %s level %d, which had refinements; " - "sending echo of done for level %d\n") - % typestr - % static_cast(level) - % static_cast(level + 1)); - i->second.current_level_had_refinements = false; - queue_done_cmd(level + 1, type); - return true; - } - return true; -} - void get_branches(app_state & app, vector & names) { @@ -1827,23 +1481,133 @@ return false; } -void -session::respond_to_confirm_cmd() +bool +session::process_refine_cmd(merkle_node const & node) { - merkle_ptr root; - load_merkle_node(epoch_item, 0, get_root_prefix().val, root); - queue_refine_cmd(*root); - queue_done_cmd(0, epoch_item); + switch (node.type) + { + case file_item: + W(F("Unexpected 'refine' command on non-refined item type\n")); + break; + + case key_item: + key_refiner.process_peer_node(node); + break; + + case revision_item: + rev_refiner.process_peer_node(node); + break; + + case cert_item: + cert_refiner.process_peer_node(node); + break; + + case epoch_item: + epoch_refiner.process_peer_node(node); + break; + } + return true; +} - load_merkle_node(key_item, 0, get_root_prefix().val, root); - queue_refine_cmd(*root); - 
queue_done_cmd(0, key_item); +bool +session::process_done_cmd(size_t level, netcmd_item_type type) +{ + switch (type) + { + case file_item: + W(F("Unexpected 'done' command on non-refined item type\n")); + break; + + case key_item: + key_refiner.process_done_command(level); + if (key_refiner.done()) + send_all_data(key_item, key_refiner.items_to_send); + break; + + case revision_item: + rev_refiner.process_done_command(level); + break; + + case cert_item: + cert_refiner.process_done_command(level); + break; + + case epoch_item: + epoch_refiner.process_done_command(level); + if (epoch_refiner.done()) + send_all_data(epoch_item, epoch_refiner.items_to_send); + break; + } + return true; +} - load_merkle_node(cert_item, 0, get_root_prefix().val, root); - queue_refine_cmd(*root); - queue_done_cmd(0, cert_item); +bool +session::process_note_item_cmd(netcmd_item_type ty, id const & item) +{ + switch (ty) + { + case file_item: + W(F("Unexpected 'note_item' command on non-refined item type\n")); + break; + + case key_item: + key_refiner.note_item_in_peer(item); + break; + + case revision_item: + rev_refiner.note_item_in_peer(item); + break; + + case cert_item: + cert_refiner.note_item_in_peer(item); + break; + + case epoch_item: + epoch_refiner.note_item_in_peer(item); + break; + } + return true; } +bool +session::process_note_shared_subtree_cmd(netcmd_item_type ty, + prefix const & pref, + size_t lev) +{ + switch (ty) + { + case file_item: + W(F("Unexpected 'note_item' command on non-refined item type\n")); + break; + + case key_item: + key_refiner.note_subtree_shared_with_peer(pref, lev); + break; + + case revision_item: + rev_refiner.note_subtree_shared_with_peer(pref, lev); + break; + + case cert_item: + cert_refiner.note_subtree_shared_with_peer(pref, lev); + break; + + case epoch_item: + epoch_refiner.note_subtree_shared_with_peer(pref, lev); + break; + } + return true; +} + +void +session::respond_to_confirm_cmd() +{ + epoch_refiner.begin_refinement(); + 
key_refiner.begin_refinement(); + cert_refiner.begin_refinement(); + rev_refiner.begin_refinement(); +} + static bool data_exists(netcmd_item_type type, id const & item, @@ -1952,475 +1716,7 @@ } } - bool -session::process_refine_cmd(merkle_node const & their_node) -{ - prefix pref; - hexenc hpref; - their_node.get_raw_prefix(pref); - their_node.get_hex_prefix(hpref); - string typestr; - - netcmd_item_type_to_string(their_node.type, typestr); - size_t lev = static_cast(their_node.level); - - L(F("received 'refine' netcmd on %s node '%s', level %d\n") - % typestr % hpref % lev); - - if (!merkle_node_exists(their_node.type, their_node.level, pref)) - { - L(F("no corresponding %s merkle node for prefix '%s', level %d\n") - % typestr % hpref % lev); - - for (size_t slot = 0; slot < constants::merkle_num_slots; ++slot) - { - switch (their_node.get_slot_state(slot)) - { - case empty_state: - { - // we agree, this slot is empty - L(boost::format("(#0) they have an empty slot %d (in a %s node '%s', level %d, we do not have)\n") - % slot % typestr % hpref % lev); - continue; - } - break; - case live_leaf_state: - { - // we want what *they* have - id slotval; - hexenc hslotval; - their_node.get_raw_slot(slot, slotval); - their_node.get_hex_slot(slot, hslotval); - L(boost::format("(#0) they have a live leaf at slot %d (in a %s node '%s', level %d, we do not have)\n") - % slot % typestr % hpref % lev); - L(boost::format("(#0) requesting their %s leaf %s\n") % typestr % hslotval); - queue_send_data_cmd(their_node.type, slotval); - } - break; - case dead_leaf_state: - { - // we cannot ask for what they have, it is dead - L(boost::format("(#0) they have a dead leaf at slot %d (in a %s node '%s', level %d, we do not have)\n") - % slot % typestr % hpref % lev); - continue; - } - break; - case subtree_state: - { - // they have a subtree; might as well ask for that - L(boost::format("(#0) they have a subtree at slot %d (in a %s node '%s', level %d, we do not have)\n") - % slot % 
typestr % hpref % lev); - merkle_node our_fake_subtree; - their_node.extended_prefix(slot, our_fake_subtree.pref); - our_fake_subtree.level = their_node.level + 1; - our_fake_subtree.type = their_node.type; - queue_refine_cmd(our_fake_subtree); - } - break; - } - } - } - else - { - // we have a corresponding merkle node. there are 16 branches - // to the following switch condition. it is awful. sorry. - L(F("found corresponding %s merkle node for prefix '%s', level %d\n") - % typestr % hpref % lev); - merkle_ptr our_node; - load_merkle_node(their_node.type, their_node.level, pref, our_node); - for (size_t slot = 0; slot < constants::merkle_num_slots; ++slot) - { - switch (their_node.get_slot_state(slot)) - { - case empty_state: - switch (our_node->get_slot_state(slot)) - { - - case empty_state: - // 1: theirs == empty, ours == empty - L(boost::format("(#1) they have an empty slot %d in %s node '%s', level %d, and so do we\n") - % slot % typestr % hpref % lev); - continue; - break; - - case live_leaf_state: - // 2: theirs == empty, ours == live - L(boost::format("(#2) they have an empty slot %d in %s node '%s', level %d, we have a live leaf\n") - % slot % typestr % hpref % lev); - { - I(their_node.type == our_node->type); - string tmp; - id slotval; - our_node->get_raw_slot(slot, slotval); - load_data(their_node.type, slotval, this->app, tmp); - queue_data_cmd(their_node.type, slotval, tmp); - } - break; - - case dead_leaf_state: - // 3: theirs == empty, ours == dead - L(boost::format("(#3) they have an empty slot %d in %s node '%s', level %d, we have a dead leaf\n") - % slot % typestr % hpref % lev); - continue; - break; - - case subtree_state: - // 4: theirs == empty, ours == subtree - L(boost::format("(#4) they have an empty slot %d in %s node '%s', level %d, we have a subtree\n") - % slot % typestr % hpref % lev); - { - prefix subprefix; - our_node->extended_raw_prefix(slot, subprefix); - merkle_ptr our_subtree; - I(our_node->type == their_node.type); - 
load_merkle_node(their_node.type, our_node->level + 1, - subprefix, our_subtree); - I(our_node->type == our_subtree->type); - // FIXME: it would be more efficient here, to instead of - // sending our subtree, just send the data for everything - // in the subtree. - queue_refine_cmd(*our_subtree); - } - break; - - } - break; - - - case live_leaf_state: - switch (our_node->get_slot_state(slot)) - { - - case empty_state: - // 5: theirs == live, ours == empty - L(boost::format("(#5) they have a live leaf at slot %d in %s node '%s', level %d, we have nothing\n") - % slot % typestr % hpref % lev); - { - id slotval; - their_node.get_raw_slot(slot, slotval); - queue_send_data_cmd(their_node.type, slotval); - } - break; - - case live_leaf_state: - // 6: theirs == live, ours == live - L(boost::format("(#6) they have a live leaf at slot %d in %s node '%s', and so do we\n") - % slot % typestr % hpref); - { - id our_slotval, their_slotval; - their_node.get_raw_slot(slot, their_slotval); - our_node->get_raw_slot(slot, our_slotval); - if (their_slotval == our_slotval) - { - hexenc hslotval; - their_node.get_hex_slot(slot, hslotval); - L(boost::format("(#6) we both have live %s leaf '%s'\n") % typestr % hslotval); - continue; - } - else - { - I(their_node.type == our_node->type); - string tmp; - load_data(our_node->type, our_slotval, this->app, tmp); - queue_send_data_cmd(their_node.type, their_slotval); - queue_data_cmd(our_node->type, our_slotval, tmp); - } - } - break; - - case dead_leaf_state: - // 7: theirs == live, ours == dead - L(boost::format("(#7) they have a live leaf at slot %d in %s node %s, level %d, we have a dead one\n") - % slot % typestr % hpref % lev); - { - id our_slotval, their_slotval; - our_node->get_raw_slot(slot, our_slotval); - their_node.get_raw_slot(slot, their_slotval); - if (their_slotval == our_slotval) - { - hexenc hslotval; - their_node.get_hex_slot(slot, hslotval); - L(boost::format("(#7) it's the same %s leaf '%s', but ours is dead\n") - % 
typestr % hslotval); - continue; - } - else - { - queue_send_data_cmd(their_node.type, their_slotval); - } - } - break; - - case subtree_state: - // 8: theirs == live, ours == subtree - L(boost::format("(#8) they have a live leaf in slot %d of %s node '%s', level %d, we have a subtree\n") - % slot % typestr % hpref % lev); - { - - id their_slotval; - hexenc their_hval; - their_node.get_raw_slot(slot, their_slotval); - encode_hexenc(their_slotval, their_hval); - if (data_exists(their_node.type, their_slotval, app)) - L(boost::format("(#8) we have a copy of their live leaf '%s' in slot %d of %s node '%s', level %d\n") - % their_hval % slot % typestr % hpref % lev); - else - { - L(boost::format("(#8) requesting a copy of their live leaf '%s' in slot %d of %s node '%s', level %d\n") - % their_hval % slot % typestr % hpref % lev); - queue_send_data_cmd(their_node.type, their_slotval); - } - - L(boost::format("(#8) sending our subtree for refinement, in slot %d of %s node '%s', level %d\n") - % slot % typestr % hpref % lev); - prefix subprefix; - our_node->extended_raw_prefix(slot, subprefix); - merkle_ptr our_subtree; - load_merkle_node(our_node->type, our_node->level + 1, - subprefix, our_subtree); - // FIXME: it would be more efficient here, to instead of - // sending our subtree, just send the data for everything - // in the subtree (except, possibly, the item they already - // have). 
- queue_refine_cmd(*our_subtree); - } - break; - } - break; - - - case dead_leaf_state: - switch (our_node->get_slot_state(slot)) - { - case empty_state: - // 9: theirs == dead, ours == empty - L(boost::format("(#9) they have a dead leaf at slot %d in %s node '%s', level %d, we have nothing\n") - % slot % typestr % hpref % lev); - continue; - break; - - case live_leaf_state: - // 10: theirs == dead, ours == live - L(boost::format("(#10) they have a dead leaf at slot %d in %s node '%s', level %d, we have a live one\n") - % slot % typestr % hpref % lev); - { - id our_slotval, their_slotval; - their_node.get_raw_slot(slot, their_slotval); - our_node->get_raw_slot(slot, our_slotval); - hexenc hslotval; - our_node->get_hex_slot(slot, hslotval); - if (their_slotval == our_slotval) - { - L(boost::format("(#10) we both have %s leaf %s, theirs is dead\n") - % typestr % hslotval); - continue; - } - else - { - I(their_node.type == our_node->type); - string tmp; - load_data(our_node->type, our_slotval, this->app, tmp); - queue_data_cmd(our_node->type, our_slotval, tmp); - } - } - break; - - case dead_leaf_state: - // 11: theirs == dead, ours == dead - L(boost::format("(#11) they have a dead leaf at slot %d in %s node '%s', level %d, so do we\n") - % slot % typestr % hpref % lev); - continue; - break; - - case subtree_state: - // theirs == dead, ours == subtree - L(boost::format("(#12) they have a dead leaf in slot %d of %s node '%s', we have a subtree\n") - % slot % typestr % hpref % lev); - { - prefix subprefix; - our_node->extended_raw_prefix(slot, subprefix); - merkle_ptr our_subtree; - load_merkle_node(our_node->type, our_node->level + 1, - subprefix, our_subtree); - // FIXME: it would be more efficient here, to instead of - // sending our subtree, just send the data for everything - // in the subtree (except, possibly, the dead thing). 
- queue_refine_cmd(*our_subtree); - } - break; - } - break; - - - case subtree_state: - switch (our_node->get_slot_state(slot)) - { - case empty_state: - // 13: theirs == subtree, ours == empty - L(boost::format("(#13) they have a subtree at slot %d in %s node '%s', level %d, we have nothing\n") - % slot % typestr % hpref % lev); - { - merkle_node our_fake_subtree; - their_node.extended_prefix(slot, our_fake_subtree.pref); - our_fake_subtree.level = their_node.level + 1; - our_fake_subtree.type = their_node.type; - queue_refine_cmd(our_fake_subtree); - } - break; - - case live_leaf_state: - // 14: theirs == subtree, ours == live - L(boost::format("(#14) they have a subtree at slot %d in %s node '%s', level %d, we have a live leaf\n") - % slot % typestr % hpref % lev); - { - size_t subslot; - id our_slotval; - merkle_node our_fake_subtree; - our_node->get_raw_slot(slot, our_slotval); - hexenc hslotval; - encode_hexenc(our_slotval, hslotval); - - pick_slot_and_prefix_for_value(our_slotval, our_node->level + 1, subslot, - our_fake_subtree.pref); - L(boost::format("(#14) pushed our leaf '%s' into fake subtree slot %d, level %d\n") - % hslotval % subslot % (lev + 1)); - our_fake_subtree.type = their_node.type; - our_fake_subtree.level = our_node->level + 1; - our_fake_subtree.set_raw_slot(subslot, our_slotval); - our_fake_subtree.set_slot_state(subslot, our_node->get_slot_state(slot)); - queue_refine_cmd(our_fake_subtree); - } - break; - - case dead_leaf_state: - // 15: theirs == subtree, ours == dead - L(boost::format("(#15) they have a subtree at slot %d in %s node '%s', level %d, we have a dead leaf\n") - % slot % typestr % hpref % lev); - { - size_t subslot; - id our_slotval; - merkle_node our_fake_subtree; - our_node->get_raw_slot(slot, our_slotval); - pick_slot_and_prefix_for_value(our_slotval, our_node->level + 1, subslot, - our_fake_subtree.pref); - our_fake_subtree.type = their_node.type; - our_fake_subtree.level = our_node->level + 1; - 
our_fake_subtree.set_raw_slot(subslot, our_slotval); - our_fake_subtree.set_slot_state(subslot, our_node->get_slot_state(slot)); - queue_refine_cmd(our_fake_subtree); - } - break; - - case subtree_state: - // 16: theirs == subtree, ours == subtree - L(boost::format("(#16) they have a subtree at slot %d in %s node '%s', level %d, and so do we\n") - % slot % typestr % hpref % lev); - { - id our_slotval, their_slotval; - hexenc hslotval; - their_node.get_raw_slot(slot, their_slotval); - our_node->get_raw_slot(slot, our_slotval); - our_node->get_hex_slot(slot, hslotval); - if (their_slotval == our_slotval) - { - L(boost::format("(#16) we both have %s subtree '%s'\n") % typestr % hslotval); - continue; - } - else - { - L(boost::format("(#16) %s subtrees at slot %d differ, refining ours\n") % typestr % slot); - prefix subprefix; - our_node->extended_raw_prefix(slot, subprefix); - merkle_ptr our_subtree; - load_merkle_node(our_node->type, our_node->level + 1, - subprefix, our_subtree); - queue_refine_cmd(*our_subtree); - } - } - break; - } - break; - } - } - } - return true; -} - - -bool -session::process_send_data_cmd(netcmd_item_type type, - id const & item) -{ - string typestr; - netcmd_item_type_to_string(type, typestr); - hexenc hitem; - encode_hexenc(item, hitem); - L(F("received 'send_data' netcmd requesting %s '%s'\n") - % typestr % hitem); - if (data_exists(type, item, this->app)) - { - string out; - load_data(type, item, this->app, out); - queue_data_cmd(type, item, out); - } - else - { - queue_nonexistant_cmd(type, item); - } - return true; -} - -bool -session::process_send_delta_cmd(netcmd_item_type type, - id const & base, - id const & ident) -{ - string typestr; - netcmd_item_type_to_string(type, typestr); - delta del; - - hexenc hbase, hident; - encode_hexenc(base, hbase); - encode_hexenc(ident, hident); - - L(F("received 'send_delta' netcmd requesting %s edge '%s' -> '%s'\n") - % typestr % hbase % hident); - - switch (type) - { - case file_item: - { - 
file_id fbase(hbase), fident(hident); - file_delta fdel; - if (this->app.db.file_version_exists(fbase) - && this->app.db.file_version_exists(fident)) - { - file_data base_fdat, ident_fdat; - data base_dat, ident_dat; - this->app.db.get_file_version(fbase, base_fdat); - this->app.db.get_file_version(fident, ident_fdat); - string tmp; - base_dat = base_fdat.inner(); - ident_dat = ident_fdat.inner(); - compute_delta(base_dat(), ident_dat(), tmp); - del = delta(tmp); - } - else - { - return process_send_data_cmd(type, ident); - } - } - break; - - default: - throw bad_decode(F("delta requested for item type %s\n") % typestr); - } - queue_delta_cmd(type, base, ident, del); - return true; -} - -bool session::process_data_cmd(netcmd_item_type type, id const & item, string const & dat) @@ -2506,14 +1802,7 @@ cert_hash_code(c, tmp); if (! (tmp == hitem)) throw bad_decode(F("hash check failed for revision cert '%s'") % hitem); -// this->dbw.consume_revision_cert(revision(c)); - received_certs[revision_id(c.ident)][c.name].push_back(c); - if (!app.db.revision_exists(revision_id(c.ident))) - { - id rid; - decode_hexenc(c.ident, rid); - queue_send_data_cmd(revision_item, rid); - } + this->dbw.consume_revision_cert(revision(c)); } break; @@ -2525,16 +1814,7 @@ else { L(F("received revision '%s'\n") % hitem); - boost::shared_ptr< pair > - rp(new pair()); - - rp->first = revision_data(dat); - read_revision_set(dat, rp->second); - ancestry.insert(std::make_pair(rid, rp)); - if (cert_refinement_done()) - { - analyze_ancestry_graph(); - } + this->dbw.consume_revision_data(rid, revision_data(dat)); } } break; @@ -2546,6 +1826,7 @@ L(F("file version '%s' already exists in our database\n") % hitem); else { + L(F("received file '%s'\n") % hitem); this->dbw.consume_file_data(fid, file_data(dat)); } } @@ -2569,8 +1850,6 @@ pair id_pair = make_pair(base, ident); - // it's ok if we received something we didn't ask for; it might - // be a spontaneous transmission from refinement 
note_item_arrived(type, ident); switch (type) @@ -2578,26 +1857,9 @@ case file_item: { file_id src_file(hbase), dst_file(hident); - if (full_delta_items[file_item]->find(ident) - != full_delta_items[file_item]->end()) - { - this->dbw.consume_file_delta(src_file, - dst_file, - file_delta(del), - true); - } - else if (reverse_delta_requests.find(id_pair) - != reverse_delta_requests.end()) - { - reverse_delta_requests.erase(id_pair); - this->dbw.consume_file_reverse_delta(src_file, - dst_file, - file_delta(del)); - } - else - this->dbw.consume_file_delta(src_file, - dst_file, - file_delta(del)); + this->dbw.consume_file_delta(src_file, + dst_file, + file_delta(del)); } break; @@ -2608,20 +1870,6 @@ return true; } -bool -session::process_nonexistant_cmd(netcmd_item_type type, - id const & item) -{ - string typestr; - netcmd_item_type_to_string(type, typestr); - hexenc hitem; - encode_hexenc(item, hitem); - L(F("received 'nonexistant' netcmd for %s '%s'\n") - % typestr % hitem); - note_item_arrived(type, item); - return true; -} - bool session::process_usher_cmd(utf8 const & msg) { @@ -2639,35 +1887,22 @@ return true; } -bool -session::merkle_node_exists(netcmd_item_type type, - size_t level, - prefix const & pref) -{ - map >::const_iterator i = - merkle_tables.find(type); - - I(i != merkle_tables.end()); - merkle_table::const_iterator j = i->second->find(std::make_pair(pref, level)); - return (j != i->second->end()); -} -void -session::load_merkle_node(netcmd_item_type type, - size_t level, - prefix const & pref, - merkle_ptr & node) +void +session::send_all_data(netcmd_item_type ty, set const & items) { - map >::const_iterator i = - merkle_tables.find(type); - - I(i != merkle_tables.end()); - merkle_table::const_iterator j = i->second->find(std::make_pair(pref, level)); - I(j != i->second->end()); - node = j->second; + for (set::const_iterator i = items.begin(); + i != items.end(); ++i) + { + if (data_exists(ty, *i, this->app)) + { + string out; + load_data(ty, *i, 
this->app, out); + queue_data_cmd(ty, *i, out); + } + } } - bool session::dispatch_payload(netcmd const & cmd) { @@ -2774,10 +2009,6 @@ { merkle_node node; cmd.read_refine_cmd(node); - map< netcmd_item_type, done_marker>::iterator i = done_refinements.find(node.type); - require(i != done_refinements.end(), "refinement netcmd refers to valid type"); - require(i->second.tree_is_done == false, "refinement netcmd received when tree is live"); - i->second.current_level_had_refinements = true; return process_refine_cmd(node); } break; @@ -2788,34 +2019,27 @@ size_t level; netcmd_item_type type; cmd.read_done_cmd(level, type); - return process_done_cmd(level, type); } break; - case send_data_cmd: - require(authenticated, "send_data netcmd received when authenticated"); - require(role == source_role || - role == source_and_sink_role, - "send_data netcmd received in source or source/sink role"); + case note_item_cmd: { - netcmd_item_type type; + netcmd_item_type ty; id item; - cmd.read_send_data_cmd(type, item); - return process_send_data_cmd(type, item); + cmd.read_note_item_cmd(ty, item); + return process_note_item_cmd(ty, item); } break; - case send_delta_cmd: - require(authenticated, "send_delta netcmd received when authenticated"); - require(role == source_role || - role == source_and_sink_role, - "send_delta netcmd received in source or source/sink role"); + case note_shared_subtree_cmd: { - netcmd_item_type type; - id base, ident; - cmd.read_send_delta_cmd(type, base, ident); - return process_send_delta_cmd(type, base, ident); + netcmd_item_type ty; + prefix pref; + size_t lev; + cmd.read_note_shared_subtree_cmd(ty, pref, lev); + return process_note_shared_subtree_cmd(ty, pref, lev); } + break; case data_cmd: require(authenticated, "data netcmd received when authenticated"); @@ -2845,18 +2069,6 @@ } break; - case nonexistant_cmd: - require(authenticated, "nonexistant netcmd received when authenticated"); - require(role == sink_role || - role == source_and_sink_role, 
- "nonexistant netcmd received in sink or source/sink role"); - { - netcmd_item_type type; - id item; - cmd.read_nonexistant_cmd(type, item); - return process_nonexistant_cmd(type, item); - } - break; case usher_cmd: { utf8 greeting; @@ -2864,6 +2076,7 @@ return process_usher_cmd(greeting); } break; + case usher_reply_cmd: return false;// should not happen break; @@ -3318,41 +2531,33 @@ // ///////////////////////////////////////////////// -static boost::shared_ptr -make_root_node(session & sess, - netcmd_item_type ty) -{ - boost::shared_ptr tab = - boost::shared_ptr(new merkle_table()); - - merkle_ptr tmp = merkle_ptr(new merkle_node()); - tmp->type = ty; - tab->insert(std::make_pair(std::make_pair(get_root_prefix().val, 0), tmp)); - - sess.merkle_tables[ty] = tab; - return tab; -} - void -insert_with_parents(revision_id rev, set & col, app_state & app, ticker & revisions_ticker) +insert_with_parents(revision_id rev, refiner & ref, + app_state & app, + ticker & revisions_ticker) { - vector frontier; - frontier.push_back(rev); - while (!frontier.empty()) + deque work; + set seen; + work.push_back(rev); + while (!work.empty()) { - revision_id rid = frontier.back(); - frontier.pop_back(); - if (!null_id(rid) && col.find(rid) == col.end()) + revision_id rid = work.front(); + work.pop_front(); + + if (!null_id(rid) && seen.find(rid) == seen.end()) { + seen.insert(rid); ++revisions_ticker; - col.insert(rid); + id rev_item; + decode_hexenc(rid.inner(), rev_item); + ref.note_local_item(rev_item); std::set parents; app.db.get_revision_parents(rid, parents); for (std::set::const_iterator i = parents.begin(); i != parents.end(); ++i) { - frontier.push_back(*i); + work.push_back(*i); } } } @@ -3367,10 +2572,6 @@ i != branchnames.end(); ++i) L(F("including branch %s") % *i); - boost::shared_ptr ctab = make_root_node(*this, cert_item); - boost::shared_ptr ktab = make_root_node(*this, key_item); - boost::shared_ptr etab = make_root_node(*this, epoch_item); - // xgettext: please 
use short message and try to avoid multibytes chars ticker revisions_ticker(_("revisions"), "r", 64); // xgettext: please use short message and try to avoid multibytes chars @@ -3398,11 +2599,13 @@ j != certs.end(); j++) { insert_with_parents(revision_id(j->inner().ident), - revision_ids, app, revisions_ticker); + rev_refiner, app, revisions_ticker); // branch certs go in here, others later on - hexenc hash; - cert_hash_code(j->inner(), hash); - insert_into_merkle_tree(*ctab, cert_item, true, hash, 0); + hexenc tmp; + id item; + cert_hash_code(j->inner(), tmp); + decode_hexenc(tmp, item); + cert_refiner.note_local_item(item); if (inserted_keys.find(j->inner().key) == inserted_keys.end()) inserted_keys.insert(j->inner().key); } @@ -3432,8 +2635,10 @@ j = epochs.find(branch); I(j != epochs.end()); epoch_id eid; + id epoch_item; epoch_hash_code(j->first, j->second, eid); - insert_into_merkle_tree(*etab, epoch_item, true, eid.inner(), 0); + decode_hexenc(eid.inner(), epoch_item); + epoch_refiner.note_local_item(epoch_item); } } @@ -3455,7 +2660,9 @@ if (revision_ids.find(ident) == revision_ids.end()) continue; - insert_into_merkle_tree(*ctab, cert_item, true, hash, 0); + id item; + decode_hexenc(hash, item); + cert_refiner.note_local_item(item); ++certs_ticker; if (inserted_keys.find(key) == inserted_keys.end()) inserted_keys.insert(key); @@ -3488,14 +2695,17 @@ app.db.get_key(*key, pub_encoded); hexenc keyhash; key_hash_code(*key, pub_encoded, keyhash); - insert_into_merkle_tree(*ktab, key_item, true, keyhash, 0); + id key_item; + decode_hexenc(keyhash, key_item); + key_refiner.note_local_item(key_item); ++keys_ticker; } } - recalculate_merkle_codes(*etab, get_root_prefix().val, 0); - recalculate_merkle_codes(*ktab, get_root_prefix().val, 0); - recalculate_merkle_codes(*ctab, get_root_prefix().val, 0); + rev_refiner.reindex_local_items(); + cert_refiner.reindex_local_items(); + key_refiner.reindex_local_items(); + epoch_refiner.reindex_local_items(); } void @@ 
-3535,247 +2745,3 @@ } } - -// Steps for determining files/manifests to request, from -// a given revision ancestry: -// -// 1) find the new heads, consume valid branch certs etc. -// -// 2) foreach new head, traverse up the revision ancestry, building -// a set of reverse file/manifest deltas (we stop when we hit an -// already-seen or existing-in-db rev). -// -// at the same time, build a (smaller) set of forward deltas (files and -// manifests). these have a file/manifest in the new head as the -// destination, and end up having an item already existing in the -// database as the source (or null, in which case full data is -// requested). -// -// 3) For each file/manifest in head, first request the forward delta -// (or full data if there is no path back to existing data). Then -// traverse up the set of reverse deltas, daisychaining our way until -// we get to existing revisions. -// -// Notes: -// -// - The database stores reverse deltas, so preferring these allows -// a server to send pre-computed deltas straight from the database -// (this isn't done yet). In order to bootstrap the tip-most data, -// forward deltas from a close(est?)-ancestor are used, or full data -// is requested if there is no existing ancestor. -// -// eg, if we have the (manifest) ancestry -// A -> B -> C -> D -// where A is existing, {B,C,D} are new, then we will request deltas -// A->D (fwd) -// D->C (rev) -// C->B (rev) -// This may result in slightly larger deltas than using all forward -// deltas, however it should be more efficient. -// -// - in order to keep a good hit ratio with the reconstructed version -// cache in database, we'll request deltas for a single file/manifest -// all at once, rather than requesting deltas per-revision. This -// requires a bit more memory usage, though it will be less memory -// than would be required to store all the outgoing delta requests -// anyway. 
-ancestry_fetcher::ancestry_fetcher(session & s) - : sess(s) -{ - set new_heads; - sess.get_heads_and_consume_certs( new_heads ); - - L(F("ancestry_fetcher: got %d heads") % new_heads.size()); - - traverse_ancestry(new_heads); - - request_files(); -} - -// adds file deltas from the given changeset into the sets of forward -// and reverse deltas -void -ancestry_fetcher::traverse_files(cset const & cs) -{ - for (std::map::const_iterator i = cs.files_added.begin(); - i != cs.files_added.end(); ++i) - { - // add any new forward deltas - if (seen_files.find(i->second) == seen_files.end()) - { - file_id parent; - fwd_file_deltas.insert( make_pair( parent, i->second ) ); - } - seen_files.insert(i->second); - } - - for (std::map >::const_iterator - d = cs.deltas_applied.begin(); - d != cs.deltas_applied.end(); ++d) - { - file_id parent_file (delta_entry_src(d)); - file_id child_file (delta_entry_dst(d)); - MM(parent_file); - MM(child_file); - - I(!(parent_file == child_file)); - // when changeset format is altered to have [...]->[] deltas on deletion, - // this assertion needs revisiting - I(!null_id(child_file)); - - // request the reverse delta - if (!null_id(parent_file)) - { - rev_file_deltas.insert(make_pair(child_file, parent_file)); - } - - // add any new forward deltas - if (seen_files.find(child_file) == seen_files.end()) - { - fwd_file_deltas.insert( make_pair( parent_file, child_file ) ); - } - - // update any forward deltas. no point updating if it already - // points to something we have. - if (!null_id(parent_file) - && fwd_file_deltas.find(child_file) != fwd_file_deltas.end()) - { - // We're traversing with child->parent of A->B. - // Update any forward deltas with a parent of B to - // have A as a parent, ie B->C becomes A->C. 
- for (multimap::iterator d = - fwd_file_deltas.lower_bound(child_file); - d != fwd_file_deltas.upper_bound(child_file); - d++) - { - fwd_file_deltas.insert(make_pair(parent_file, d->second)); - } - - fwd_file_deltas.erase(fwd_file_deltas.lower_bound(child_file), - fwd_file_deltas.upper_bound(child_file)); - } - - seen_files.insert(child_file); - seen_files.insert(parent_file); - } -} - -// traverse up the ancestry for each of the given new head revisions, -// storing sets of file and manifest deltas -void -ancestry_fetcher::traverse_ancestry(set const & heads) -{ - deque frontier; - set seen_revs; - - for (set::const_iterator h = heads.begin(); - h != heads.end(); h++) - { - L(F("traversing head %s") % *h); - frontier.push_back(*h); - seen_revs.insert(*h); - } - - // breadth first up the ancestry - while (!frontier.empty()) - { - revision_id const & rev = frontier.front(); - MM(rev); - - I(sess.ancestry.find(rev) != sess.ancestry.end()); - - for (edge_map::const_iterator e = sess.ancestry[rev]->second.edges.begin(); - e != sess.ancestry[rev]->second.edges.end(); e++) - { - revision_id const & par = edge_old_revision(e); - MM(par); - if (seen_revs.find(par) == seen_revs.end()) - { - if (sess.ancestry.find(par) != sess.ancestry.end()) - { - frontier.push_back(par); - } - seen_revs.insert(par); - } - - traverse_files(edge_changes(e)); - - } - - sess.dbw.consume_revision_data(rev, sess.ancestry[rev]->first); - frontier.pop_front(); - } -} - -void -ancestry_fetcher::request_rev_file_deltas(file_id const & start, - set & done_files) -{ - stack< file_id > frontier; - frontier.push(start); - - while (!frontier.empty()) - { - file_id const child = frontier.top(); - MM(child); - I(!null_id(child)); - frontier.pop(); - - for (multimap< file_id, file_id>::const_iterator - d = rev_file_deltas.lower_bound(child); - d != rev_file_deltas.upper_bound(child); - d++) - { - file_id const & parent = d->second; - MM(parent); - I(!null_id(parent)); - if (done_files.find(parent) == 
done_files.end()) - { - done_files.insert(parent); - if (!sess.app.db.file_version_exists(parent)) - { - sess.queue_send_delta_cmd(file_item, - plain_id(child), plain_id(parent)); - sess.reverse_delta_requests.insert(make_pair(plain_id(child), - plain_id(parent))); - } - frontier.push(parent); - } - } - } -} - -void -ancestry_fetcher::request_files() -{ - // just a cache to avoid checking db.foo_version_exists() too much - set done_files; - - for (multimap::const_iterator d = fwd_file_deltas.begin(); - d != fwd_file_deltas.end(); d++) - { - file_id const & anc = d->first; - file_id const & child = d->second; - MM(anc); - MM(child); - if (!sess.app.db.file_version_exists(child)) - { - if (null_id(anc) - || !sess.app.db.file_version_exists(anc)) - { - sess.queue_send_data_cmd(file_item, plain_id(child)); - } - else - { - sess.queue_send_delta_cmd(file_item, - plain_id(anc), plain_id(child)); - sess.note_item_full_delta(file_item, plain_id(child)); - } - } - - // traverse up the reverse deltas - request_rev_file_deltas(child, done_files); - } -} - - ============================================================ --- refiner.cc +++ refiner.cc e584a3e6eeeed3c9b009364df62720b9c34ef07c @@ -0,0 +1,400 @@ + +// copyright (C) 2005 graydon hoare +// all rights reserved. +// licensed to the public under the terms of the GNU GPL (>= 2) +// see the file COPYING for details + +#include +#include +#include + +#include + +#include "refiner.hh" +#include "vocab.hh" +#include "merkle_tree.hh" +#include "netcmd.hh" + +using std::string; +using std::set; + +// The previous incarnation of this algorithm had code related to sending +// decisions (and skippable transmissions) mixed in with the refinement +// actions. +// +// This incarnation is much simpler: our goal is only to learn the complete +// set of items in our peer's tree, and inform our peer of every item we +// have in our tree. To do this we must perform a complete refinement, and +// record the results in an in-memory table. 
We will decide what to send +// elsewhere, based on this knowledge. + +void +refiner::note_local_item(id const & item) +{ + insert_into_merkle_tree(table, type, item, 0); +} + + +void +refiner::reindex_local_items() +{ + recalculate_merkle_codes(table, prefix(""), 0); +} + + +void +refiner::refine_synthetic_empty_subtree(merkle_node const & their_node, + size_t slot) +{ + // Our peer has a subtree, we have nothing. We want to explore their + // subtree but we have nothing real to send, so we synthesize an empty + // node and send it as a refinement request. + merkle_node our_fake_node; + their_node.extended_prefix(slot, our_fake_node.pref); + our_fake_node.level = their_node.level + 1; + our_fake_node.type = their_node.type; + cb.queue_refine_cmd(our_fake_node); +} + + +void +refiner::refine_synthetic_singleton_subtree(merkle_node const & their_node, + merkle_node const & our_node, + size_t slot) +{ + // Our peer has a subtree, we have a single leaf. We want to explore + // their subtree but we have nothing real to send, so we synthesize an + // empty subtree and push our leaf into it, sending a refinement request + // on the new (fake) subtree. 
+ size_t subslot; + id our_slotval; + merkle_node our_fake_subtree; + our_node.get_raw_slot(slot, our_slotval); + pick_slot_and_prefix_for_value(our_slotval, our_node.level + 1, + subslot, our_fake_subtree.pref); + our_fake_subtree.type = their_node.type; + our_fake_subtree.level = our_node.level + 1; + our_fake_subtree.set_raw_slot(subslot, our_slotval); + our_fake_subtree.set_slot_state(subslot, our_node.get_slot_state(slot)); + cb.queue_refine_cmd(our_fake_subtree); +} + + +void +refiner::inform_peer_of_item_in_slot(merkle_node const & our_node, + size_t slot) +{ + id slotval; + string tmp; + our_node.get_raw_slot(slot, slotval); + cb.queue_note_item_cmd(type, slotval); +} + + +void +refiner::load_merkle_node(size_t level, prefix const & pref, + merkle_ptr & node) +{ + merkle_table::const_iterator j = table.find(std::make_pair(pref, level)); + I(j != table.end()); + node = j->second; +} + +bool +refiner::merkle_node_exists(size_t level, + prefix const & pref) +{ + merkle_table::const_iterator j = table.find(std::make_pair(pref, level)); + return (j != table.end()); +} + +void +refiner::calculate_items_to_send_and_receive() +{ + items_to_send.clear(); + items_to_receive.clear(); + + std::set_difference(local_items.begin(), local_items.end(), + peer_items.begin(), peer_items.end(), + std::inserter(items_to_send, items_to_send.begin())); + + std::set_difference(peer_items.begin(), peer_items.end(), + local_items.begin(), local_items.end(), + std::inserter(items_to_receive, items_to_receive.begin())); +} + + +void +refiner::inform_peer_of_subtree_in_slot(merkle_node const & our_node, + size_t slot) +{ + prefix subprefix; + our_node.extended_raw_prefix(slot, subprefix); + merkle_ptr our_subtree; + load_merkle_node(our_node.level + 1, subprefix, our_subtree); + cb.queue_refine_cmd(*our_subtree); +} + +void +refiner::note_subtree_shared_with_peer(merkle_node const & our_subtree) +{ + prefix pref; + our_subtree.get_raw_prefix(pref); + collect_items_in_subtree(table, 
pref, our_subtree.level, peer_items); +} + +void +refiner::note_subtree_shared_with_peer(prefix const & pref, size_t lev) +{ + collect_items_in_subtree(table, pref, lev, peer_items); +} + + +void +refiner::compare_subtrees_and_maybe_refine(merkle_node const & their_node, + merkle_node const & our_node, + size_t slot) +{ + // Our peer has a subtree at slot, and so do we. + // + // There are three things to do here: + // + // 1. If we have the same subtree as the peer, for every item in our + // subtree, make a note to ourself that the peer has that item too. + // + // 2. If we have the same subtree, make sure our peer knows it, so + // they can perform #1 for themselves. + // + // 3. If we have different subtrees, refine. + + id our_slotval, their_slotval; + their_node.get_raw_slot(slot, their_slotval); + our_node.get_raw_slot(slot, our_slotval); + + prefix pref; + our_node.extended_raw_prefix(slot, pref); + merkle_ptr our_subtree; + size_t level = our_node.level + 1; + load_merkle_node(level, pref, our_subtree); + + if (their_slotval == our_slotval) + { + cb.queue_note_shared_subtree_cmd(type, pref, level); + note_subtree_shared_with_peer(*our_subtree); + } + else + cb.queue_refine_cmd(*our_subtree); +} + + +refiner::refiner(netcmd_item_type type, refiner_callbacks & cb) + : type(type), cb(cb), + exchanged_data_since_last_done_cmd(false), + finished_refinement(false) +{} + +void +refiner::note_item_in_peer(id const & item) +{ + peer_items.insert(item); +} + + +void +refiner::note_item_in_peer(merkle_node const & their_node, + size_t slot) +{ + I(slot < constants::merkle_num_slots); + id slotval; + their_node.get_raw_slot(slot, slotval); + + note_item_in_peer(slotval); + + // Write a debug message + { + hexenc hslotval; + their_node.get_hex_slot(slot, hslotval); + + size_t lev = static_cast(their_node.level); + + hexenc hpref; + their_node.get_hex_prefix(hpref); + + string typestr; + netcmd_item_type_to_string(their_node.type, typestr); + + L(boost::format("peer has 
%s '%s' at slot %d " + "(in node '%s', level %d)\n") + % typestr % hslotval % slot % hpref % lev); + } +} + + +void +refiner::begin_refinement() +{ + merkle_ptr root; + load_merkle_node(0, prefix(""), root); + cb.queue_refine_cmd(*root); + cb.queue_done_cmd(0, type); +} + + +void +refiner::process_done_command(size_t level) +{ + if (!exchanged_data_since_last_done_cmd + || level >= 0xff) + { + // Echo 'done' if we're shutting down + if (!finished_refinement) + cb.queue_done_cmd(level+1, type); + + // Mark ourselves shut down + finished_refinement = true; + + // And prepare for queries from our host + calculate_items_to_send_and_receive(); + } + else if (exchanged_data_since_last_done_cmd + && !finished_refinement) + { + // Echo 'done', we're still active. + cb.queue_done_cmd(level+1, type); + } + + // Reset exchanged_data_since_last_done_cmd + exchanged_data_since_last_done_cmd = false; +} + +bool +refiner::done() const +{ + return finished_refinement; +} + + +void +refiner::process_peer_node(merkle_node const & their_node) +{ + prefix pref; + hexenc hpref; + their_node.get_raw_prefix(pref); + their_node.get_hex_prefix(hpref); + string typestr; + + netcmd_item_type_to_string(their_node.type, typestr); + size_t lev = static_cast(their_node.level); + + L(F("received 'refine' netcmd on %s node '%s', level %d\n") + % typestr % hpref % lev); + + if (!merkle_node_exists(their_node.level, pref)) + { + L(F("no corresponding %s merkle node for prefix '%s', level %d\n") + % typestr % hpref % lev); + + for (size_t slot = 0; slot < constants::merkle_num_slots; ++slot) + { + switch (their_node.get_slot_state(slot)) + { + case empty_state: + // We agree, this slot is empty. + break; + + case leaf_state: + note_item_in_peer(their_node, slot); + break; + + case subtree_state: + refine_synthetic_empty_subtree(their_node, slot); + break; + } + } + } + else + { + // We have a corresponding merkle node. There are 9 branches + // to the following switch condition. It is awful. Sorry. 
+ L(F("found corresponding %s merkle node for prefix '%s', level %d\n") + % typestr % hpref % lev); + merkle_ptr our_node; + load_merkle_node(their_node.level, pref, our_node); + + for (size_t slot = 0; slot < constants::merkle_num_slots; ++slot) + { + switch (their_node.get_slot_state(slot)) + { + case empty_state: + switch (our_node->get_slot_state(slot)) + { + + case empty_state: + // 1: theirs == empty, ours == empty + break; + + case leaf_state: + // 2: theirs == empty, ours == leaf + inform_peer_of_item_in_slot(*our_node, slot); + break; + + case subtree_state: + // 3: theirs == empty, ours == subtree + inform_peer_of_subtree_in_slot(*our_node, slot); + break; + + } + break; + + + case leaf_state: + switch (our_node->get_slot_state(slot)) + { + + case empty_state: + // 4: theirs == leaf, ours == empty + note_item_in_peer(their_node, slot); + break; + + case leaf_state: + // 5: theirs == leaf, ours == leaf + note_item_in_peer(their_node, slot); + inform_peer_of_item_in_slot(*our_node, slot); + break; + + case subtree_state: + // 6: theirs == leaf, ours == subtree + note_item_in_peer(their_node, slot); + inform_peer_of_subtree_in_slot(*our_node, slot); + break; + } + break; + + case subtree_state: + switch (our_node->get_slot_state(slot)) + { + case empty_state: + // 7: theirs == subtree, ours == empty + refine_synthetic_empty_subtree(their_node, slot); + break; + + case leaf_state: + // 14: theirs == subtree, ours == leaf + refine_synthetic_singleton_subtree(their_node, + *our_node, slot); + break; + + case subtree_state: + // 16: theirs == subtree, ours == subtree + compare_subtrees_and_maybe_refine(their_node, + *our_node, slot); + break; + } + break; + } + } + } +} + + + ============================================================ --- refiner.hh +++ refiner.hh 1374f8d2f65822995c016ecff3bec86dc06bf1d5 @@ -0,0 +1,95 @@ +#ifndef __REFINER_HH__ +#define __REFINER_HH__ + +// copyright (C) 2005 graydon hoare +// all rights reserved. 
+// licensed to the public under the terms of the GNU GPL (>= 2) +// see the file COPYING for details + +#include + +#include "vocab.hh" +#include "merkle_tree.hh" +#include "netcmd.hh" + +// This file defines the "refiner" class, which is a helper encapsulating +// the main tricky part of the netsync algorithm. You must construct a +// refiner for every merkle trie you wish to refine, and pass it a +// reference to a refiner_callbacks object, such as the netsync session +// object. Refinement proceeds in stages. +// +// 1. Add local items. +// +// 2. Call reindex_local_items to index the merkle table. +// +// 3. Call begin_refinement, and process the queue_refine_cmd callback +// this will generate. +// +// 4. Call process_peer_node repeatedly as nodes arrive from your peer, +// processing the callbacks each such call generates. +// +// 5. When done, stop refining and examine the sets of local and peer +// items you've determined the existence of during refinement. + + +struct +refiner_callbacks +{ + virtual void queue_refine_cmd(merkle_node const & our_node) = 0; + virtual void queue_note_item_cmd(netcmd_item_type ty, id item) = 0; + virtual void queue_note_shared_subtree_cmd(netcmd_item_type ty, + prefix const & pref, + size_t level) = 0; + virtual void queue_done_cmd(size_t level, + netcmd_item_type ty) = 0; + virtual ~refiner_callbacks() {} +}; + +class +refiner +{ + netcmd_item_type type; + refiner_callbacks & cb; + bool exchanged_data_since_last_done_cmd; + bool finished_refinement; + + std::set local_items; + std::set peer_items; + merkle_table table; + + void refine_synthetic_empty_subtree(merkle_node const & their_node, + size_t slot); + void refine_synthetic_singleton_subtree(merkle_node const & their_node, + merkle_node const & our_node, + size_t slot); + void inform_peer_of_item_in_slot(merkle_node const & our_node, size_t slot); + void inform_peer_of_subtree_in_slot(merkle_node const & our_node, size_t slot); + void 
note_subtree_shared_with_peer(merkle_node const & our_subtree); + void compare_subtrees_and_maybe_refine(merkle_node const & their_node, + merkle_node const & our_node, + size_t slot); + void note_item_in_peer(merkle_node const & their_node, size_t slot); + void load_merkle_node(size_t level, prefix const & pref, + merkle_ptr & node); + bool merkle_node_exists(size_t level, prefix const & pref); + void calculate_items_to_send_and_receive(); + +public: + + refiner(netcmd_item_type type, refiner_callbacks & cb); + void note_item_in_peer(id const & item); + void note_subtree_shared_with_peer(prefix const & pref, size_t lev); + void note_local_item(id const & item); + void reindex_local_items(); + void begin_refinement(); + bool done() const; + void process_done_command(size_t level); + void process_peer_node(merkle_node const & their_node); + + // These are populated as the 'done' packets arrive. + std::set items_to_send; + std::set items_to_receive; +}; + + +#endif // __REFINER_H__