# # # add_file "network/session.cc" # content [a462c9e3e503442b765ca70488c1c1736215b931] # # add_file "network/session.hh" # content [4c73ddc1f08b1609a8150e6831664bb52efca02f] # # add_file "network/wrapped_session.cc" # content [4441e6888f140109a33da720a6424b460d51e95d] # # add_file "network/wrapped_session.hh" # content [a280328f066580ee9cd81d5d2a194d6bc7e691da] # # patch "Makefile.am" # from [c6a102401efb2ea9778faf7a908371f682f4ace1] # to [aa109db4ce20ea27d2ae26d5e290132ec90a6d71] # # patch "enumerator.cc" # from [89890d88770c8a1cffbfc36fe1f4b8471fc06477] # to [fa0b2faee2460ab510375b0b25badc148b658f56] # # patch "enumerator.hh" # from [3c5c2ee377c33482e5442e9d59e8fdeb6facf182] # to [5e52b8fae270dcabc4ada249327adcd496521a73] # # patch "netcmd.cc" # from [b8a5d8e0ca337c099e75f2e4957400b29077ae01] # to [2863e7b2aed2d4866ad80cf92f2e08266c3d94eb] # # patch "netcmd.hh" # from [e6e9409fb9dae822a261493efd5084e544618db7] # to [224e57110c5007f2b551d59cf1d108fa26abc37a] # # patch "netsync.cc" # from [1a2949a97db64c97c5a0a92768ae0e6d9c676013] # to [ed3a26c1a06ca9dd90261cbec4ba82bb57c79ef0] # # patch "network/automate_listener.cc" # from [e3cc9c6951bd37f909ca6d1000475595d3f5c545] # to [b5556012c57bf1f2b22bf55a3042d1ade1ce133e] # # patch "network/automate_session.cc" # from [ebc7328238ef4bf127528a736a0ec9bf46efa340] # to [5e6e88a11cdcf49ee651351c5d7284755c953f54] # # patch "network/automate_session.hh" # from [c37e12bc01cccc8c1f91b1d7c0e51dcd54aff231] # to [b9eb6d161bfb175f616ef8fd4cdc86b33778e077] # # patch "network/netsync_listener.cc" # from [8dab9290b6c5fca7ecde13045a219bb971af9fb0] # to [3f5307dd5310d07c89da689808978710c101858d] # # patch "network/netsync_session.cc" # from [53d10a1f77135a9f4550aac3dc78df6f4123d9d6] # to [d29ead19e030ce5d2f3d9072f4302ad691b0e967] # # patch "network/netsync_session.hh" # from [c4c43a8ff350ed599508742258a53b9c52a765ff] # to [e848afaf9518a827735efc982ebe5ca6f3ed1d57] # # patch "network/session_base.cc" # from 
[157c8a8f35cea36433729d2876f0a4bfb4dd5937] # to [66e0f6b594d20aab21677ad43045d695ae83ae57] # # patch "network/session_base.hh" # from [0ffdf1741c214d0144014df839a534dc06a4b23e] # to [300d67536599a4a9b92927c54ad7def5415301ac] # ============================================================ --- network/session.cc a462c9e3e503442b765ca70488c1c1736215b931 +++ network/session.cc a462c9e3e503442b765ca70488c1c1736215b931 @@ -0,0 +1,669 @@ +// Copyright (C) 2009 Timothy Brownawell +// +// This program is made available under the GNU GPL version 2.0 or +// greater. See the accompanying file COPYING for details. +// +// This program is distributed WITHOUT ANY WARRANTY; without even the +// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +// PURPOSE. + +#include "base.hh" +#include "network/session.hh" + +#include "key_store.hh" +#include "database.hh" +#include "keys.hh" +#include "lua_hooks.hh" +#include "network/netsync_session.hh" +#include "options.hh" +#include "project.hh" +#include "vocab_cast.hh" + +using std::string; + +using boost::lexical_cast; +using boost::shared_ptr; + + +static const var_domain known_servers_domain = var_domain("known-servers"); + +size_t session::session_num = 0; + +session::session(options & opts, lua_hooks & lua, project_t & project, + key_store & keys, + protocol_voice voice, + std::string const & peer, + shared_ptr sock, + bool use_transport_auth) : + session_base(voice, peer, sock), + version(0), + max_version(opts.max_netsync_version), + min_version(opts.min_netsync_version), + use_transport_auth(use_transport_auth), + signing_key(keys.signing_key), + cmd_in(0), + armed(false), + received_remote_key(false), + session_key(constants::netsync_key_initializer), + read_hmac(netsync_session_key(constants::netsync_key_initializer), + use_transport_auth), + write_hmac(netsync_session_key(constants::netsync_key_initializer), + use_transport_auth), + authenticated(false), + completed_hello(false), + error_code(0), + 
session_id(++session_num), + opts(opts), + lua(lua), + project(project), + keys(keys), + peer(peer) +{ +} + +session::~session() +{ + if (wrapped) + wrapped->on_end(session_id); +} + +void session::set_inner(shared_ptr wrapped) +{ + this->wrapped = wrapped; +} + +id +session::mk_nonce() +{ + I(this->saved_nonce().empty()); + char buf[constants::merkle_hash_length_in_bytes]; + +#if BOTAN_VERSION_CODE >= BOTAN_VERSION_CODE_FOR(1,7,7) + keys.get_rng().randomize(reinterpret_cast(buf), + constants::merkle_hash_length_in_bytes); +#else + Botan::Global_RNG::randomize(reinterpret_cast(buf), + constants::merkle_hash_length_in_bytes); +#endif + this->saved_nonce = id(string(buf, buf + constants::merkle_hash_length_in_bytes), + origin::internal); + I(this->saved_nonce().size() == constants::merkle_hash_length_in_bytes); + return this->saved_nonce; +} + +void +session::set_session_key(string const & key) +{ + session_key = netsync_session_key(key, origin::internal); + read_hmac.set_key(session_key); + write_hmac.set_key(session_key); +} + +void +session::set_session_key(rsa_oaep_sha_data const & hmac_key_encrypted) +{ + if (use_transport_auth) + { + string hmac_key; + keys.decrypt_rsa(signing_key, hmac_key_encrypted, hmac_key); + set_session_key(hmac_key); + } +} + +bool session::arm() +{ + if (!armed) + { + // Don't pack the buffer unnecessarily. 
+ if (output_overfull()) + return false; + if (cmd_in.read(min_version, max_version, inbuf, read_hmac)) + { + armed = true; + } + } + return armed; +} + +void session::begin_service() +{ + netcmd cmd(0); + cmd.write_usher_cmd(utf8("", origin::internal)); + write_netcmd(cmd); +} + +bool session::do_work(transaction_guard & guard) +{ + bool armed = arm(); + this->armed = false; // the local shadows the member; disarm so the next arm() reads a new command instead of replaying cmd_in + bool is_goodbye = armed && cmd_in.get_cmd_code() == bye_cmd; + bool is_error = armed && cmd_in.get_cmd_code() == error_cmd; + if (completed_hello && !is_goodbye && !is_error) + { + try + { + if (encountered_error) + return true; + else + { + bool ok = wrapped->do_work(guard, armed ? &cmd_in : 0); + if (ok) + { + if (voice == client_voice + && protocol_state == working_state + && wrapped->finished_working()) + { + protocol_state = shutdown_state; + guard.do_checkpoint(); + queue_bye_cmd(0); + } + } + return ok; + } + } + catch (netsync_error & err) + { + W(F("error: %s") % err.msg); + string const errmsg(lexical_cast(error_code) + " " + err.msg); + L(FL("queueing 'error' command")); + netcmd cmd(get_version()); + cmd.write_error_cmd(errmsg); + write_netcmd(cmd); + encountered_error = true; + return true; // Don't terminate until we've sent the error_cmd. 
+ } + } + else + { + if (!armed) + return true; + switch (cmd_in.get_cmd_code()) + { + case usher_cmd: + { + utf8 msg; + cmd_in.read_usher_cmd(msg); + if (msg().size()) + { + if (msg()[0] == '!') + P(F("Received warning from usher: %s") % msg().substr(1)); + else + L(FL("Received greeting from usher: %s") % msg().substr(1)); + } + netcmd cmdout(version); + cmdout.write_usher_reply_cmd(utf8(peer_id, origin::internal), + wrapped->usher_reply_data()); + write_netcmd(cmdout); + L(FL("Sent reply.")); + return true; + } + case usher_reply_cmd: + { + u8 client_version; + utf8 server; + string pattern; + cmd_in.read_usher_reply_cmd(client_version, server, pattern); + + // netcmd::read() has already checked that the client isn't too old + if (client_version < max_version) + { + version = client_version; + } + L(FL("client has maximum version %d, using %d") + % widen(client_version) % widen(version)); + netcmd cmd(version); + + key_name name; + keypair kp; + keys.get_key_pair(signing_key, name, kp); + if (use_transport_auth) + { + cmd.write_hello_cmd(name, kp.pub, mk_nonce()); + } + else + { + cmd.write_hello_cmd(name, rsa_pub_key(), mk_nonce()); + } + write_netcmd(cmd); + return true; + } + case hello_cmd: + { // need to ask wrapped what to reply with (we're a client) + u8 server_version; + key_name their_keyname; + rsa_pub_key their_key; + id nonce; + cmd_in.read_hello_cmd(server_version, their_keyname, + their_key, nonce); + hello_nonce = nonce; + + I(!received_remote_key); + I(saved_nonce().empty()); + + // version sanity has already been checked by netcmd::read() + L(FL("received hello command; setting version from %d to %d") + % widen(get_version()) + % widen(server_version)); + version = server_version; + + if (use_transport_auth) + { + // key_hash_code() fills remote_peer_key_id; the fingerprint below must be derived from it + key_hash_code(their_keyname, their_key, remote_peer_key_id); + + var_value printable_key_hash; + { + hexenc encoded_key_hash; + encode_hexenc(remote_peer_key_id.inner(), encoded_key_hash); + printable_key_hash = 
typecast_vocab(encoded_key_hash); + } + L(FL("server key has name %s, hash %s") + % their_keyname % printable_key_hash); + var_key their_key_key(known_servers_domain, + var_name(get_peer(), origin::internal)); + if (project.db.var_exists(their_key_key)) + { + var_value expected_key_hash; + project.db.get_var(their_key_key, expected_key_hash); + if (expected_key_hash != printable_key_hash) + { + P(F("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n" + "@ WARNING: SERVER IDENTIFICATION HAS CHANGED @\n" + "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n" + "IT IS POSSIBLE THAT SOMEONE IS DOING SOMETHING NASTY\n" + "it is also possible that the server key has just been changed\n" + "remote host sent key %s\n" + "I expected %s\n" + "'%s unset %s %s' overrides this check") + % printable_key_hash + % expected_key_hash + % prog_name % their_key_key.first % their_key_key.second); + E(false, origin::network, F("server key changed")); + } + } + else + { + P(F("first time connecting to server %s\n" + "I'll assume it's really them, but you might want to double-check\n" + "their key's fingerprint: %s") + % get_peer() + % printable_key_hash); + project.db.set_var(their_key_key, printable_key_hash); + } + + if (!project.db.public_key_exists(remote_peer_key_id)) + { + // this should now always return true since we just checked + // for the existence of this particular key + I(project.db.put_key(their_keyname, their_key)); + W(F("saving public key for %s to database") % their_keyname); + } + { + hexenc hnonce; + encode_hexenc(nonce, hnonce); + L(FL("received 'hello' netcmd from server '%s' with nonce '%s'") + % printable_key_hash % hnonce); + } + + I(project.db.public_key_exists(remote_peer_key_id)); + + // save their identity + received_remote_key = true; + } + + wrapped->request_service(); + + } + return true; + + case anonymous_cmd: + case auth_cmd: + case automate_cmd: + return handle_service_request(); + + case confirm_cmd: + { + authenticated = true; 
// maybe? + completed_hello = true; + wrapped->accept_service(); + } + return true; + + case bye_cmd: + { + u8 phase; + cmd_in.read_bye_cmd(phase); + return process_bye_cmd(phase, guard); + } + case error_cmd: + { + string errmsg; + cmd_in.read_error_cmd(errmsg); + + // "xxx string" with xxx being digits means there's an error code + if (errmsg.size() > 4 && errmsg.substr(3,1) == " ") + { + try + { + int err = boost::lexical_cast(errmsg.substr(0,3)); + if (err >= 100) + { + error_code = err; + throw bad_decode(F("received network error: %s") + % errmsg.substr(4)); + } + } + catch (boost::bad_lexical_cast const &) + { // ok, so it wasn't a number + } + } + throw bad_decode(F("received network error: %s") % errmsg); + } + default: + // ERROR + return false; + } + } +} + +void +session::request_netsync(protocol_role role, + globish const & our_include_pattern, + globish const & our_exclude_pattern) +{ + id nonce2(mk_nonce()); + netcmd request(version); + rsa_oaep_sha_data hmac_key_encrypted; + if (use_transport_auth) + project.db.encrypt_rsa(remote_peer_key_id, nonce2(), hmac_key_encrypted); + + if (use_transport_auth && signing_key.inner()() != "") + { + // get our key pair + load_key_pair(keys, signing_key); + + // make a signature with it; + // this also ensures our public key is in the database + rsa_sha1_signature sig; + keys.make_signature(project.db, signing_key, hello_nonce(), sig); + + request.write_auth_cmd(role, our_include_pattern, our_exclude_pattern, + signing_key, hello_nonce, + hmac_key_encrypted, sig); + } + else + { + request.write_anonymous_cmd(role, our_include_pattern, our_exclude_pattern, + hmac_key_encrypted); + } + write_netcmd(request); + set_session_key(nonce2()); + + key_identity_info remote_key; + remote_key.id = remote_peer_key_id; + if (!remote_key.id.inner()().empty()) // an empty id (no transport auth) has nothing to complete + project.complete_key_identity(keys, lua, remote_key); + + wrapped->on_begin(session_id, remote_key); +} + +void +session::request_automate() +{ + // TODO +} + +void 
+session::queue_bye_cmd(u8 phase) +{ + L(FL("queueing 'bye' command, phase %d") + % static_cast(phase)); + netcmd cmd(get_version()); + cmd.write_bye_cmd(phase); + write_netcmd(cmd); +} + +bool +session::process_bye_cmd(u8 phase, + transaction_guard & guard) +{ + +// Ideal shutdown +// ~~~~~~~~~~~~~~~ +// +// I/O events state transitions +// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~ +// client: C_WORKING +// server: S_WORKING +// 0. [refinement, data, deltas, etc.] +// client: C_SHUTDOWN +// (client checkpoints here) +// 1. client -> "bye 0" +// 2. "bye 0" -> server +// server: S_SHUTDOWN +// (server checkpoints here) +// 3. "bye 1" <- server +// 4. client <- "bye 1" +// client: C_CONFIRMED +// 5. client -> "bye 2" +// 6. "bye 2" -> server +// server: S_CONFIRMED +// 7. [server drops connection] +// +// +// Effects of I/O errors or disconnections +// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +// C_WORKING: report error and fault +// S_WORKING: report error and recover +// C_SHUTDOWN: report error and fault +// S_SHUTDOWN: report success and recover +// (and warn that client might falsely see error) +// C_CONFIRMED: report success +// S_CONFIRMED: report success + + switch (phase) + { + case 0: + if (voice == server_voice && + protocol_state == working_state) + { + protocol_state = shutdown_state; + guard.do_checkpoint(); + queue_bye_cmd(1); + } + else + error(error_codes::bad_command, + "unexpected bye phase 0 received"); + break; + + case 1: + if (voice == client_voice && + protocol_state == shutdown_state) + { + protocol_state = confirmed_state; + queue_bye_cmd(2); + } + else + error(error_codes::bad_command, "unexpected bye phase 1 received"); + break; + + case 2: + if (voice == server_voice && + protocol_state == shutdown_state) + { + protocol_state = confirmed_state; + return false; // presumably false tells the caller to drop the connection (step 7 above) -- confirm against the reactor + } + else + error(error_codes::bad_command, "unexpected bye phase 2 received"); + break; + + default: + error(error_codes::bad_command, + (F("unknown bye phase %d 
received") % static_cast<int>(phase)).str()); // cast as queue_bye_cmd does, so the u8 is formatted as a number + } + + return true; +} + +static +protocol_role +corresponding_role(protocol_role their_role) +{ + switch (their_role) + { + case source_role: + return sink_role; + case source_and_sink_role: + return source_and_sink_role; + case sink_role: + return source_role; + } + I(false); +} + +bool session::handle_service_request() +{ + enum { is_netsync, is_automate } is_what; + bool auth; + + // netsync parameters + protocol_role role; + globish their_include; + globish their_exclude; + + // auth parameters + key_id client_id; + id nonce1; + rsa_sha1_signature sig; + + rsa_oaep_sha_data hmac_encrypted; + + + switch (cmd_in.get_cmd_code()) + { + case anonymous_cmd: + cmd_in.read_anonymous_cmd(role, their_include, their_exclude, + hmac_encrypted); + L(FL("received 'anonymous' netcmd from client for pattern '%s' excluding '%s' " + "in %s mode\n") + % their_include % their_exclude + % (role == source_and_sink_role ? _("source and sink") : + (role == source_role ? _("source") : _("sink")))); + + is_what = is_netsync; + auth = false; + break; + case auth_cmd: + cmd_in.read_auth_cmd(role, their_include, their_exclude, + client_id, nonce1, hmac_encrypted, sig); + L(FL("received 'auth(hmac)' netcmd from client '%s' for pattern '%s' " + "exclude '%s' in %s mode with nonce1 '%s'\n") + % client_id % their_include % their_exclude + % (role == source_and_sink_role ? _("source and sink") : + (role == source_role ? 
_("source") : _("sink"))) + % nonce1); + is_what = is_netsync; + auth = true; + break; + case automate_cmd: + cmd_in.read_automate_cmd(client_id, nonce1, hmac_encrypted, sig); + is_what = is_automate; + auth = true; + break; + default: + I(false); + } + set_session_key(hmac_encrypted); + + if (auth && !project.db.public_key_exists(client_id)) + { + key_name their_name; + keypair their_pair; + if (keys.maybe_get_key_pair(client_id, their_name, their_pair)) + { + project.db.put_key(their_name, their_pair.pub); + } + else + { + auth = false; + } + } + if (auth) + { + if (!(nonce1 == saved_nonce)) + { + error(error_codes::failed_identification, + F("detected replay attack in auth netcmd").str()); + } + if (project.db.check_signature(client_id, nonce1(), sig) != cert_ok) + { + error(error_codes::failed_identification, + F("bad client signature").str()); + } + authenticated = true; + remote_peer_key_id = client_id; + } + + switch (is_what) + { + case is_netsync: + wrapped.reset(new netsync_session(opts, + lua, + project, + keys, + corresponding_role(role), + globish(), + globish())); + wrapped->set_owner(this); + break; + case is_automate: + break; + } + + // queue_confirm_cmd(); + completed_hello = true; + return true; +} + +void session::write_netcmd(netcmd const & cmd) +{ + if (!encountered_error) + { + string buf; + cmd.write(buf, write_hmac); + queue_output(buf); + } + else + L(FL("dropping outgoing netcmd (because we're in error unwind mode)")); +} + +u8 session::get_version() const +{ + return version; +} + +protocol_voice session::get_voice() const +{ + return voice; +} + +string session::get_peer() const +{ + return peer; +} + +void +session::error(int errcode, string const & errmsg) +{ + error_code = errcode; + throw netsync_error(errmsg); +} + +// Local Variables: +// mode: C++ +// fill-column: 76 +// c-file-style: "gnu" +// indent-tabs-mode: nil +// End: +// vim: et:sw=2:sts=2:ts=2:cino=>2s,{s,\:s,+s,t0,g0,^-2,e-2,n-2,p2s,(0,=s: 
============================================================ --- network/session.hh 4c73ddc1f08b1609a8150e6831664bb52efca02f +++ network/session.hh 4c73ddc1f08b1609a8150e6831664bb52efca02f @@ -0,0 +1,108 @@ +// Copyright (C) 2009 Timothy Brownawell +// +// This program is made available under the GNU GPL version 2.0 or +// greater. See the accompanying file COPYING for details. +// +// This program is distributed WITHOUT ANY WARRANTY; without even the +// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +// PURPOSE. + +#ifndef __SESSION_HH__ +#define __SESSION_HH__ + +#include "network/session_base.hh" +#include "network/wrapped_session.hh" + +#include "netcmd.hh" +#include "vocab.hh" + +class key_store; +class lua_hooks; +class options; +class project_t; + +class session : public session_base +{ + u8 version; + u8 max_version; + u8 min_version; + + bool use_transport_auth; + key_id const & signing_key; + + netcmd cmd_in; + bool armed; + + bool received_remote_key; + key_id remote_peer_key_id; + netsync_session_key session_key; + chained_hmac read_hmac; + chained_hmac write_hmac; + bool authenticated; + + void set_session_key(std::string const & key); + void set_session_key(rsa_oaep_sha_data const & key_encrypted); + + id hello_nonce; + id saved_nonce; + id mk_nonce(); + + bool completed_hello; + + int error_code; + + size_t session_id; + static size_t session_num; + + options & opts; + lua_hooks & lua; + project_t & project; + key_store & keys; + std::string peer; + boost::shared_ptr wrapped; + + void queue_bye_cmd(u8 phase); + bool process_bye_cmd(u8 phase, transaction_guard & guard); + + bool handle_service_request(); +public: + session(options & opts, lua_hooks & lua, project_t & project, + key_store & keys, + protocol_voice voice, + std::string const & peer, + boost::shared_ptr sock, + bool use_transport_auth = true); + ~session(); + void set_inner(boost::shared_ptr wrapped); + + bool arm(); + bool do_work(transaction_guard & guard); + void 
begin_service(); + + void write_netcmd(netcmd const & cmd); + u8 get_version() const; + protocol_voice get_voice() const; + std::string get_peer() const; + + void request_netsync(protocol_role role, + globish const & our_include_pattern, + globish const & our_exclude_pattern); + void request_automate(); + + // This method triggers a special "error unwind" mode to netsync. In this + // mode, all received data is ignored, and no new data is queued. We simply + // stay connected long enough for the current write buffer to be flushed, to + // ensure that our peer receives the error message. + // Affects read_some, write_some, and process . + void error(int errcode, std::string const & message); +}; + +#endif + +// Local Variables: +// mode: C++ +// fill-column: 76 +// c-file-style: "gnu" +// indent-tabs-mode: nil +// End: +// vim: et:sw=2:sts=2:ts=2:cino=>2s,{s,\:s,+s,t0,g0,^-2,e-2,n-2,p2s,(0,=s: ============================================================ --- network/wrapped_session.cc 4441e6888f140109a33da720a6424b460d51e95d +++ network/wrapped_session.cc 4441e6888f140109a33da720a6424b460d51e95d @@ -0,0 +1,95 @@ +// Copyright (C) 2009 Timothy Brownawell +// +// This program is made available under the GNU GPL version 2.0 or +// greater. See the accompanying file COPYING for details. +// +// This program is distributed WITHOUT ANY WARRANTY; without even the +// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +// PURPOSE. 
+ +#include "base.hh" +#include "network/wrapped_session.hh" + +#include "network/session.hh" + +using std::string; + +wrapped_session::wrapped_session() : + owner(0) +{ } + +wrapped_session::wrapped_session(session * owner) : + owner(owner) +{ } + +void wrapped_session::set_owner(session * owner) +{ + I(!this->owner); + this->owner = owner; +} + +void wrapped_session::write_netcmd(netcmd const & cmd) const +{ + owner->write_netcmd(cmd); +} + +u8 wrapped_session::get_version() const +{ + return owner->get_version(); +} + +void wrapped_session::error(int errcode, string const & message) +{ + owner->error(errcode, message); +} + +protocol_voice wrapped_session::get_voice() const +{ + return owner->get_voice(); +} + +bool wrapped_session::encountered_error() const +{ + return owner->encountered_error; +} + +string wrapped_session::get_peer() const +{ + return owner->get_peer(); +} + +bool wrapped_session::output_overfull() const +{ + return owner->output_overfull(); +} + +bool wrapped_session::shutdown_confirmed() const +{ + return owner->protocol_state == session_base::confirmed_state; +} + +void wrapped_session::request_netsync(protocol_role role, + globish const & include, + globish const & exclude) +{ + owner->request_netsync(role, include, exclude); +} + +void wrapped_session::request_automate() +{ + owner->request_automate(); +} + +void wrapped_session::on_begin(size_t ident, key_identity_info const & remote_key) +{ } + +void wrapped_session::on_end(size_t ident) +{ } + +// Local Variables: +// mode: C++ +// fill-column: 76 +// c-file-style: "gnu" +// indent-tabs-mode: nil +// End: +// vim: et:sw=2:sts=2:ts=2:cino=>2s,{s,\:s,+s,t0,g0,^-2,e-2,n-2,p2s,(0,=s: ============================================================ --- network/wrapped_session.hh a280328f066580ee9cd81d5d2a194d6bc7e691da +++ network/wrapped_session.hh a280328f066580ee9cd81d5d2a194d6bc7e691da @@ -0,0 +1,72 @@ +// Copyright (C) 2009 Timothy Brownawell +// +// This program is made available under 
the GNU GPL version 2.0 or +// greater. See the accompanying file COPYING for details. +// +// This program is distributed WITHOUT ANY WARRANTY; without even the +// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +// PURPOSE. + +#ifndef __WRAPPED_SESSION_HH__ +#define __WRAPPED_SESSION_HH__ + +#include "netcmd.hh" // for protocol_voice +#include "numeric_vocab.hh" + +struct key_identity_info; +class netcmd; +class session; +class transaction_guard; + +struct netsync_error +{ + std::string msg; + netsync_error(std::string const & s): msg(s) {} +}; + + +class wrapped_session +{ + session * owner; +protected: + void write_netcmd(netcmd const & cmd) const; + u8 get_version() const; + void error(int errcode, std::string const & message); + protocol_voice get_voice() const; + std::string get_peer() const; + bool output_overfull() const; + bool encountered_error() const; + bool shutdown_confirmed() const; + + void request_netsync(protocol_role role, + globish const & include, + globish const & exclude); + void request_automate(); +public: + wrapped_session(); + explicit wrapped_session(session * owner); + void set_owner(session * owner); + + virtual bool do_work(transaction_guard & guard, + netcmd const * const in_cmd) = 0; + // Can I do anything without waiting for more input? 
+ virtual bool have_work() const = 0; + + virtual void request_service() = 0; + virtual void accept_service() = 0; + virtual std::string usher_reply_data() const = 0; + virtual bool finished_working() const = 0; + + virtual void on_begin(size_t ident, key_identity_info const & remote_key); + virtual void on_end(size_t ident); +}; + +#endif + +// Local Variables: +// mode: C++ +// fill-column: 76 +// c-file-style: "gnu" +// indent-tabs-mode: nil +// End: +// vim: et:sw=2:sts=2:ts=2:cino=>2s,{s,\:s,+s,t0,g0,^-2,e-2,n-2,p2s,(0,=s: ============================================================ --- Makefile.am c6a102401efb2ea9778faf7a908371f682f4ace1 +++ Makefile.am aa109db4ce20ea27d2ae26d5e290132ec90a6d71 @@ -68,7 +68,9 @@ MOST_SOURCES = \ network/netsync_session.hh network/netsync_session.cc \ network/reactable.hh network/reactable.cc \ network/reactor.hh network/reactor.cc \ + network/session.hh network/session.cc \ network/session_base.hh network/session_base.cc \ + network/wrapped_session.hh network/wrapped_session.cc \ netxx_pipe.cc netxx_pipe.hh \ netcmd.cc netcmd.hh \ merkle_tree.cc merkle_tree.hh \ ============================================================ --- enumerator.cc 89890d88770c8a1cffbfc36fe1f4b8471fc06477 +++ enumerator.cc fa0b2faee2460ab510375b0b25badc148b658f56 @@ -76,7 +76,7 @@ bool } bool -revision_enumerator::done() +revision_enumerator::done() const { return revs.empty() && items.empty(); } ============================================================ --- enumerator.hh 3c5c2ee377c33482e5442e9d59e8fdeb6facf182 +++ enumerator.hh 5e52b8fae270dcabc4ada249327adcd496521a73 @@ -77,7 +77,7 @@ public: void note_cert(revision_id const & rid, id const & cert_hash); void step(); - bool done(); + bool done() const; }; #endif // __ENUMERATOR_H__ ============================================================ --- netcmd.cc b8a5d8e0ca337c099e75f2e4957400b29077ae01 +++ netcmd.cc 2863e7b2aed2d4866ad80cf92f2e08266c3d94eb @@ -53,7 +53,7 @@ netcmd::netcmd(u8 ver) 
cmd_code(error_cmd) {} -size_t netcmd::encoded_size() +size_t netcmd::encoded_size() const { string tmp; insert_datum_uleb128(payload.size(), tmp); @@ -719,25 +719,24 @@ netcmd::read_usher_reply_cmd(u8 & versio void netcmd::read_usher_reply_cmd(u8 & version_out, - utf8 & server, globish & pattern) const + utf8 & server, string & pattern) const { version_out = this->version; string str; size_t pos = 0; extract_variable_length_string(payload, str, pos, "usher_reply netcmd, server"); server = utf8(str, origin::network); - extract_variable_length_string(payload, str, pos, "usher_reply netcmd, pattern"); - pattern = globish(str, origin::network); + extract_variable_length_string(payload, pattern, pos, "usher_reply netcmd, pattern"); assert_end_of_buffer(payload, pos, "usher_reply netcmd payload"); } void -netcmd::write_usher_reply_cmd(utf8 const & server, globish const & pattern) +netcmd::write_usher_reply_cmd(utf8 const & server, string const & pattern) { cmd_code = usher_reply_cmd; payload.clear(); insert_variable_length_string(server(), payload); - insert_variable_length_string(pattern(), payload); + insert_variable_length_string(pattern, payload); } ============================================================ --- netcmd.hh e6e9409fb9dae822a261493efd5084e544618db7 +++ netcmd.hh 224e57110c5007f2b551d59cf1d108fa26abc37a @@ -31,6 +31,22 @@ class app_state; class app_state; +namespace error_codes { + static const int no_error = 200; + static const int partial_transfer = 211; + static const int no_transfer = 212; + + static const int not_permitted = 412; + static const int unknown_key = 422; + static const int mixing_versions = 432; + + static const int role_mismatch = 512; + static const int bad_command = 521; + + static const int failed_identification = 532; + //static const int bad_data = 541; +} + typedef enum { server_voice, @@ -95,10 +111,10 @@ public: netcmd_code cmd_code; std::string payload; public: - netcmd(u8 ver); + explicit netcmd(u8 ver); netcmd_code 
get_cmd_code() const {return cmd_code;} u8 get_version() const { return version; } - size_t encoded_size(); + size_t encoded_size() const; bool operator==(netcmd const & other) const; @@ -206,8 +222,8 @@ public: void read_usher_cmd(utf8 & greeting) const; void write_usher_cmd(utf8 const & greeting); - void read_usher_reply_cmd(u8 & version, utf8 & server, globish & pattern) const; - void write_usher_reply_cmd(utf8 const & server, globish const & pattern); + void read_usher_reply_cmd(u8 & version, utf8 & server, std::string & pattern) const; + void write_usher_reply_cmd(utf8 const & server, std::string const & pattern); }; ============================================================ --- netsync.cc 1a2949a97db64c97c5a0a92768ae0e6d9c676013 +++ netsync.cc ed3a26c1a06ca9dd90261cbec4ba82bb57c79ef0 @@ -13,6 +13,7 @@ #include #include "netxx/sockopt.h" +#include "netxx/stream.h" #include "app_state.hh" #include "database.hh" @@ -21,6 +22,7 @@ #include "network/netsync_listener.hh" #include "network/netsync_session.hh" #include "network/reactor.hh" +#include "network/session.hh" #include "options.hh" #include "platform.hh" #include "project.hh" @@ -117,11 +119,14 @@ call_server(options & opts, Netxx::SockOpt socket_options(server->get_socketfd(), false); socket_options.set_non_blocking(); - shared_ptr sess(new netsync_session(opts, lua, project, keys, - role, client_voice, - info.client.include_pattern, - info.client.exclude_pattern, - info.client.unparsed(), server)); + shared_ptr sess(new session(opts, lua, project, keys, + client_voice, + info.client.unparsed(), server)); + shared_ptr wrapped(new netsync_session(opts, lua, project, + keys, role, + info.client.include_pattern, + info.client.exclude_pattern)); + sess->set_inner(wrapped); reactor react; react.add(sess, guard); @@ -141,7 +146,7 @@ call_server(options & opts, // exception). We call these cases E() errors. 
E(false, origin::network, F("processing failure while talking to peer %s, disconnecting") - % sess->peer_id); + % sess->get_peer()); return; } @@ -149,7 +154,7 @@ call_server(options & opts, E(io_ok, origin::network, F("timed out waiting for I/O with peer %s, disconnecting") - % sess->peer_id); + % sess->get_peer()); if (react.size() == 0) { @@ -160,27 +165,27 @@ call_server(options & opts, // user-reported error or a clean disconnect. See protocol // state diagram in netsync_session::process_bye_cmd. - if (sess->protocol_state == netsync_session::confirmed_state) + if (sess->protocol_state == session_base::confirmed_state) { P(F("successful exchange with %s") - % sess->peer_id); + % sess->get_peer()); return; } else if (sess->encountered_error) { P(F("peer %s disconnected after we informed them of error") - % sess->peer_id); + % sess->get_peer()); return; } else E(false, origin::network, (F("I/O failure while talking to peer %s, disconnecting") - % sess->peer_id)); + % sess->get_peer())); } } } -static shared_ptr +static shared_ptr session_from_server_sync_item(options & opts, lua_hooks & lua, project_t & project, @@ -216,20 +221,23 @@ session_from_server_sync_item(options & else if (request.what == "pull") role = sink_role; - shared_ptr sess(new netsync_session(opts, lua, - project, keys, - role, client_voice, - info.client.include_pattern, - info.client.exclude_pattern, - info.client.unparsed(), - server, true)); - + shared_ptr + sess(new session(opts, lua, project, keys, + client_voice, + info.client.unparsed(), server)); + shared_ptr + wrapped(new netsync_session(opts, lua, project, + keys, role, + info.client.include_pattern, + info.client.exclude_pattern, + true)); + sess->set_inner(wrapped); return sess; } catch (Netxx::NetworkException & e) { P(F("Network error: %s") % e.what()); - return shared_ptr(); + return shared_ptr(); } } @@ -278,14 +286,14 @@ serve_connections(app_state & app, server_initiated_sync_request request = 
server_initiated_sync_requests.front(); server_initiated_sync_requests.pop_front(); - shared_ptr sess + shared_ptr sess = session_from_server_sync_item(opts, lua, project, keys, request); if (sess) { react.add(sess, *guard); - L(FL("Opened connection to %s") % sess->peer_id); + L(FL("Opened connection to %s") % sess->get_peer()); } } @@ -304,10 +312,10 @@ serve_single_connection(project_t & proj static void serve_single_connection(project_t & project, - shared_ptr sess) + shared_ptr sess) { sess->begin_service(); - P(F("beginning service on %s") % sess->peer_id); + P(F("beginning service on %s") % sess->get_peer()); transaction_guard guard(project.db); @@ -356,11 +364,11 @@ run_netsync_protocol(app_state & app, if (opts.bind_stdio) { shared_ptr str(new Netxx::PipeStream(0,1)); - shared_ptr sess(new netsync_session(opts, lua, project, keys, - role, server_voice, - globish("*", origin::internal), - globish("", origin::internal), - "stdio", str)); + + shared_ptr + sess(new session(opts, lua, project, keys, + server_voice, + "stdio", str)); serve_single_connection(project, sess); } else ============================================================ --- network/automate_listener.cc e3cc9c6951bd37f909ca6d1000475595d3f5c545 +++ network/automate_listener.cc b5556012c57bf1f2b22bf55a3042d1ade1ce133e @@ -61,6 +61,7 @@ bool automate_listener::do_io(Netxx::Pro (new Netxx::Stream(client.get_socketfd(), timeout)); shared_ptr sess(new automate_session(app, + server_voice, lexical_cast(client), str)); I(guard); ============================================================ --- network/automate_session.cc ebc7328238ef4bf127528a736a0ec9bf46efa340 +++ network/automate_session.cc 5e6e88a11cdcf49ee651351c5d7284755c953f54 @@ -133,9 +133,10 @@ automate_session::automate_session(app_s } automate_session::automate_session(app_state & app, + protocol_voice voice, string const & peer_id, shared_ptr str) : - session_base(peer_id, str), + session_base(voice, peer_id, str), app(app), armed(false), 
os(oss, app.opts.automate_stdio_size) { } ============================================================ --- network/automate_session.hh c37e12bc01cccc8c1f91b1d7c0e51dcd54aff231 +++ network/automate_session.hh b9eb6d161bfb175f616ef8fd4cdc86b33778e077 @@ -37,6 +37,7 @@ public: automate_ostream os; public: automate_session(app_state & app, + protocol_voice voice, std::string const & peer_id, boost::shared_ptr str); bool arm(); ============================================================ --- network/netsync_listener.cc 8dab9290b6c5fca7ecde13045a219bb971af9fb0 +++ network/netsync_listener.cc 3f5307dd5310d07c89da689808978710c101858d @@ -19,6 +19,7 @@ #include "network/make_server.hh" #include "network/netsync_session.hh" #include "network/reactor.hh" +#include "network/session.hh" using std::list; using std::string; @@ -68,14 +69,13 @@ listener::do_io(Netxx::Probe::ready_type socket_options.set_non_blocking(); shared_ptr str = - shared_ptr - (new Netxx::Stream(client.get_socketfd(), timeout)); + shared_ptr(new Netxx::Stream(client.get_socketfd(), + timeout)); - shared_ptr sess(new netsync_session(opts, lua, project, keys, - role, server_voice, - globish("*", origin::internal), - globish("", origin::internal), - lexical_cast(client), str)); + shared_ptr sess(new session(opts, lua, project, keys, + server_voice, + lexical_cast(client), + str)); sess->begin_service(); I(guard); react.add(sess, *guard); ============================================================ --- network/netsync_session.cc 53d10a1f77135a9f4550aac3dc78df6f4123d9d6 +++ network/netsync_session.cc d29ead19e030ce5d2f3d9072f4302ad691b0e967 @@ -38,12 +38,6 @@ using boost::shared_ptr; -struct netsync_error -{ - string msg; - netsync_error(string const & s): msg(s) {} -}; - static inline void require(bool check, string const & context) { @@ -73,26 +67,15 @@ write_pubkey(key_name const & id, insert_variable_length_string(pub(), out); } - -size_t netsync_session::session_count = 0; - 
netsync_session::netsync_session(options & opts, lua_hooks & lua, project_t & project, key_store & keys, protocol_role role, - protocol_voice voice, globish const & our_include_pattern, globish const & our_exclude_pattern, - string const & peer, - shared_ptr sock, bool initiated_by_server) : - session_base(peer, sock), - version(opts.max_netsync_version), - max_version(opts.max_netsync_version), - min_version(opts.min_netsync_version), role(role), - voice(voice), our_include_pattern(our_include_pattern), our_exclude_pattern(our_exclude_pattern), our_matcher(our_include_pattern, our_exclude_pattern), @@ -101,8 +84,6 @@ netsync_session::netsync_session(options lua(lua), use_transport_auth(opts.use_transport_auth), signing_key(keys.signing_key), - cmd_in(opts.max_netsync_version), - armed(false), received_remote_key(false), session_key(constants::netsync_key_initializer), read_hmac(netsync_session_key(constants::netsync_key_initializer), @@ -120,14 +101,12 @@ netsync_session::netsync_session(options certs_in(0), certs_out(0), revs_in(0), revs_out(0), keys_in(0), keys_out(0), - session_id(++session_count), saved_nonce(""), - error_code(no_transfer), set_totals(false), - epoch_refiner(epoch_item, voice, *this), - key_refiner(key_item, voice, *this), - cert_refiner(cert_item, voice, *this), - rev_refiner(revision_item, voice, *this), + epoch_refiner(epoch_item, get_voice(), *this), + key_refiner(key_item, get_voice(), *this), + cert_refiner(cert_item, get_voice(), *this), + rev_refiner(revision_item, get_voice(), *this), rev_enumerator(project, *this), initiated_by_server(initiated_by_server) { @@ -141,14 +120,24 @@ netsync_session::~netsync_session() } netsync_session::~netsync_session() +{ } + +void netsync_session::on_begin(size_t ident, key_identity_info const & remote_key) { - if (protocol_state == confirmed_state) - error_code = no_error; - else if (error_code == no_transfer && + lua.hook_note_netsync_start(ident, "client", role, + get_peer(), remote_key, + 
our_include_pattern, our_exclude_pattern); +} + +void netsync_session::on_end(size_t ident) +{ + if (shutdown_confirmed()) + error_code = error_codes::no_error; + else if (error_code == error_codes::no_transfer && (revs_in || revs_out || certs_in || certs_out || keys_in || keys_out)) - error_code = partial_transfer; + error_code = error_codes::partial_transfer; vector unattached_written_certs; map > rev_written_certs; @@ -178,7 +167,7 @@ netsync_session::~netsync_session() key_identity_info identity; identity.id = *i; project.complete_key_identity(keys, lua, identity); - lua.hook_note_netsync_pubkey_received(identity, session_id); + lua.hook_note_netsync_pubkey_received(identity, ident); } //Revisions @@ -199,7 +188,7 @@ netsync_session::~netsync_session() revision_data rdat; project.db.get_revision(*i, rdat); lua.hook_note_netsync_revision_received(*i, rdat, certs, - session_id); + ident); } //Certs (not attached to a new revision) @@ -210,7 +199,7 @@ netsync_session::~netsync_session() identity.id = i->key; project.complete_key_identity(keys, lua, identity); lua.hook_note_netsync_cert_received(revision_id(i->ident), identity, - i->name, i->value, session_id); + i->name, i->value, ident); } } @@ -242,7 +231,7 @@ netsync_session::~netsync_session() key_identity_info identity; identity.id = *i; project.complete_key_identity(keys, lua, identity); - lua.hook_note_netsync_pubkey_sent(identity, session_id); + lua.hook_note_netsync_pubkey_sent(identity, ident); } //Revisions @@ -263,7 +252,7 @@ netsync_session::~netsync_session() revision_data rdat; project.db.get_revision(*i, rdat); lua.hook_note_netsync_revision_sent(*i, rdat, certs, - session_id); + ident); } //Certs (not attached to a new revision) @@ -274,17 +263,23 @@ netsync_session::~netsync_session() identity.id = i->key; project.complete_key_identity(keys, lua, identity); lua.hook_note_netsync_cert_sent(revision_id(i->ident), identity, - i->name, i->value, session_id); + i->name, i->value, ident); } } - 
lua.hook_note_netsync_end(session_id, error_code, + lua.hook_note_netsync_end(ident, error_code, bytes_in, bytes_out, certs_in, certs_out, revs_in, revs_out, keys_in, keys_out); } +string +netsync_session::usher_reply_data() const +{ + return our_include_pattern(); +} + bool netsync_session::process_this_rev(revision_id const & rev) { @@ -351,7 +346,7 @@ netsync_session::note_cert(id const & i) key_name keyname; rsa_pub_key junk; project.db.get_pubkey(c.key, keyname, junk); - if (version >= 7) + if (get_version() >= 7) { c.marshal_for_netio(keyname, str); } @@ -364,45 +359,8 @@ netsync_session::note_cert(id const & i) } -id -netsync_session::mk_nonce() -{ - I(this->saved_nonce().empty()); - char buf[constants::merkle_hash_length_in_bytes]; -#if BOTAN_VERSION_CODE >= BOTAN_VERSION_CODE_FOR(1,7,7) - keys.get_rng().randomize(reinterpret_cast(buf), - constants::merkle_hash_length_in_bytes); -#else - Botan::Global_RNG::randomize(reinterpret_cast(buf), - constants::merkle_hash_length_in_bytes); -#endif - this->saved_nonce = id(string(buf, buf + constants::merkle_hash_length_in_bytes), - origin::internal); - I(this->saved_nonce().size() == constants::merkle_hash_length_in_bytes); - return this->saved_nonce; -} - void -netsync_session::set_session_key(string const & key) -{ - session_key = netsync_session_key(key, origin::internal); - read_hmac.set_key(session_key); - write_hmac.set_key(session_key); -} - -void -netsync_session::set_session_key(rsa_oaep_sha_data const & hmac_key_encrypted) -{ - if (use_transport_auth) - { - string hmac_key; - keys.decrypt_rsa(signing_key, hmac_key_encrypted, hmac_key); - set_session_key(hmac_key); - } -} - -void netsync_session::setup_client_tickers() { // xgettext: please use short message and try to avoid multibytes chars @@ -434,7 +392,7 @@ bool } bool -netsync_session::done_all_refinements() +netsync_session::done_all_refinements() const { bool all = rev_refiner.done && cert_refiner.done @@ -443,7 +401,7 @@ 
netsync_session::done_all_refinements() if (all && !set_totals) { - L(FL("All refinements done for peer %s") % peer_id); + L(FL("All refinements done for peer %s") % get_peer()); if (cert_out_ticker.get()) cert_out_ticker->set_total(cert_refiner.items_to_send.size()); @@ -464,7 +422,7 @@ bool bool -netsync_session::received_all_items() +netsync_session::received_all_items() const { if (role == source_role) return true; @@ -476,7 +434,7 @@ bool } bool -netsync_session::finished_working() +netsync_session::finished_working() const { bool all = done_all_refinements() && received_all_items() @@ -486,7 +444,7 @@ bool } bool -netsync_session::queued_all_items() +netsync_session::queued_all_items() const { if (role == sink_role) return true; @@ -512,12 +470,12 @@ netsync_session::maybe_note_epochs_finis // If we ran into an error -- say a mismatched epoch -- don't do any // further refinements. - if (encountered_error) + if (encountered_error()) return; // But otherwise, we're ready to go. Start the next // set of refinements. - if (voice == client_voice) + if (get_voice() == client_voice) { L(FL("epoch refinement finished; beginning other refinements")); key_refiner.begin_refinement(); @@ -610,38 +568,14 @@ netsync_session::note_item_sent(netcmd_i } } -void -netsync_session::write_netcmd_and_try_flush(netcmd const & cmd) -{ - if (!encountered_error) - { - string buf; - cmd.write(buf, write_hmac); - queue_output(buf); - } - else - L(FL("dropping outgoing netcmd (because we're in error unwind mode)")); -} -// This method triggers a special "error unwind" mode to netsync. In this -// mode, all received data is ignored, and no new data is queued. We simply -// stay connected long enough for the current write buffer to be flushed, to -// ensure that our peer receives the error message. -// Affects read_some, write_some, and process . 
-void -netsync_session::error(int errcode, string const & errmsg) -{ - error_code = errcode; - throw netsync_error(errmsg); -} - bool -netsync_session::do_work(transaction_guard & guard) +netsync_session::do_work(transaction_guard & guard, + netcmd const * const cmd_in) { - if (process(guard)) + if (!cmd_in || process(guard, *cmd_in)) { maybe_step(); - maybe_say_goodbye(guard); return true; } else @@ -667,25 +601,6 @@ void // senders void -netsync_session::queue_error_cmd(string const & errmsg) -{ - L(FL("queueing 'error' command")); - netcmd cmd(version); - cmd.write_error_cmd(errmsg); - write_netcmd_and_try_flush(cmd); -} - -void -netsync_session::queue_bye_cmd(u8 phase) -{ - L(FL("queueing 'bye' command, phase %d") - % static_cast(phase)); - netcmd cmd(version); - cmd.write_bye_cmd(phase); - write_netcmd_and_try_flush(cmd); -} - -void netsync_session::queue_done_cmd(netcmd_item_type type, size_t n_items) { @@ -693,9 +608,9 @@ netsync_session::queue_done_cmd(netcmd_i netcmd_item_type_to_string(type, typestr); L(FL("queueing 'done' command for %s (%d items)") % typestr % n_items); - netcmd cmd(version); + netcmd cmd(get_version()); cmd.write_done_cmd(type, n_items); - write_netcmd_and_try_flush(cmd); + write_netcmd(cmd); } void @@ -703,55 +618,42 @@ netsync_session::queue_hello_cmd(key_nam rsa_pub_key const & pub, id const & nonce) { - netcmd cmd(version); + netcmd cmd(get_version()); if (use_transport_auth) cmd.write_hello_cmd(key_name, pub, nonce); else cmd.write_hello_cmd(key_name, rsa_pub_key(), nonce); - write_netcmd_and_try_flush(cmd); + write_netcmd(cmd); } void -netsync_session::queue_anonymous_cmd(protocol_role role, - globish const & include_pattern, - globish const & exclude_pattern, - id const & nonce2) +netsync_session::request_service() { - netcmd cmd(version); - rsa_oaep_sha_data hmac_key_encrypted; - if (use_transport_auth) - project.db.encrypt_rsa(remote_peer_key_id, nonce2(), hmac_key_encrypted); - cmd.write_anonymous_cmd(role, include_pattern, 
exclude_pattern, - hmac_key_encrypted); - write_netcmd_and_try_flush(cmd); - set_session_key(nonce2()); -} -void -netsync_session::queue_auth_cmd(protocol_role role, - globish const & include_pattern, - globish const & exclude_pattern, - key_id const & client, - id const & nonce1, - id const & nonce2, - rsa_sha1_signature const & signature) -{ - netcmd cmd(version); - rsa_oaep_sha_data hmac_key_encrypted; - I(use_transport_auth); - project.db.encrypt_rsa(remote_peer_key_id, nonce2(), hmac_key_encrypted); - cmd.write_auth_cmd(role, include_pattern, exclude_pattern, client, - nonce1, hmac_key_encrypted, signature); - write_netcmd_and_try_flush(cmd); - set_session_key(nonce2()); + // clients always include in the synchronization set, every branch that the + // user requested + set all_branches, ok_branches; + project.get_branch_list(all_branches); + for (set::const_iterator i = all_branches.begin(); + i != all_branches.end(); i++) + { + if (our_matcher((*i)())) + ok_branches.insert(*i); + } + rebuild_merkle_trees(ok_branches); + + if (!initiated_by_server) + setup_client_tickers(); + + request_netsync(role, our_include_pattern, our_exclude_pattern); } void netsync_session::queue_confirm_cmd() { - netcmd cmd(version); + netcmd cmd(get_version()); cmd.write_confirm_cmd(); - write_netcmd_and_try_flush(cmd); + write_netcmd(cmd); } void @@ -764,9 +666,9 @@ netsync_session::queue_refine_cmd(refine L(FL("queueing refinement %s of %s node '%s', level %d") % (ty == refinement_query ? 
"query" : "response") % typestr % hpref() % static_cast(node.level)); - netcmd cmd(version); + netcmd cmd(get_version()); cmd.write_refine_cmd(ty, node); - write_netcmd_and_try_flush(cmd); + write_netcmd(cmd); } void @@ -791,17 +693,17 @@ netsync_session::queue_data_cmd(netcmd_i L(FL("queueing %d bytes of data for %s item '%s'") % dat.size() % typestr % hid()); - netcmd cmd(version); + netcmd cmd(get_version()); // TODO: This pair of functions will make two copies of a large // file, the first in cmd.write_data_cmd, and the second in - // write_netcmd_and_try_flush when the data is copied from the + // write_netcmd when the data is copied from the // cmd.payload variable to the string buffer for output. This // double copy should be collapsed out, it may be better to use // a string_queue for output as well as input, as that will reduce // the amount of mallocs that happen when the string queue is large // enough to just store the data. cmd.write_data_cmd(type, item, dat); - write_netcmd_and_try_flush(cmd); + write_netcmd(cmd); note_item_sent(type, item); } @@ -832,9 +734,9 @@ netsync_session::queue_delta_cmd(netcmd_ L(FL("queueing %s delta '%s' -> '%s'") % typestr % base_hid() % ident_hid()); - netcmd cmd(version); + netcmd cmd(get_version()); cmd.write_delta_cmd(type, base, ident, del); - write_netcmd_and_try_flush(cmd); + write_netcmd(cmd); note_item_sent(type, ident); } @@ -842,413 +744,6 @@ bool // processors bool -netsync_session::process_error_cmd(string const & errmsg) -{ - // "xxx string" with xxx being digits means there's an error code - if (errmsg.size() > 4 && errmsg.substr(3,1) == " ") - { - try - { - int err = boost::lexical_cast(errmsg.substr(0,3)); - if (err >= 100) - { - error_code = err; - throw bad_decode(F("received network error: %s") - % errmsg.substr(4)); - } - } - catch (boost::bad_lexical_cast) - { // ok, so it wasn't a number - } - } - throw bad_decode(F("received network error: %s") % errmsg); -} - -static const var_domain 
known_servers_domain = var_domain("known-servers"); - -bool -netsync_session::process_hello_cmd(u8 server_version, - key_name const & their_keyname, - rsa_pub_key const & their_key, - id const & nonce) -{ - I(!this->received_remote_key); - I(this->saved_nonce().empty()); - - // version sanity has already been checked by netcmd::read() - L(FL("received hello command; setting version from %d to %d") - % widen(version) - % widen(server_version)); - version = server_version; - - key_identity_info their_identity; - if (use_transport_auth) - { - key_hash_code(their_keyname, their_key, their_identity.id); - var_value printable_key_hash; - { - hexenc encoded_key_hash; - encode_hexenc(their_identity.id.inner(), encoded_key_hash); - printable_key_hash = typecast_vocab(encoded_key_hash); - } - L(FL("server key has name %s, hash %s") - % their_keyname % printable_key_hash); - var_key their_key_key(known_servers_domain, - var_name(peer_id, origin::internal)); - if (project.db.var_exists(their_key_key)) - { - var_value expected_key_hash; - project.db.get_var(their_key_key, expected_key_hash); - if (expected_key_hash != printable_key_hash) - { - P(F("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n" - "@ WARNING: SERVER IDENTIFICATION HAS CHANGED @\n" - "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n" - "IT IS POSSIBLE THAT SOMEONE IS DOING SOMETHING NASTY\n" - "it is also possible that the server key has just been changed\n" - "remote host sent key %s\n" - "I expected %s\n" - "'%s unset %s %s' overrides this check") - % printable_key_hash - % expected_key_hash - % prog_name % their_key_key.first % their_key_key.second); - E(false, origin::network, F("server key changed")); - } - } - else - { - P(F("first time connecting to server %s\n" - "I'll assume it's really them, but you might want to double-check\n" - "their key's fingerprint: %s") - % peer_id - % printable_key_hash); - project.db.set_var(their_key_key, printable_key_hash); - } - - if 
(!project.db.public_key_exists(their_identity.id)) - { - // this should now always return true since we just checked - // for the existence of this particular key - I(project.db.put_key(their_keyname, their_key)); - W(F("saving public key for %s to database") % their_keyname); - } - - { - hexenc hnonce; - encode_hexenc(nonce, hnonce); - L(FL("received 'hello' netcmd from server '%s' with nonce '%s'") - % printable_key_hash % hnonce); - } - - I(project.db.public_key_exists(their_identity.id)); - project.complete_key_identity(keys, lua, their_identity); - - // save their identity - this->received_remote_key = true; - this->remote_peer_key_id = their_identity.id; - } - - // clients always include in the synchronization set, every branch that the - // user requested - set all_branches, ok_branches; - project.get_branch_list(all_branches); - for (set::const_iterator i = all_branches.begin(); - i != all_branches.end(); i++) - { - if (our_matcher((*i)())) - ok_branches.insert(*i); - } - rebuild_merkle_trees(ok_branches); - - if (!initiated_by_server) - setup_client_tickers(); - - if (use_transport_auth && signing_key.inner()() != "") - { - // get our key pair - load_key_pair(keys, signing_key); - - // make a signature with it; - // this also ensures our public key is in the database - rsa_sha1_signature sig; - keys.make_signature(project.db, signing_key, nonce(), sig); - - // make a new nonce of our own and send off the 'auth' - queue_auth_cmd(this->role, our_include_pattern, our_exclude_pattern, - signing_key, nonce, mk_nonce(), sig); - } - else - { - queue_anonymous_cmd(this->role, our_include_pattern, - our_exclude_pattern, mk_nonce()); - } - - lua.hook_note_netsync_start(session_id, "client", this->role, - peer_id, their_identity, - our_include_pattern, our_exclude_pattern); - return true; -} - -bool -netsync_session::process_anonymous_cmd(protocol_role their_role, - globish const & their_include_pattern, - globish const & their_exclude_pattern) -{ - // Internally 
netsync thinks in terms of sources and sinks. Users like - // thinking of repositories as "readonly", "readwrite", or "writeonly". - // - // We therefore use the read/write terminology when dealing with the UI: - // if the user asks to run a "read only" service, this means they are - // willing to be a source but not a sink. - // - // nb: The "role" here is the role the *client* wants to play - // so we need to check that the opposite role is allowed for us, - // in our this->role field. - // - - lua.hook_note_netsync_start(session_id, "server", their_role, - peer_id, key_identity_info(), - their_include_pattern, their_exclude_pattern); - - // Client must be a sink and server must be a source (anonymous - // read-only), unless transport auth is disabled. - // - // If running in no-transport-auth mode, we operate anonymously and - // permit adoption of any role. - - if (use_transport_auth) - { - if (their_role != sink_role) - { - this->saved_nonce = id(""); - error(not_permitted, - F("rejected attempt at anonymous connection for write").str()); - } - - if (this->role == sink_role) - { - this->saved_nonce = id(""); - error(role_mismatch, - F("rejected attempt at anonymous connection while running as sink").str()); - } - } - - set all_branches, ok_branches; - project.get_branch_list(all_branches); - globish_matcher their_matcher(their_include_pattern, their_exclude_pattern); - for (set::const_iterator i = all_branches.begin(); - i != all_branches.end(); i++) - { - if (their_matcher((*i)())) - { - if (use_transport_auth && - !lua.hook_get_netsync_read_permitted((*i)())) - { - error(not_permitted, - (F("anonymous access to branch '%s' denied by server") - % *i).str()); - } - else - ok_branches.insert(*i); - } - } - - if (use_transport_auth) - { - P(F("allowed anonymous read permission for '%s' excluding '%s'") - % their_include_pattern % their_exclude_pattern); - this->role = source_role; - } - else - { - P(F("allowed anonymous read/write permission for '%s' excluding 
'%s'") - % their_include_pattern % their_exclude_pattern); - assume_corresponding_role(their_role); - } - - rebuild_merkle_trees(ok_branches); - - this->remote_peer_key_id = key_id(); - this->authenticated = true; - return true; -} - -void -netsync_session::assume_corresponding_role(protocol_role their_role) -{ - // Assume the (possibly degraded) opposite role. - switch (their_role) - { - case source_role: - I(this->role != source_role); - this->role = sink_role; - break; - - case source_and_sink_role: - I(this->role == source_and_sink_role); - break; - - case sink_role: - I(this->role != sink_role); - this->role = source_role; - break; - } -} - -bool -netsync_session::process_auth_cmd(protocol_role their_role, - globish const & their_include_pattern, - globish const & their_exclude_pattern, - key_id const & client, - id const & nonce1, - rsa_sha1_signature const & signature) -{ - I(!this->received_remote_key); - I(this->saved_nonce().size() == constants::merkle_hash_length_in_bytes); - - globish_matcher their_matcher(their_include_pattern, their_exclude_pattern); - - if (!project.db.public_key_exists(client)) - { - // If it's not in the db, it still could be in the keystore if we - // have the private key that goes with it. - key_name their_key_id; - keypair their_keypair; - if (keys.maybe_get_key_pair(client, their_key_id, their_keypair)) - project.db.put_key(their_key_id, their_keypair.pub); - else - { - return process_anonymous_cmd(their_role, - their_include_pattern, - their_exclude_pattern); - /* - this->saved_nonce = id(""); - - lua.hook_note_netsync_start(session_id, "server", their_role, - peer_id, key_name("-unknown-"), - their_include_pattern, - their_exclude_pattern); - error(unknown_key, - (F("remote public key hash '%s' is unknown") - % client).str()); - */ - } - } - - // Get their public key. 
- key_name their_id; - rsa_pub_key their_key; - project.db.get_pubkey(client, their_id, their_key); - key_identity_info client_identity; - client_identity.id = client; - project.complete_key_identity(keys, lua, client_identity); - - lua.hook_note_netsync_start(session_id, "server", their_role, - peer_id, client_identity, - their_include_pattern, their_exclude_pattern); - - // Check that they replied with the nonce we asked for. - if (!(nonce1 == this->saved_nonce)) - { - this->saved_nonce = id(""); - error(failed_identification, - F("detected replay attack in auth netcmd").str()); - } - - // Internally netsync thinks in terms of sources and sinks. users like - // thinking of repositories as "readonly", "readwrite", or "writeonly". - // - // We therefore use the read/write terminology when dealing with the UI: - // if the user asks to run a "read only" service, this means they are - // willing to be a source but not a sink. - // - // nb: The "their_role" here is the role the *client* wants to play - // so we need to check that the opposite role is allowed for us, - // in our this->role field. - - // Client as sink, server as source (reading). 
- - if (their_role == sink_role || their_role == source_and_sink_role) - { - if (this->role != source_role && this->role != source_and_sink_role) - { - this->saved_nonce = id(""); - error(not_permitted, - (F("denied '%s' read permission for '%s' excluding '%s' while running as pure sink") - % their_id % their_include_pattern % their_exclude_pattern).str()); - } - } - - set all_branches, ok_branches; - project.get_branch_list(all_branches); - for (set::const_iterator i = all_branches.begin(); - i != all_branches.end(); i++) - { - if (their_matcher((*i)())) - { - if (!lua.hook_get_netsync_read_permitted((*i)(), client_identity)) - { - error(not_permitted, - (F("denied '%s' read permission for '%s' excluding '%s' because of branch '%s'") - % their_id % their_include_pattern % their_exclude_pattern % *i).str()); - } - else - ok_branches.insert(*i); - } - } - - // If we're source_and_sink_role, continue even with no branches readable - // eg. serve --db=empty.db - P(F("allowed '%s' read permission for '%s' excluding '%s'") - % their_id % their_include_pattern % their_exclude_pattern); - - // Client as source, server as sink (writing). 
- - if (their_role == source_role || their_role == source_and_sink_role) - { - if (this->role != sink_role && this->role != source_and_sink_role) - { - this->saved_nonce = id(""); - error(not_permitted, - (F("denied '%s' write permission for '%s' excluding '%s' while running as pure source") - % their_id % their_include_pattern % their_exclude_pattern).str()); - } - - if (!lua.hook_get_netsync_write_permitted(client_identity)) - { - this->saved_nonce = id(""); - error(not_permitted, - (F("denied '%s' write permission for '%s' excluding '%s'") - % their_id % their_include_pattern % their_exclude_pattern).str()); - } - - P(F("allowed '%s' write permission for '%s' excluding '%s'") - % their_id % their_include_pattern % their_exclude_pattern); - } - - rebuild_merkle_trees(ok_branches); - - this->received_remote_key = true; - - // Check the signature. - if (project.db.check_signature(client, nonce1(), signature) == cert_ok) - { - // Get our private key and sign back. - L(FL("client signature OK, accepting authentication")); - this->authenticated = true; - this->remote_peer_key_id = client; - - assume_corresponding_role(their_role); - return true; - } - else - { - error(failed_identification, (F("bad client signature")).str()); - } - return false; -} - -bool netsync_session::process_refine_cmd(refinement_type ty, merkle_node const & node) { string typestr; @@ -1281,87 +776,7 @@ netsync_session::process_refine_cmd(refi return true; } -bool -netsync_session::process_bye_cmd(u8 phase, - transaction_guard & guard) -{ -// Ideal shutdown -// ~~~~~~~~~~~~~~~ -// -// I/O events state transitions -// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~ -// client: C_WORKING -// server: S_WORKING -// 0. [refinement, data, deltas, etc.] -// client: C_SHUTDOWN -// (client checkpoints here) -// 1. client -> "bye 0" -// 2. "bye 0" -> server -// server: S_SHUTDOWN -// (server checkpoints here) -// 3. "bye 1" <- server -// 4. client <- "bye 1" -// client: C_CONFIRMED -// 5. 
client -> "bye 2" -// 6. "bye 2" -> server -// server: S_CONFIRMED -// 7. [server drops connection] -// -// -// Affects of I/O errors or disconnections -// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -// C_WORKING: report error and fault -// S_WORKING: report error and recover -// C_SHUTDOWN: report error and fault -// S_SHUTDOWN: report success and recover -// (and warn that client might falsely see error) -// C_CONFIRMED: report success -// S_CONFIRMED: report success - - switch (phase) - { - case 0: - if (voice == server_voice && - protocol_state == working_state) - { - protocol_state = shutdown_state; - guard.do_checkpoint(); - queue_bye_cmd(1); - } - else - error(bad_command, "unexpected bye phase 0 received"); - break; - - case 1: - if (voice == client_voice && - protocol_state == shutdown_state) - { - protocol_state = confirmed_state; - queue_bye_cmd(2); - } - else - error(bad_command, "unexpected bye phase 1 received"); - break; - - case 2: - if (voice == server_voice && - protocol_state == shutdown_state) - { - protocol_state = confirmed_state; - return false; - } - else - error(bad_command, "unexpected bye phase 2 received"); - break; - - default: - error(bad_command, (F("unknown bye phase %d received") % phase).str()); - } - - return true; -} - bool netsync_session::process_done_cmd(netcmd_item_type type, size_t n_items) { @@ -1401,7 +816,7 @@ void } void -netsync_session::respond_to_confirm_cmd() +netsync_session::accept_service() { epoch_refiner.begin_refinement(); } @@ -1489,7 +904,7 @@ netsync_session::load_data(netcmd_item_t key_name keyname; rsa_pub_key junk; project.db.get_pubkey(c.key, keyname, junk); - if (version >= 7) + if (get_version() >= 7) { c.marshal_for_netio(keyname, out); } @@ -1559,12 +974,13 @@ netsync_session::process_data_cmd(netcmd hexenc their_epoch; encode_hexenc(i->second.inner(), my_epoch); encode_hexenc(epoch.inner(), their_epoch); - error(mixing_versions, + bool am_server = (get_voice() == server_voice); + 
error(error_codes::mixing_versions, (F("Mismatched epoch on branch %s." " Server has '%s', client has '%s'.") % branch - % (voice == server_voice ? my_epoch : their_epoch)() - % (voice == server_voice ? their_epoch : my_epoch)()).str()); + % (am_server ? my_epoch : their_epoch)() + % (am_server ? their_epoch : my_epoch)()).str()); } } maybe_note_epochs_finished(); @@ -1587,7 +1003,7 @@ netsync_session::process_data_cmd(netcmd if (project.db.put_key(keyid, pub)) written_keys.push_back(key_id(item)); else - error(partial_transfer, + error(error_codes::partial_transfer, (F("Received duplicate key %s") % keyid).str()); } break; @@ -1597,7 +1013,7 @@ netsync_session::process_data_cmd(netcmd cert c; bool matched; key_name keyname; - if (version >= 7) + if (get_version() >= 7) { matched = cert::read_cert(project.db, dat, c, keyname); if (!matched) @@ -1694,25 +1110,7 @@ netsync_session::process_delta_cmd(netcm return true; } -bool -netsync_session::process_usher_cmd(utf8 const & msg) -{ - if (msg().size()) - { - if (msg()[0] == '!') - P(F("Received warning from usher: %s") % msg().substr(1)); - else - L(FL("Received greeting from usher: %s") % msg().substr(1)); - } - netcmd cmdout(version); - cmdout.write_usher_reply_cmd(utf8(peer_id, origin::internal), - our_include_pattern); - write_netcmd_and_try_flush(cmdout); - L(FL("Sent reply.")); - return true; -} - void netsync_session::send_all_data(netcmd_item_type ty, set const & items) { @@ -1741,104 +1139,6 @@ netsync_session::dispatch_payload(netcmd switch (cmd.get_cmd_code()) { - - case error_cmd: - { - string errmsg; - cmd.read_error_cmd(errmsg); - return process_error_cmd(errmsg); - } - break; - - case hello_cmd: - require(! 
authenticated, "hello netcmd received when not authenticated"); - require(voice == client_voice, "hello netcmd received in client voice"); - { - u8 server_version(0); - key_name server_keyname; - rsa_pub_key server_key; - id nonce; - cmd.read_hello_cmd(server_version, server_keyname, server_key, nonce); - return process_hello_cmd(server_version, server_keyname, server_key, nonce); - } - break; - - case bye_cmd: - require(authenticated, "bye netcmd received when not authenticated"); - { - u8 phase; - cmd.read_bye_cmd(phase); - return process_bye_cmd(phase, guard); - } - break; - - case anonymous_cmd: - require(! authenticated, "anonymous netcmd received when not authenticated"); - require(voice == server_voice, "anonymous netcmd received in server voice"); - require(role == source_role || - role == source_and_sink_role, - "anonymous netcmd received in source or source/sink role"); - { - protocol_role role; - globish their_include_pattern, their_exclude_pattern; - rsa_oaep_sha_data hmac_key_encrypted; - cmd.read_anonymous_cmd(role, their_include_pattern, their_exclude_pattern, hmac_key_encrypted); - L(FL("received 'anonymous' netcmd from client for pattern '%s' excluding '%s' " - "in %s mode\n") - % their_include_pattern % their_exclude_pattern - % (role == source_and_sink_role ? _("source and sink") : - (role == source_role ? _("source") : _("sink")))); - - set_session_key(hmac_key_encrypted); - if (!process_anonymous_cmd(role, their_include_pattern, their_exclude_pattern)) - return false; - queue_confirm_cmd(); - return true; - } - break; - - case auth_cmd: - require(! 
authenticated, "auth netcmd received when not authenticated"); - require(voice == server_voice, "auth netcmd received in server voice"); - { - protocol_role role; - rsa_sha1_signature signature; - globish their_include_pattern, their_exclude_pattern; - key_id client; - id nonce1, nonce2; - rsa_oaep_sha_data hmac_key_encrypted; - cmd.read_auth_cmd(role, their_include_pattern, their_exclude_pattern, - client, nonce1, hmac_key_encrypted, signature); - - L(FL("received 'auth(hmac)' netcmd from client '%s' for pattern '%s' " - "exclude '%s' in %s mode with nonce1 '%s'\n") - % client % their_include_pattern % their_exclude_pattern - % (role == source_and_sink_role ? _("source and sink") : - (role == source_role ? _("source") : _("sink"))) - % nonce1); - - set_session_key(hmac_key_encrypted); - - if (!process_auth_cmd(role, their_include_pattern, their_exclude_pattern, - client, nonce1, signature)) - return false; - queue_confirm_cmd(); - return true; - } - break; - - case confirm_cmd: - require(! authenticated, "confirm netcmd received when not authenticated"); - require(voice == client_voice, "confirm netcmd received in client voice"); - { - string signature; - cmd.read_confirm_cmd(); - this->authenticated = true; - respond_to_confirm_cmd(); - return true; - } - break; - case refine_cmd: require(authenticated, "refine netcmd received when authenticated"); { @@ -1887,62 +1187,18 @@ netsync_session::dispatch_payload(netcmd } break; - case usher_cmd: - { - utf8 greeting; - cmd.read_usher_cmd(greeting); - return process_usher_cmd(greeting); - } - break; - - case usher_reply_cmd: - { - u8 client_version; - utf8 server; - globish pattern; - cmd.read_usher_reply_cmd(client_version, server, pattern); - return process_usher_reply_cmd(client_version, server, pattern); - } - break; + default: + return false; } return false; } -// This kicks off the whole cascade starting from "hello". 
-void -netsync_session::begin_service() -{ - queue_usher_cmd(utf8("", origin::internal)); -} - -void -netsync_session::queue_usher_cmd(utf8 const & message) -{ - L(FL("queueing 'usher' command")); - netcmd cmd(0); - cmd.write_usher_cmd(message); - write_netcmd_and_try_flush(cmd); -} - bool -netsync_session::process_usher_reply_cmd(u8 client_version, - utf8 const & server, - globish const & pattern) +netsync_session::have_work() const { - // netcmd::read() has already checked that the client isn't too old - if (client_version < max_version) - { - version = client_version; - } - L(FL("client has maximum version %d, using %d") - % widen(client_version) % widen(version)); - - key_name name; - keypair kp; - if (use_transport_auth) - keys.get_key_pair(signing_key, name, kp); - queue_hello_cmd(name, kp.pub, mk_nonce()); - return true; + return done_all_refinements() + && !rev_enumerator.done() + && !output_overfull(); } void @@ -1950,9 +1206,7 @@ netsync_session::maybe_step() { date_t start_time = date_t::now(); - while (done_all_refinements() - && !rev_enumerator.done() - && !output_overfull()) + while (have_work()) { rev_enumerator.step(); @@ -1964,83 +1218,34 @@ netsync_session::maybe_step() } } -void -netsync_session::maybe_say_goodbye(transaction_guard & guard) +bool netsync_session::process(transaction_guard & guard, + netcmd const & cmd_in) { - if (voice == client_voice - && protocol_state == working_state - && finished_working()) - { - protocol_state = shutdown_state; - guard.do_checkpoint(); - queue_bye_cmd(0); - } -} - -bool -netsync_session::arm() -{ - if (!armed) - { - // Don't pack the buffer unnecessarily. 
- if (output_overfull()) - return false; - - if (cmd_in.read(min_version, max_version, inbuf, read_hmac)) - { - armed = true; - } - } - return armed; -} - -bool netsync_session::process(transaction_guard & guard) -{ - if (encountered_error) - return true; try { - if (!arm()) - return true; - - armed = false; - L(FL("processing %d byte input buffer from peer %s") - % inbuf.size() % peer_id); - size_t sz = cmd_in.encoded_size(); bool ret = dispatch_payload(cmd_in, guard); - if (inbuf.size() >= constants::netcmd_maxsz) - W(F("input buffer for peer %s is overfull " - "after netcmd dispatch") % peer_id); - guard.maybe_checkpoint(sz); if (!ret) L(FL("peer %s finishing processing with '%d' packet") - % peer_id % cmd_in.get_cmd_code()); + % get_peer() % cmd_in.get_cmd_code()); return ret; } catch (bad_decode & bd) { W(F("protocol error while processing peer %s: '%s'") - % peer_id % bd.what); + % get_peer() % bd.what); return false; } catch (recoverable_failure & rf) { W(F("recoverable '%s' error while processing peer %s: '%s'") % origin::type_to_string(rf.caused_by()) - % peer_id % rf.what()); + % get_peer() % rf.what()); return false; } - catch (netsync_error & err) - { - W(F("error: %s") % err.msg); - queue_error_cmd(boost::lexical_cast(error_code) + " " + err.msg); - encountered_error = true; - return true; // Don't terminate until we've send the error_cmd. 
- } } static void ============================================================ --- network/netsync_session.hh c4c43a8ff350ed599508742258a53b9c52a765ff +++ network/netsync_session.hh e848afaf9518a827735efc982ebe5ca6f3ed1d57 @@ -22,7 +22,7 @@ #include "refiner.hh" #include "ui.hh" -#include "network/session_base.hh" +#include "network/wrapped_session.hh" class cert; @@ -30,13 +30,9 @@ netsync_session: netsync_session: public refiner_callbacks, public enumerator_callbacks, - public session_base + public wrapped_session { - u8 version; - u8 max_version; - u8 min_version; protocol_role role; - protocol_voice const voice; globish our_include_pattern; globish our_exclude_pattern; globish_matcher our_matcher; @@ -48,12 +44,6 @@ netsync_session: key_id const & signing_key; std::vector keys_to_push; - netcmd cmd_in; - bool armed; -public: - bool arm(); -private: - bool received_remote_key; key_id remote_peer_key_id; netsync_session_key session_key; @@ -71,11 +61,6 @@ private: size_t certs_in, certs_out; size_t revs_in, revs_out; size_t keys_in, keys_out; - // used to identify this session to the netsync hooks. - // We can't just use saved_nonce, because that's blank for all - // anonymous connections and could lead to confusion. - size_t session_id; - static size_t session_count; // These are read from the server, written to the local database std::vector written_revisions; @@ -89,23 +74,9 @@ private: id saved_nonce; - static const int no_error = 200; - static const int partial_transfer = 211; - static const int no_transfer = 212; - - static const int not_permitted = 412; - static const int unknown_key = 422; - static const int mixing_versions = 432; - - static const int role_mismatch = 512; - static const int bad_command = 521; - - static const int failed_identification = 532; - //static const int bad_data = 541; - int error_code; - bool set_totals; + mutable bool set_totals; // Interface to refinement. 
refiner epoch_refiner; @@ -132,62 +103,44 @@ public: project_t & project, key_store & keys, protocol_role role, - protocol_voice voice, globish const & our_include_pattern, globish const & our_exclude_pattern, - std::string const & peer, - boost::shared_ptr sock, bool initiated_by_server = false); virtual ~netsync_session(); -private: - id mk_nonce(); + std::string usher_reply_data() const; + bool have_work() const; + void accept_service(); + void request_service(); - void set_session_key(std::string const & key); - void set_session_key(rsa_oaep_sha_data const & key_encrypted); + void on_begin(size_t ident, key_identity_info const & remote_key); + void on_end(size_t ident); +private: void setup_client_tickers(); - bool done_all_refinements(); - bool queued_all_items(); - bool received_all_items(); - bool finished_working(); + bool done_all_refinements() const; + bool queued_all_items() const; + bool received_all_items() const; + bool finished_working() const; void maybe_step(); - void maybe_say_goodbye(transaction_guard & guard); void note_item_arrived(netcmd_item_type ty, id const & i); void maybe_note_epochs_finished(); void note_item_sent(netcmd_item_type ty, id const & i); public: - bool do_work(transaction_guard & guard); + bool do_work(transaction_guard & guard, + netcmd const * const cmd_in); private: void note_bytes_in(int count); void note_bytes_out(int count); - void error(int errcode, std::string const & errmsg); - - void write_netcmd_and_try_flush(netcmd const & cmd); - // Outgoing queue-writers. 
- void queue_usher_cmd(utf8 const & message); - void queue_bye_cmd(u8 phase); - void queue_error_cmd(std::string const & errmsg); void queue_done_cmd(netcmd_item_type type, size_t n_items); void queue_hello_cmd(key_name const & key_name, rsa_pub_key const & pub_encoded, id const & nonce); - void queue_anonymous_cmd(protocol_role role, - globish const & include_pattern, - globish const & exclude_pattern, - id const & nonce2); - void queue_auth_cmd(protocol_role role, - globish const & include_pattern, - globish const & exclude_pattern, - key_id const & client, - id const & nonce1, - id const & nonce2, - rsa_sha1_signature const & signature); void queue_confirm_cmd(); void queue_refine_cmd(refinement_type ty, merkle_node const & node); void queue_data_cmd(netcmd_item_type type, @@ -199,12 +152,10 @@ private: delta const & del); // Incoming dispatch-called methods. - bool process_error_cmd(std::string const & errmsg); bool process_hello_cmd(u8 server_version, key_name const & server_keyname, rsa_pub_key const & server_key, id const & nonce); - bool process_bye_cmd(u8 phase, transaction_guard & guard); bool process_anonymous_cmd(protocol_role role, globish const & their_include_pattern, globish const & their_exclude_pattern); @@ -223,18 +174,12 @@ private: id const & base, id const & ident, delta const & del); - bool process_usher_cmd(utf8 const & msg); - bool process_usher_reply_cmd(u8 client_version, - utf8 const & server, - globish const & pattern); // The incoming dispatcher. bool dispatch_payload(netcmd const & cmd, transaction_guard & guard); // Various helpers. 
- void assume_corresponding_role(protocol_role their_role); - void respond_to_confirm_cmd(); bool data_exists(netcmd_item_type type, id const & item); void load_data(netcmd_item_type type, @@ -244,10 +189,9 @@ private: void rebuild_merkle_trees(std::set const & branches); void send_all_data(netcmd_item_type ty, std::set const & items); -public: - void begin_service(); private: - bool process(transaction_guard & guard); + bool process(transaction_guard & guard, + netcmd const & cmd_in); bool initiated_by_server; }; ============================================================ --- network/session_base.cc 157c8a8f35cea36433729d2876f0a4bfb4dd5937 +++ network/session_base.cc 66e0f6b594d20aab21677ad43045d695ae83ae57 @@ -19,12 +19,14 @@ using boost::shared_ptr; using boost::shared_ptr; -session_base::session_base(string const & peer_id, +session_base::session_base(protocol_voice voice, + string const & peer_id, shared_ptr str) : outbuf_bytes(0), peer_id(peer_id), str(str), last_io_time(::time(NULL)), protocol_state(working_state), + voice(voice), encountered_error(false) { } ============================================================ --- network/session_base.hh 0ffdf1741c214d0144014df839a534dc06a4b23e +++ network/session_base.hh 300d67536599a4a9b92927c54ad7def5415301ac @@ -17,6 +17,7 @@ #include "netxx/stream.h" +#include "netcmd.hh" // for protocol_voice #include "network/reactable.hh" #include "string_queue.hh" @@ -34,6 +35,7 @@ protected: size_t outbuf_bytes; // so we can avoid queueing up too much stuff protected: void queue_output(std::string const & s); +public: bool output_overfull() const; bool output_empty() const; public: @@ -52,9 +54,12 @@ public: } protocol_state; + protocol_voice const voice; + bool encountered_error; - session_base(std::string const & peer_id, + session_base(protocol_voice voice, + std::string const & peer_id, boost::shared_ptr str); virtual ~session_base(); virtual bool arm() = 0;