# # # patch "enumerator.cc" # from [db11df61b79edbfa3d59b65cdeee1c3c0a102f9a] # to [fced10d3654e3cfb37e9ad3e6d74238bd796737e] # # patch "enumerator.hh" # from [566bc3989489ca714e5c47a71c1f07f094340923] # to [a506623c172113bc4c55cb87097820a789dc24fb] # # patch "netcmd.cc" # from [354be8e5391634f749c53b7b33e693235b254daa] # to [7d4623fa34ca32a1217a77fe3c5e3041d33daee4] # # patch "netcmd.hh" # from [88a2a49cc8638b5f98366be83cb53bce8ef21e99] # to [2d51f4bf2ea58f075eef6536428a46b2023a9e3e] # # patch "netsync.cc" # from [c83bebf13c42335eab22a6dab1b94c3cbd6c0900] # to [d28efc3e97f8f0f74784f0f945bb78532ba9b3e6] # # patch "packet.cc" # from [d08e5a64d015377161236974a8cac24d6469310d] # to [f39c8a694ef3c87a5d262c712fb567c2cbaa54e3] # # patch "refiner.cc" # from [3fe6d5d6fc1573591fe2d8b080920a6d55747f7d] # to [17fef57cd7411873901a8a58a69eef603b417dd4] # # patch "refiner.hh" # from [1374f8d2f65822995c016ecff3bec86dc06bf1d5] # to [cdb2d13b9054ea56707690d422ee4349774df28c] # # patch "tests/t_netsync_nocerts.at" # from [5b1e9d87f055a58f2c7d3ee0cfaed2e185469955] # to [a46961c621de2d5ffe721d3cbc7b6561b7964a1b] # ============================================================ --- enumerator.cc db11df61b79edbfa3d59b65cdeee1c3c0a102f9a +++ enumerator.cc fced10d3654e3cfb37e9ad3e6d74238bd796737e @@ -110,17 +110,20 @@ item.ident_a = r.inner(); items.push_back(item); } + } - // Queue up all the rev's certs - vector > hashes; - app.db.get_revision_certs(r, hashes); - for (vector >::const_iterator i = hashes.begin(); - i != hashes.end(); ++i) + // Queue up some or all of the rev's certs + vector > hashes; + app.db.get_revision_certs(r, hashes); + for (vector >::const_iterator i = hashes.begin(); + i != hashes.end(); ++i) + { + if (cb.queue_this_cert(*i)) { enumerator_item item; item.tag = enumerator_item::cert; item.ident_a = *i; - items.push_back(item); + items.push_back(item); } } } ============================================================ --- enumerator.hh 566bc3989489ca714e5c47a71c1f07f094340923 +++ enumerator.hh a506623c172113bc4c55cb87097820a789dc24fb @@ -19,11 +19,12 @@ struct enumerator_callbacks { - // Your callback will be asked whether you want the details of each rev, - // in order; you should return true for any rev you want to be notified - // about the contents of. The rev's children will be traversed no matter - // what you return here. + // Your callback will be asked whether you want the details of each rev + // or cert, in order; you should return true for any rev or cert you want + // to be notified about the contents of. The rev's children will be + // traversed no matter what you return here. virtual bool process_this_rev(revision_id const & rev) = 0; + virtual bool queue_this_cert(hexenc const & c) = 0; virtual void note_file_data(file_id const & f) = 0; virtual void note_file_delta(file_id const & src, file_id const & dst) = 0; ============================================================ --- netcmd.cc 354be8e5391634f749c53b7b33e693235b254daa +++ netcmd.cc 7d4623fa34ca32a1217a77fe3c5e3041d33daee4 @@ -98,6 +98,7 @@ switch (cmd_byte) { case static_cast(hello_cmd): + case static_cast(bye_cmd): case static_cast(anonymous_cmd): case static_cast(auth_cmd): case static_cast(error_cmd): @@ -110,6 +111,7 @@ case static_cast(delta_cmd): case static_cast(usher_cmd): cmd_code = static_cast(cmd_byte); + P(F("examining command type %d\n") % cmd_code); break; default: throw bad_decode(F("unknown netcmd code 0x%x") % widen(cmd_byte)); @@ -216,6 +218,25 @@ } +void +netcmd::read_bye_cmd(u8 & phase) const +{ + size_t pos = 0; + // syntax is: + phase = extract_datum_lsb(payload, pos, "bye netcmd, phase number"); + assert_end_of_buffer(payload, pos, "bye netcmd payload"); +} + + +void +netcmd::write_bye_cmd(u8 phase) +{ + cmd_code = bye_cmd; + payload.clear(); + payload += phase; +} + + void netcmd::read_anonymous_cmd(protocol_role & role, utf8 & include_pattern, @@ -251,7 +272,7 @@ rsa_oaep_sha_data const & hmac_key_encrypted) { cmd_code = anonymous_cmd; - payload = static_cast(role); + payload += static_cast(role); insert_variable_length_string(include_pattern(), payload); insert_variable_length_string(exclude_pattern(), payload); insert_variable_length_string(hmac_key_encrypted(), payload); @@ -310,7 +331,7 @@ cmd_code = auth_cmd; I(client().size() == constants::merkle_hash_length_in_bytes); I(nonce1().size() == constants::merkle_hash_length_in_bytes); - payload = static_cast(role); + payload += static_cast(role); insert_variable_length_string(include_pattern(), payload); insert_variable_length_string(exclude_pattern(), payload); payload += client(); @@ -366,7 +387,7 @@ I(item().size() == constants::merkle_hash_length_in_bytes); cmd_code = note_item_cmd; payload.clear(); - payload = static_cast(type); + payload += static_cast(type); payload += item(); } @@ -394,6 +415,7 @@ size_t level) { payload.clear(); + cmd_code = note_shared_subtree_cmd; payload += static_cast(type); insert_variable_length_string(pref(), payload); insert_datum_uleb128(level, payload); @@ -456,7 +478,7 @@ { cmd_code = data_cmd; I(item().size() == constants::merkle_hash_length_in_bytes); - payload = static_cast(type); + payload += static_cast(type); payload += item(); if (dat.size() > constants::netcmd_minimum_bytes_to_bother_with_gzip) { @@ -512,7 +534,7 @@ cmd_code = delta_cmd; I(base().size() == constants::merkle_hash_length_in_bytes); I(ident().size() == constants::merkle_hash_length_in_bytes); - payload = static_cast(type); + payload += static_cast(type); payload += base(); payload += ident(); @@ -656,6 +678,20 @@ L(boost::format("hello_cmd test done, buffer was %d bytes\n") % buf.size()); } + // bye_cmd + { + L(boost::format("checking i/o round trip on bye_cmd\n")); + netcmd out_cmd, in_cmd; + u8 out_phase(1), in_phase; + string buf; + + out_cmd.write_bye_cmd(out_phase); + do_netcmd_roundtrip(out_cmd, in_cmd, buf); + in_cmd.read_done_cmd(in_phase); + BOOST_CHECK(in_phase == out_phase); + L(boost::format("bye_cmd test done, buffer was %d bytes\n") % buf.size()); + } + // anonymous_cmd { L(boost::format("checking i/o round trip on anonymous_cmd\n")); ============================================================ --- netcmd.hh 88a2a49cc8638b5f98366be83cb53bce8ef21e99 +++ netcmd.hh 2d51f4bf2ea58f075eef6536428a46b2023a9e3e @@ -27,6 +27,7 @@ { // general commands error_cmd = 0, + bye_cmd = 1, // authentication commands hello_cmd = 2, @@ -98,6 +99,9 @@ rsa_pub_key const & server_key, id const & nonce); + void read_bye_cmd(u8 & phase) const; + void write_bye_cmd(u8 phase); + void read_anonymous_cmd(protocol_role & role, utf8 & include_pattern, utf8 & exclude_pattern, ============================================================ --- netsync.cc c83bebf13c42335eab22a6dab1b94c3cbd6c0900 +++ netsync.cc d28efc3e97f8f0f74784f0f945bb78532ba9b3e6 @@ -298,6 +298,13 @@ id saved_nonce; packet_db_valve dbw; + enum + { + working_state, + shutdown_state, + confirmed_state + } + protocol_state; bool encountered_error; // Interface to refinement. @@ -311,6 +318,7 @@ // enumerator_callbacks methods. bool process_this_rev(revision_id const & rev); + bool queue_this_cert(hexenc const & c); void note_file_data(file_id const & f); void note_file_delta(file_id const & src, file_id const & dst); void note_rev(revision_id const & rev); @@ -341,8 +349,9 @@ bool done_all_refinements(); bool queued_all_items(); bool received_all_items(); - bool finished_exchange_ok(); + bool finished_working(); void maybe_step(); + void maybe_say_goodbye(); void note_item_arrived(netcmd_item_type ty, id const & i); void maybe_note_epochs_finished(); @@ -357,7 +366,7 @@ void write_netcmd_and_try_flush(netcmd const & cmd); // Outgoing queue-writers. - void queue_bye_cmd(); + void queue_bye_cmd(u8 phase); void queue_error_cmd(string const & errmsg); void queue_done_cmd(size_t level, netcmd_item_type type); void queue_hello_cmd(rsa_keypair_id const & key_name, @@ -395,6 +404,7 @@ bool process_hello_cmd(rsa_keypair_id const & server_keyname, rsa_pub_key const & server_key, id const & nonce); + bool process_bye_cmd(u8 phase); bool process_anonymous_cmd(protocol_role role, utf8 const & their_include_pattern, utf8 const & their_exclude_pattern); @@ -471,6 +481,7 @@ revision_checked_ticker(NULL), saved_nonce(""), dbw(app, true), + protocol_state(working_state), encountered_error(false), epoch_refiner(epoch_item, *this), key_refiner(key_item, *this), @@ -550,9 +561,20 @@ != rev_refiner.items_to_send.end()); } +bool +session::queue_this_cert(hexenc const & c) +{ + id item; + decode_hexenc(c, item); + return (cert_refiner.items_to_send.find(item) + != cert_refiner.items_to_send.end()); +} + void session::note_file_data(file_id const & f) { + if (role == sink_role) + return; file_data fd; id item; decode_hexenc(f.inner(), item); @@ -563,6 +585,8 @@ void session::note_file_delta(file_id const & src, file_id const & dst) { + if (role == sink_role) + return; file_data fd1, fd2; delta del; id fid1, fid2; @@ -577,6 +601,8 @@ void session::note_rev(revision_id const & rev) { + if (role == sink_role) + return; revision_set rs; id item; decode_hexenc(rev.inner(), item); @@ -589,6 +615,8 @@ void session::note_cert(hexenc const & c) { + if (role == sink_role) + return; id item; decode_hexenc(c, item); revision cert; @@ -691,10 +719,12 @@ bool session::done_all_refinements() { - return rev_refiner.done() + bool all = rev_refiner.done() && cert_refiner.done() && key_refiner.done() && epoch_refiner.done(); + P(F("done all refinements? %d\n") % all); + return all; } @@ -702,29 +732,38 @@ bool session::received_all_items() { - return rev_refiner.items_to_receive.empty() + if (role == source_role) + return true; + bool all = rev_refiner.items_to_receive.empty() && cert_refiner.items_to_receive.empty() && key_refiner.items_to_receive.empty() && epoch_refiner.items_to_receive.empty(); + P(F("received all items? %d\n") % all); + return all; } bool -session::finished_exchange_ok() +session::finished_working() { - return done_all_refinements() + bool all = done_all_refinements() && received_all_items() && queued_all_items() - && rev_enumerator.done() - && outbuf.empty(); + && rev_enumerator.done(); + P(F("finished working? %d\n") % all); + return all; } bool session::queued_all_items() -{ - return rev_refiner.items_to_send.empty() +{ + if (role == sink_role) + return true; + bool all = rev_refiner.items_to_send.empty() && cert_refiner.items_to_send.empty() && key_refiner.items_to_send.empty() && epoch_refiner.items_to_send.empty(); + P(F("queued all items? %d\n") % all); + return all; } @@ -922,6 +961,16 @@ } void +session::queue_bye_cmd(u8 phase) +{ + P(F("queueing 'bye' command, phase %d\n") + % static_cast(phase)); + netcmd cmd; + cmd.write_bye_cmd(phase); + write_netcmd_and_try_flush(cmd); +} + +void session::queue_done_cmd(size_t level, netcmd_item_type type) { @@ -1511,6 +1560,83 @@ } bool +session::process_bye_cmd(u8 phase) +{ + +// Ideal shutdown +// ~~~~~~~~~~~~~~~ +// +// I/O events state transitions +// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~ +// client: C_WORKING +// server: S_WORKING +// 0. [refinement, data, deltas, etc.] +// client: C_SHUTDOWN +// 1. client -> "bye 0" +// 2. "bye 0" -> server +// server: S_SHUTDOWN +// 3. "bye 1" <- server +// 4. client <- "bye 1" +// client: C_CONFIRMED +// 5. client -> "bye 2" +// 6. "bye 2" -> server +// server: S_CONFIRMED +// 7. [server drops connection] +// +// +// Affects of I/O errors or disconnections +// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +// C_WORKING: report error and fault +// S_WORKING: report error and recover +// C_SHUTDOWN: report error and fault +// S_SHUTDOWN: report success and recover +// (and warn that client might falsely see error) +// C_CONFIRMED: report success +// S_CONFIRMED: report success + + switch (phase) + { + case 0: + if (voice == server_voice && + protocol_state == working_state) + { + protocol_state = shutdown_state; + queue_bye_cmd(1); + } + else + error("unexpected bye phase 0 received"); + break; + + case 1: + if (voice == client_voice && + protocol_state == shutdown_state) + { + protocol_state = confirmed_state; + queue_bye_cmd(2); + } + else + error("unexpected bye phase 1 received"); + break; + + case 2: + if (voice == server_voice && + protocol_state == shutdown_state) + { + protocol_state = confirmed_state; + return false; + } + else + error("unexpected bye phase 2 received"); + break; + + default: + error((F("unknown bye phase %d received") % phase).str()); + } + + return true; +} + +bool session::process_done_cmd(size_t level, netcmd_item_type type) { switch (type) @@ -1521,7 +1647,7 @@ case key_item: key_refiner.process_done_command(level); - if (key_refiner.done()) + if (key_refiner.done() && role != sink_role) send_all_data(key_item, key_refiner.items_to_send); break; @@ -1536,7 +1662,10 @@ case epoch_item: epoch_refiner.process_done_command(level); if (epoch_refiner.done()) - send_all_data(epoch_item, epoch_refiner.items_to_send); + { + send_all_data(epoch_item, epoch_refiner.items_to_send); + maybe_note_epochs_finished(); + } break; } return true; @@ -1788,6 +1917,7 @@ throw bad_decode(F("hash check failed for public key '%s' (%s);" " wanted '%s' got '%s'") % hitem % keyid % hitem % tmp); + P(F("writing public key '%s' (item %s)\n") % keyid % tmp); this->dbw.consume_public_key(keyid, pub); } break; @@ -1892,11 +2022,20 @@ void session::send_all_data(netcmd_item_type ty, set const & items) { + string typestr; + netcmd_item_type_to_string(ty, typestr); + for (set::const_iterator i = items.begin(); i != items.end(); ++i) { + hexenc hitem; + encode_hexenc(*i, hitem); + + P(F("send_all_data: %s item '%s'\n") % typestr % hitem); + if (data_exists(ty, *i, this->app)) { + P(F("send_all_data: %s item '%s' exists, sending\n") % typestr % hitem); string out; load_data(ty, *i, this->app, out); queue_data_cmd(ty, *i, out); @@ -1931,6 +2070,15 @@ } break; + case bye_cmd: + require(authenticated, "bye netcmd received when not authenticated"); + { + u8 phase; + cmd.read_bye_cmd(phase); + return process_bye_cmd(phase); + } + break; + case anonymous_cmd: require(! authenticated, "anonymous netcmd received when not authenticated"); require(voice == server_voice, "anonymous netcmd received in server voice"); @@ -2011,7 +2159,7 @@ break; case done_cmd: - require(authenticated, "done netcmd received when authenticated"); + require(authenticated, "done netcmd received when not authenticated"); { size_t level; netcmd_item_type type; @@ -2021,6 +2169,7 @@ break; case note_item_cmd: + require(authenticated, "note_item netcmd received when not authenticated"); { netcmd_item_type ty; id item; @@ -2030,6 +2179,7 @@ break; case note_shared_subtree_cmd: + require(authenticated, "note_shared_subtree netcmd received when not authenticated"); { netcmd_item_type ty; prefix pref; @@ -2040,7 +2190,7 @@ break; case data_cmd: - require(authenticated, "data netcmd received when authenticated"); + require(authenticated, "data netcmd received when not authenticated"); require(role == sink_role || role == source_and_sink_role, "data netcmd received in source or source/sink role"); @@ -2054,7 +2204,7 @@ break; case delta_cmd: - require(authenticated, "delta netcmd received when authenticated"); + require(authenticated, "delta netcmd received when not authenticated"); require(role == sink_role || role == source_and_sink_role, "delta netcmd received in source or source/sink role"); @@ -2094,6 +2244,7 @@ void session::maybe_step() { + P(F("+maybe_step\n")); if (done_all_refinements() && !rev_enumerator.done() && outbuf_size < constants::bufsz * 10) @@ -2101,8 +2252,24 @@ P(F("stepping enumerator\n")); rev_enumerator.step(); } + P(F("-maybe_step\n")); } +void +session::maybe_say_goodbye() +{ + P(F("+maybe say goodbye\n")); + if (voice == client_voice + && protocol_state == working_state + && finished_working()) + { + P(F("initiating shutdown\n")); + protocol_state = shutdown_state; + queue_bye_cmd(0); + } + P(F("-maybe say goodbye\n")); +} + bool session::arm() { @@ -2124,7 +2291,7 @@ if (encountered_error) return true; try - { + { if (!arm()) return true; @@ -2135,12 +2302,9 @@ if (inbuf.size() >= constants::netcmd_maxsz) W(F("input buffer for peer %s is overfull after netcmd dispatch\n") % peer_id); guard.commit(); - - if (finished_exchange_ok()) - return true; - + if (!ret) - P(F("failed to process '%s' packet") % cmd.get_cmd_code()); + P(F("finishing processing with '%d' packet") % cmd.get_cmd_code()); return ret; } catch (bad_decode & bd) @@ -2153,7 +2317,7 @@ W(F("error: %s\n") % err.msg); queue_error_cmd(err.msg); encountered_error = true; - return true;// don't terminate until we've send the error_cmd + return true; // don't terminate until we've send the error_cmd } } @@ -2190,6 +2354,9 @@ % sess.peer_id % bd.what); } + sess.maybe_step(); + sess.maybe_say_goodbye(); + probe.clear(); probe.add(sess.str, sess.which_events()); Netxx::Probe::result_type res = probe.ready(armed ? instant : timeout); @@ -2198,61 +2365,62 @@ if (fd == -1 && !armed) { - E(false, F("timed out waiting for I/O with peer %s, disconnecting\n") % sess.peer_id); + E(false, (F("timed out waiting for I/O with " + "peer %s, disconnecting\n") + % sess.peer_id)); } - + + bool all_io_clean = true; + if (event & Netxx::Probe::ready_read) - { - if (sess.read_some()) - { - try - { - armed = sess.arm(); - } - catch (bad_decode & bd) - { - E(false, F("protocol error while processing peer %s: '%s'\n") - % sess.peer_id % bd.what); - } - } - else - { - E(false, F("read from fd %d (peer %s) failed, disconnecting\n") % fd % sess.peer_id); - } - } + all_io_clean = all_io_clean && sess.read_some(); if (event & Netxx::Probe::ready_write) - { - if (! sess.write_some()) - { - E(false, F("write on fd %d (peer %s) failed, disconnecting\n") % fd % sess.peer_id); - return; - } - } + all_io_clean = all_io_clean && sess.write_some(); if (event & Netxx::Probe::ready_oobd) { - E(false, F("got OOB data on fd %d (peer %s), disconnecting\n") - % fd % sess.peer_id); + E(false, (F("got OOB data from " + "peer %s, disconnecting\n") + % sess.peer_id)); } if (armed) + if (!sess.process()) + { + // We failed during processing. This should only happen in + // client voice when we have a decode exception, or received an + // error from our server (which is translated to a decode + // exception). We call these cases E() errors. + E(false, F("processing failure while talking to " + "peer %s, disconnecting\n") + % sess.peer_id); + return; + } + + if (!all_io_clean) { - if (!sess.process()) + // We had an I/O error. We must decide if this represents a + // user-reported error or a clean disconnect. See protocol + // state diagram in session::process_bye_cmd. + + if (sess.protocol_state == session::confirmed_state) + { + P(F("successful exchange with %s\n") + % sess.peer_id); + return; + } + else if (sess.encountered_error) { - E(false, F("terminated exchange with %s\n") + P(F("peer %s disconnected after we informed them of error\n") % sess.peer_id); return; } - sess.maybe_step(); + else + E(false, (F("I/O failure while talking to " + "peer %s, disconnecting\n") + % sess.peer_id)); } - - if (sess.finished_exchange_ok()) - { - P(F("successful exchange with %s\n") - % sess.peer_id); - return; - } } } @@ -2266,6 +2434,8 @@ shared_ptr >::const_iterator i = sessions.begin(); i != sessions.end(); ++i) { + i->second->maybe_step(); + i->second->maybe_say_goodbye(); try { if (i->second->arm()) @@ -2345,8 +2515,24 @@ } else { - P(F("fd %d (peer %s) read failed, disconnecting\n") - % fd % sess->peer_id); + switch (sess->protocol_state) + { + case session::working_state: + P(F("peer %s read failed in working state (error)\n") + % sess->peer_id); + break; + + case session::shutdown_state: + P(F("peer %s read failed in shutdown state " + "(possibly client misreported error)\n") + % sess->peer_id); + break; + + case session::confirmed_state: + P(F("peer %s read failed in confirmed state (success)\n") + % sess->peer_id); + break; + } sessions.erase(fd); live_p = false; } @@ -2359,15 +2545,30 @@ map > & sessions, bool & live_p) { - if (! sess->write_some()) + if (!sess->write_some()) { - P(F("fd %d (peer %s) write failed, disconnecting\n") - % fd % sess->peer_id); + switch (sess->protocol_state) + { + case session::working_state: + P(F("peer %s write failed in working state (error)\n") + % sess->peer_id); + break; + + case session::shutdown_state: + P(F("peer %s write failed in shutdown state " + "(possibly client misreported error)\n") + % sess->peer_id); + break; + + case session::confirmed_state: + P(F("peer %s write failed in confirmed state (success)\n") + % sess->peer_id); + break; + } + sessions.erase(fd); live_p = false; } - else - sess->maybe_step(); } static void @@ -2383,15 +2584,13 @@ continue; else { - Netxx::socket_type fd = j->first; shared_ptr sess = j->second; if (!sess->process()) { - P(F("fd %d (peer %s) processing finished, disconnecting\n") - % fd % sess->peer_id); + P(F("peer %s processing finished, disconnecting\n") + % sess->peer_id); sessions.erase(j); } - sess->maybe_step(); } } } @@ -2414,12 +2613,6 @@ % i->first % i->second->peer_id); dead_clients.insert(i->first); } - if (i->second->finished_exchange_ok()) - { - P(F("fd %d (peer %s) exchanged all items and flushed output, disconnecting\n") - % i->first % i->second->peer_id); - dead_clients.insert(i->first); - } } for (set::const_iterator i = dead_clients.begin(); i != dead_clients.end(); ++i) @@ -2698,6 +2891,7 @@ app.db.get_key(*key, pub_encoded); hexenc keyhash; key_hash_code(*key, pub_encoded, keyhash); + P(F("noting key '%s' = '%s' to send\n") % *key % keyhash); id key_item; decode_hexenc(keyhash, key_item); key_refiner.note_local_item(key_item); ============================================================ --- packet.cc d08e5a64d015377161236974a8cac24d6469310d +++ packet.cc f39c8a694ef3c87a5d262c712fb567c2cbaa54e3 @@ -773,11 +773,12 @@ transaction_guard guard(pimpl->app.db); if (! pimpl->take_keys) { - W(F("skipping prohibited public key %s\n") % ident); + P(F("skipping prohibited public key %s\n") % ident); return; } if (! pimpl->app.db.public_key_exists(ident)) { + P(F("puttingskipping public key %s\n") % ident); pimpl->app.db.put_key(ident, k); if(on_pubkey_written) on_pubkey_written(ident); } @@ -787,7 +788,7 @@ pimpl->app.db.get_key(ident, tmp); if (!keys_match(ident, tmp, ident, k)) W(F("key '%s' is not equal to key '%s' in database\n") % ident % ident); - L(F("skipping existing public key %s\n") % ident); + P(F("skipping existing public key %s\n") % ident); } ++(pimpl->count); guard.commit(); @@ -845,7 +846,7 @@ void packet_db_valve::open_valve() { - L(F("packet valve opened\n")); + P(F("packet valve opened\n")); pimpl->valve_is_open = true; int written = 0; for (std::vector< boost::shared_ptr >::reverse_iterator @@ -857,7 +858,7 @@ ++written; } pimpl->packets.clear(); - L(F("wrote %i queued packets\n") % written); + P(F("wrote %i queued packets\n") % written); } #define DOIT(x) pimpl->do_packet(boost::shared_ptr(new x)); ============================================================ --- refiner.cc 3fe6d5d6fc1573591fe2d8b080920a6d55747f7d +++ refiner.cc 17fef57cd7411873901a8a58a69eef603b417dd4 @@ -117,6 +117,9 @@ void refiner::calculate_items_to_send_and_receive() { + if (calculated_items_to_send_and_receive) + return; + items_to_send.clear(); items_to_receive.clear(); @@ -134,6 +137,7 @@ % getpid() % items_to_send.size() % typestr); P(F("pid %d determined %d %s items to receive\n") % getpid() % items_to_receive.size() % typestr); + calculated_items_to_send_and_receive = true; } @@ -155,12 +159,14 @@ prefix pref; our_subtree.get_raw_prefix(pref); collect_items_in_subtree(table, pref, our_subtree.level, peer_items); + exchanged_data_since_last_done_cmd = true; } void refiner::note_subtree_shared_with_peer(prefix const & pref, size_t lev) { collect_items_in_subtree(table, pref, lev, peer_items); + exchanged_data_since_last_done_cmd = true; } @@ -206,7 +212,8 @@ refiner::refiner(netcmd_item_type type, refiner_callbacks & cb) : type(type), cb(cb), exchanged_data_since_last_done_cmd(false), - finished_refinement(false) + finished_refinement(0), + calculated_items_to_send_and_receive(false) { merkle_ptr root = merkle_ptr(new merkle_node()); root->type = type; @@ -217,6 +224,7 @@ refiner::note_item_in_peer(id const & item) { peer_items.insert(item); + exchanged_data_since_last_done_cmd = true; } @@ -247,6 +255,7 @@ "(in node '%s', level %d)\n") % typestr % hslotval % slot % hpref % lev); } + exchanged_data_since_last_done_cmd = true; } @@ -273,7 +282,7 @@ || level >= 0xff) { // Echo 'done' if we're shutting down - if (!finished_refinement) + if (finished_refinement < 3) { P(F("pid %d processing 'done' command => echoing shut down of %s refinement\n") % getpid() % typestr); @@ -283,13 +292,13 @@ % getpid() % typestr); // Mark ourselves shut down - finished_refinement = true; + finished_refinement++; // And prepare for queries from our host calculate_items_to_send_and_receive(); } else if (exchanged_data_since_last_done_cmd - && !finished_refinement) + && finished_refinement < 2) { // Echo 'done', we're still active. P(F("pid %d processing 'done' command => continuing to %s level %d\n") @@ -304,7 +313,11 @@ bool refiner::done() const { - return finished_refinement; + string typestr; + netcmd_item_type_to_string(type, typestr); + + P(F("%s refiner %s done\n") % typestr % (finished_refinement >= 2 ? "is" : "is not")); + return finished_refinement >= 2; } @@ -428,6 +441,7 @@ } } } + exchanged_data_since_last_done_cmd = true; } ============================================================ --- refiner.hh 1374f8d2f65822995c016ecff3bec86dc06bf1d5 +++ refiner.hh cdb2d13b9054ea56707690d422ee4349774df28c @@ -51,7 +51,8 @@ netcmd_item_type type; refiner_callbacks & cb; bool exchanged_data_since_last_done_cmd; - bool finished_refinement; + size_t finished_refinement; + bool calculated_items_to_send_and_receive; std::set local_items; std::set peer_items; ============================================================ --- tests/t_netsync_nocerts.at 5b1e9d87f055a58f2c7d3ee0cfaed2e185469955 +++ tests/t_netsync_nocerts.at a46961c621de2d5ffe721d3cbc7b6561b7964a1b @@ -1,10 +1,7 @@ AT_SETUP([(normal) netsync revision with no certs]) MONOTONE_SETUP NETSYNC_SETUP -# This test is a bug report. -AT_XFAIL_IF(true) - ADD_FILE(testfile, [blah stuff ]) COMMIT(testbranch)