#
#
# add_file "network/session.cc"
# content [a462c9e3e503442b765ca70488c1c1736215b931]
#
# add_file "network/session.hh"
# content [4c73ddc1f08b1609a8150e6831664bb52efca02f]
#
# add_file "network/wrapped_session.cc"
# content [4441e6888f140109a33da720a6424b460d51e95d]
#
# add_file "network/wrapped_session.hh"
# content [a280328f066580ee9cd81d5d2a194d6bc7e691da]
#
# patch "Makefile.am"
# from [c6a102401efb2ea9778faf7a908371f682f4ace1]
# to [aa109db4ce20ea27d2ae26d5e290132ec90a6d71]
#
# patch "enumerator.cc"
# from [89890d88770c8a1cffbfc36fe1f4b8471fc06477]
# to [fa0b2faee2460ab510375b0b25badc148b658f56]
#
# patch "enumerator.hh"
# from [3c5c2ee377c33482e5442e9d59e8fdeb6facf182]
# to [5e52b8fae270dcabc4ada249327adcd496521a73]
#
# patch "netcmd.cc"
# from [b8a5d8e0ca337c099e75f2e4957400b29077ae01]
# to [2863e7b2aed2d4866ad80cf92f2e08266c3d94eb]
#
# patch "netcmd.hh"
# from [e6e9409fb9dae822a261493efd5084e544618db7]
# to [224e57110c5007f2b551d59cf1d108fa26abc37a]
#
# patch "netsync.cc"
# from [1a2949a97db64c97c5a0a92768ae0e6d9c676013]
# to [ed3a26c1a06ca9dd90261cbec4ba82bb57c79ef0]
#
# patch "network/automate_listener.cc"
# from [e3cc9c6951bd37f909ca6d1000475595d3f5c545]
# to [b5556012c57bf1f2b22bf55a3042d1ade1ce133e]
#
# patch "network/automate_session.cc"
# from [ebc7328238ef4bf127528a736a0ec9bf46efa340]
# to [5e6e88a11cdcf49ee651351c5d7284755c953f54]
#
# patch "network/automate_session.hh"
# from [c37e12bc01cccc8c1f91b1d7c0e51dcd54aff231]
# to [b9eb6d161bfb175f616ef8fd4cdc86b33778e077]
#
# patch "network/netsync_listener.cc"
# from [8dab9290b6c5fca7ecde13045a219bb971af9fb0]
# to [3f5307dd5310d07c89da689808978710c101858d]
#
# patch "network/netsync_session.cc"
# from [53d10a1f77135a9f4550aac3dc78df6f4123d9d6]
# to [d29ead19e030ce5d2f3d9072f4302ad691b0e967]
#
# patch "network/netsync_session.hh"
# from [c4c43a8ff350ed599508742258a53b9c52a765ff]
# to [e848afaf9518a827735efc982ebe5ca6f3ed1d57]
#
# patch "network/session_base.cc"
# from [157c8a8f35cea36433729d2876f0a4bfb4dd5937]
# to [66e0f6b594d20aab21677ad43045d695ae83ae57]
#
# patch "network/session_base.hh"
# from [0ffdf1741c214d0144014df839a534dc06a4b23e]
# to [300d67536599a4a9b92927c54ad7def5415301ac]
#
============================================================
--- network/session.cc a462c9e3e503442b765ca70488c1c1736215b931
+++ network/session.cc a462c9e3e503442b765ca70488c1c1736215b931
@@ -0,0 +1,669 @@
+// Copyright (C) 2009 Timothy Brownawell
+//
+// This program is made available under the GNU GPL version 2.0 or
+// greater. See the accompanying file COPYING for details.
+//
+// This program is distributed WITHOUT ANY WARRANTY; without even the
+// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+// PURPOSE.
+
+#include "base.hh"
+#include "network/session.hh"
+
+#include "key_store.hh"
+#include "database.hh"
+#include "keys.hh"
+#include "lua_hooks.hh"
+#include "network/netsync_session.hh"
+#include "options.hh"
+#include "project.hh"
+#include "vocab_cast.hh"
+
+using std::string;
+
+using boost::lexical_cast;
+using boost::shared_ptr;
+
+
+static const var_domain known_servers_domain = var_domain("known-servers");
+
+size_t session::session_num = 0;
+
+session::session(options & opts, lua_hooks & lua, project_t & project,
+ key_store & keys,
+ protocol_voice voice,
+ std::string const & peer,
+ shared_ptr sock,
+ bool use_transport_auth) :
+ session_base(voice, peer, sock),
+ version(0),
+ max_version(opts.max_netsync_version),
+ min_version(opts.min_netsync_version),
+ use_transport_auth(use_transport_auth),
+ signing_key(keys.signing_key),
+ cmd_in(0),
+ armed(false),
+ received_remote_key(false),
+ session_key(constants::netsync_key_initializer),
+ read_hmac(netsync_session_key(constants::netsync_key_initializer),
+ use_transport_auth),
+ write_hmac(netsync_session_key(constants::netsync_key_initializer),
+ use_transport_auth),
+ authenticated(false),
+ completed_hello(false),
+ error_code(0),
+ session_id(++session_num),
+ opts(opts),
+ lua(lua),
+ project(project),
+ keys(keys),
+ peer(peer)
+{
+}
+
+session::~session()
+{
+ if (wrapped)
+ wrapped->on_end(session_id);
+}
+
+void session::set_inner(shared_ptr wrapped)
+{
+ this->wrapped = wrapped;
+}
+
+id
+session::mk_nonce()
+{
+ I(this->saved_nonce().empty());
+ char buf[constants::merkle_hash_length_in_bytes];
+
+#if BOTAN_VERSION_CODE >= BOTAN_VERSION_CODE_FOR(1,7,7)
+ keys.get_rng().randomize(reinterpret_cast(buf),
+ constants::merkle_hash_length_in_bytes);
+#else
+ Botan::Global_RNG::randomize(reinterpret_cast(buf),
+ constants::merkle_hash_length_in_bytes);
+#endif
+ this->saved_nonce = id(string(buf, buf + constants::merkle_hash_length_in_bytes),
+ origin::internal);
+ I(this->saved_nonce().size() == constants::merkle_hash_length_in_bytes);
+ return this->saved_nonce;
+}
+
+void
+session::set_session_key(string const & key)
+{
+ session_key = netsync_session_key(key, origin::internal);
+ read_hmac.set_key(session_key);
+ write_hmac.set_key(session_key);
+}
+
+void
+session::set_session_key(rsa_oaep_sha_data const & hmac_key_encrypted)
+{
+ if (use_transport_auth)
+ {
+ string hmac_key;
+ keys.decrypt_rsa(signing_key, hmac_key_encrypted, hmac_key);
+ set_session_key(hmac_key);
+ }
+}
+
+bool session::arm()
+{
+ if (!armed)
+ {
+ // Don't pack the buffer unnecessarily.
+ if (output_overfull())
+ return false;
+
+ if (cmd_in.read(min_version, max_version, inbuf, read_hmac))
+ {
+ armed = true;
+ }
+ }
+ return armed;
+}
+
+void session::begin_service()
+{
+ netcmd cmd(0);
+ cmd.write_usher_cmd(utf8("", origin::internal));
+ write_netcmd(cmd);
+}
+
+bool session::do_work(transaction_guard & guard)
+{
+ bool armed = arm();
+ bool is_goodbye = armed && cmd_in.get_cmd_code() == bye_cmd;
+ bool is_error = armed && cmd_in.get_cmd_code() == error_cmd;
+ if (completed_hello && !is_goodbye && !is_error)
+ {
+ try
+ {
+ if (encountered_error)
+ return true;
+ else
+ {
+ bool ok = wrapped->do_work(guard, armed ? &cmd_in : 0);
+ if (ok)
+ {
+ if (voice == client_voice
+ && protocol_state == working_state
+ && wrapped->finished_working())
+ {
+ protocol_state = shutdown_state;
+ guard.do_checkpoint();
+ queue_bye_cmd(0);
+ }
+ }
+ return ok;
+ }
+ }
+ catch (netsync_error & err)
+ {
+ W(F("error: %s") % err.msg);
+ string const errmsg(lexical_cast(error_code) + " " + err.msg);
+ L(FL("queueing 'error' command"));
+ netcmd cmd(get_version());
+ cmd.write_error_cmd(errmsg);
+ write_netcmd(cmd);
+ encountered_error = true;
+ return true; // Don't terminate until we've send the error_cmd.
+ }
+ }
+ else
+ {
+ if (!armed)
+ return true;
+ switch (cmd_in.get_cmd_code())
+ {
+ case usher_cmd:
+ {
+ utf8 msg;
+ cmd_in.read_usher_cmd(msg);
+ if (msg().size())
+ {
+ if (msg()[0] == '!')
+ P(F("Received warning from usher: %s") % msg().substr(1));
+ else
+ L(FL("Received greeting from usher: %s") % msg().substr(1));
+ }
+ netcmd cmdout(version);
+ cmdout.write_usher_reply_cmd(utf8(peer_id, origin::internal),
+ wrapped->usher_reply_data());
+ write_netcmd(cmdout);
+ L(FL("Sent reply."));
+ return true;
+ }
+ case usher_reply_cmd:
+ {
+ u8 client_version;
+ utf8 server;
+ string pattern;
+ cmd_in.read_usher_reply_cmd(client_version, server, pattern);
+
+ // netcmd::read() has already checked that the client isn't too old
+ if (client_version < max_version)
+ {
+ version = client_version;
+ }
+ L(FL("client has maximum version %d, using %d")
+ % widen(client_version) % widen(version));
+ netcmd cmd(version);
+
+ key_name name;
+ keypair kp;
+ keys.get_key_pair(signing_key, name, kp);
+ if (use_transport_auth)
+ {
+ cmd.write_hello_cmd(name, kp.pub, mk_nonce());
+ }
+ else
+ {
+ cmd.write_hello_cmd(name, rsa_pub_key(), mk_nonce());
+ }
+ write_netcmd(cmd);
+ return true;
+ }
+ case hello_cmd:
+ { // need to ask wrapped what to reply with (we're a client)
+ u8 server_version;
+ key_name their_keyname;
+ rsa_pub_key their_key;
+ id nonce;
+ cmd_in.read_hello_cmd(server_version, their_keyname,
+ their_key, nonce);
+ hello_nonce = nonce;
+
+ I(!received_remote_key);
+ I(saved_nonce().empty());
+
+ // version sanity has already been checked by netcmd::read()
+ L(FL("received hello command; setting version from %d to %d")
+ % widen(get_version())
+ % widen(server_version));
+ version = server_version;
+
+ if (use_transport_auth)
+ {
+ key_id remote_key;
+ key_hash_code(their_keyname, their_key, remote_peer_key_id);
+
+ var_value printable_key_hash;
+ {
+ hexenc encoded_key_hash;
+ encode_hexenc(remote_key.inner(), encoded_key_hash);
+ printable_key_hash = typecast_vocab(encoded_key_hash);
+ }
+ L(FL("server key has name %s, hash %s")
+ % their_keyname % printable_key_hash);
+ var_key their_key_key(known_servers_domain,
+ var_name(get_peer(), origin::internal));
+ if (project.db.var_exists(their_key_key))
+ {
+ var_value expected_key_hash;
+ project.db.get_var(their_key_key, expected_key_hash);
+ if (expected_key_hash != printable_key_hash)
+ {
+ P(F("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n"
+ "@ WARNING: SERVER IDENTIFICATION HAS CHANGED @\n"
+ "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n"
+ "IT IS POSSIBLE THAT SOMEONE IS DOING SOMETHING NASTY\n"
+ "it is also possible that the server key has just been changed\n"
+ "remote host sent key %s\n"
+ "I expected %s\n"
+ "'%s unset %s %s' overrides this check")
+ % printable_key_hash
+ % expected_key_hash
+ % prog_name % their_key_key.first % their_key_key.second);
+ E(false, origin::network, F("server key changed"));
+ }
+ }
+ else
+ {
+ P(F("first time connecting to server %s\n"
+ "I'll assume it's really them, but you might want to double-check\n"
+ "their key's fingerprint: %s")
+ % get_peer()
+ % printable_key_hash);
+ project.db.set_var(their_key_key, printable_key_hash);
+ }
+
+ if (!project.db.public_key_exists(remote_peer_key_id))
+ {
+ // this should now always return true since we just checked
+ // for the existence of this particular key
+ I(project.db.put_key(their_keyname, their_key));
+ W(F("saving public key for %s to database") % their_keyname);
+ }
+ {
+ hexenc hnonce;
+ encode_hexenc(nonce, hnonce);
+ L(FL("received 'hello' netcmd from server '%s' with nonce '%s'")
+ % printable_key_hash % hnonce);
+ }
+
+ I(project.db.public_key_exists(remote_peer_key_id));
+
+ // save their identity
+ received_remote_key = true;
+ }
+
+ wrapped->request_service();
+
+ }
+ return true;
+
+ case anonymous_cmd:
+ case auth_cmd:
+ case automate_cmd:
+ return handle_service_request();
+
+ case confirm_cmd:
+ {
+ authenticated = true; // maybe?
+ completed_hello = true;
+ wrapped->accept_service();
+ }
+ return true;
+
+ case bye_cmd:
+ {
+ u8 phase;
+ cmd_in.read_bye_cmd(phase);
+ return process_bye_cmd(phase, guard);
+ }
+ case error_cmd:
+ {
+ string errmsg;
+ cmd_in.read_error_cmd(errmsg);
+
+ // "xxx string" with xxx being digits means there's an error code
+ if (errmsg.size() > 4 && errmsg.substr(3,1) == " ")
+ {
+ try
+ {
+ int err = boost::lexical_cast(errmsg.substr(0,3));
+ if (err >= 100)
+ {
+ error_code = err;
+ throw bad_decode(F("received network error: %s")
+ % errmsg.substr(4));
+ }
+ }
+ catch (boost::bad_lexical_cast)
+ { // ok, so it wasn't a number
+ }
+ }
+ throw bad_decode(F("received network error: %s") % errmsg);
+ }
+ default:
+ // ERROR
+ return false;
+ }
+ }
+}
+
+void
+session::request_netsync(protocol_role role,
+ globish const & our_include_pattern,
+ globish const & our_exclude_pattern)
+{
+ id nonce2(mk_nonce());
+ netcmd request(version);
+ rsa_oaep_sha_data hmac_key_encrypted;
+ if (use_transport_auth)
+ project.db.encrypt_rsa(remote_peer_key_id, nonce2(), hmac_key_encrypted);
+
+ if (use_transport_auth && signing_key.inner()() != "")
+ {
+ // get our key pair
+ load_key_pair(keys, signing_key);
+
+ // make a signature with it;
+ // this also ensures our public key is in the database
+ rsa_sha1_signature sig;
+ keys.make_signature(project.db, signing_key, hello_nonce(), sig);
+
+ request.write_auth_cmd(role, our_include_pattern, our_exclude_pattern,
+ signing_key, hello_nonce,
+ hmac_key_encrypted, sig);
+ }
+ else
+ {
+ request.write_anonymous_cmd(role, our_include_pattern, our_exclude_pattern,
+ hmac_key_encrypted);
+ }
+ write_netcmd(request);
+ set_session_key(nonce2());
+
+ key_identity_info remote_key;
+ remote_key.id = remote_peer_key_id;
+ if (remote_key.id.inner()().empty())
+ project.complete_key_identity(keys, lua, remote_key);
+
+ wrapped->on_begin(session_id, remote_key);
+}
+
+void
+session::request_automate()
+{
+ // TODO
+}
+
+void
+session::queue_bye_cmd(u8 phase)
+{
+ L(FL("queueing 'bye' command, phase %d")
+ % static_cast(phase));
+ netcmd cmd(get_version());
+ cmd.write_bye_cmd(phase);
+ write_netcmd(cmd);
+}
+
+bool
+session::process_bye_cmd(u8 phase,
+ transaction_guard & guard)
+{
+
+// Ideal shutdown
+// ~~~~~~~~~~~~~~~
+//
+// I/O events state transitions
+// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~
+// client: C_WORKING
+// server: S_WORKING
+// 0. [refinement, data, deltas, etc.]
+// client: C_SHUTDOWN
+// (client checkpoints here)
+// 1. client -> "bye 0"
+// 2. "bye 0" -> server
+// server: S_SHUTDOWN
+// (server checkpoints here)
+// 3. "bye 1" <- server
+// 4. client <- "bye 1"
+// client: C_CONFIRMED
+// 5. client -> "bye 2"
+// 6. "bye 2" -> server
+// server: S_CONFIRMED
+// 7. [server drops connection]
+//
+//
+// Affects of I/O errors or disconnections
+// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+// C_WORKING: report error and fault
+// S_WORKING: report error and recover
+// C_SHUTDOWN: report error and fault
+// S_SHUTDOWN: report success and recover
+// (and warn that client might falsely see error)
+// C_CONFIRMED: report success
+// S_CONFIRMED: report success
+
+ switch (phase)
+ {
+ case 0:
+ if (voice == server_voice &&
+ protocol_state == working_state)
+ {
+ protocol_state = shutdown_state;
+ guard.do_checkpoint();
+ queue_bye_cmd(1);
+ }
+ else
+ error(error_codes::bad_command,
+ "unexpected bye phase 0 received");
+ break;
+
+ case 1:
+ if (voice == client_voice &&
+ protocol_state == shutdown_state)
+ {
+ protocol_state = confirmed_state;
+ queue_bye_cmd(2);
+ }
+ else
+ error(error_codes::bad_command, "unexpected bye phase 1 received");
+ break;
+
+ case 2:
+ if (voice == server_voice &&
+ protocol_state == shutdown_state)
+ {
+ protocol_state = confirmed_state;
+ return false;
+ }
+ else
+ error(error_codes::bad_command, "unexpected bye phase 2 received");
+ break;
+
+ default:
+ error(error_codes::bad_command,
+ (F("unknown bye phase %d received") % phase).str());
+ }
+
+ return true;
+}
+
+static
+protocol_role
+corresponding_role(protocol_role their_role)
+{
+ switch (their_role)
+ {
+ case source_role:
+ return sink_role;
+ case source_and_sink_role:
+ return source_and_sink_role;
+ case sink_role:
+ return source_role;
+ }
+ I(false);
+}
+
+bool session::handle_service_request()
+{
+ enum { is_netsync, is_automate } is_what;
+ bool auth;
+
+ // netsync parameters
+ protocol_role role;
+ globish their_include;
+ globish their_exclude;
+
+ // auth parameters
+ key_id client_id;
+ id nonce1;
+ rsa_sha1_signature sig;
+
+ rsa_oaep_sha_data hmac_encrypted;
+
+
+ switch (cmd_in.get_cmd_code())
+ {
+ case anonymous_cmd:
+ cmd_in.read_anonymous_cmd(role, their_include, their_exclude,
+ hmac_encrypted);
+ L(FL("received 'anonymous' netcmd from client for pattern '%s' excluding '%s' "
+ "in %s mode\n")
+ % their_include % their_exclude
+ % (role == source_and_sink_role ? _("source and sink") :
+ (role == source_role ? _("source") : _("sink"))));
+
+ is_what = is_netsync;
+ auth = false;
+ break;
+ case auth_cmd:
+ cmd_in.read_auth_cmd(role, their_include, their_exclude,
+ client_id, nonce1, hmac_encrypted, sig);
+ L(FL("received 'auth(hmac)' netcmd from client '%s' for pattern '%s' "
+ "exclude '%s' in %s mode with nonce1 '%s'\n")
+ % client_id % their_include % their_exclude
+ % (role == source_and_sink_role ? _("source and sink") :
+ (role == source_role ? _("source") : _("sink")))
+ % nonce1);
+ is_what = is_netsync;
+ auth = true;
+ break;
+ case automate_cmd:
+ cmd_in.read_automate_cmd(client_id, nonce1, hmac_encrypted, sig);
+ is_what = is_automate;
+ auth = true;
+ break;
+ default:
+ I(false);
+ }
+ set_session_key(hmac_encrypted);
+
+ if (auth && !project.db.public_key_exists(client_id))
+ {
+ key_name their_name;
+ keypair their_pair;
+ if (keys.maybe_get_key_pair(client_id, their_name, their_pair))
+ {
+ project.db.put_key(their_name, their_pair.pub);
+ }
+ else
+ {
+ auth = false;
+ }
+ }
+ if (auth)
+ {
+ if (!(nonce1 == saved_nonce))
+ {
+ error(error_codes::failed_identification,
+ F("detected replay attack in auth netcmd").str());
+ }
+ if (project.db.check_signature(client_id, nonce1(), sig) != cert_ok)
+ {
+ error(error_codes::failed_identification,
+ F("bad client signature").str());
+ }
+ authenticated = true;
+ remote_peer_key_id = client_id;
+ }
+
+ switch (is_what)
+ {
+ case is_netsync:
+ wrapped.reset(new netsync_session(opts,
+ lua,
+ project,
+ keys,
+ corresponding_role(role),
+ globish(),
+ globish()));
+ wrapped->set_owner(this);
+ break;
+ case is_automate:
+ break;
+ }
+
+ // queue_confirm_cmd();
+ completed_hello = true;
+ return true;
+}
+
+void session::write_netcmd(netcmd const & cmd)
+{
+ if (!encountered_error)
+ {
+ string buf;
+ cmd.write(buf, write_hmac);
+ queue_output(buf);
+ }
+ else
+ L(FL("dropping outgoing netcmd (because we're in error unwind mode)"));
+}
+
+u8 session::get_version() const
+{
+ return version;
+}
+
+protocol_voice session::get_voice() const
+{
+ return voice;
+}
+
+string session::get_peer() const
+{
+ return peer;
+}
+
+void
+session::error(int errcode, string const & errmsg)
+{
+ error_code = errcode;
+ throw netsync_error(errmsg);
+}
+
+// Local Variables:
+// mode: C++
+// fill-column: 76
+// c-file-style: "gnu"
+// indent-tabs-mode: nil
+// End:
+// vim: et:sw=2:sts=2:ts=2:cino=>2s,{s,\:s,+s,t0,g0,^-2,e-2,n-2,p2s,(0,=s:
============================================================
--- network/session.hh 4c73ddc1f08b1609a8150e6831664bb52efca02f
+++ network/session.hh 4c73ddc1f08b1609a8150e6831664bb52efca02f
@@ -0,0 +1,108 @@
+// Copyright (C) 2009 Timothy Brownawell
+//
+// This program is made available under the GNU GPL version 2.0 or
+// greater. See the accompanying file COPYING for details.
+//
+// This program is distributed WITHOUT ANY WARRANTY; without even the
+// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+// PURPOSE.
+
+#ifndef __SESSION_HH__
+#define __SESSION_HH__
+
+#include "network/session_base.hh"
+#include "network/wrapped_session.hh"
+
+#include "netcmd.hh"
+#include "vocab.hh"
+
+class key_store;
+class lua_hooks;
+class options;
+class project_t;
+
+class session : public session_base
+{
+ u8 version;
+ u8 max_version;
+ u8 min_version;
+
+ bool use_transport_auth;
+ key_id const & signing_key;
+
+ netcmd cmd_in;
+ bool armed;
+
+ bool received_remote_key;
+ key_id remote_peer_key_id;
+ netsync_session_key session_key;
+ chained_hmac read_hmac;
+ chained_hmac write_hmac;
+ bool authenticated;
+
+ void set_session_key(std::string const & key);
+ void set_session_key(rsa_oaep_sha_data const & key_encrypted);
+
+ id hello_nonce;
+ id saved_nonce;
+ id mk_nonce();
+
+ bool completed_hello;
+
+ int error_code;
+
+ size_t session_id;
+ static size_t session_num;
+
+ options & opts;
+ lua_hooks & lua;
+ project_t & project;
+ key_store & keys;
+ std::string peer;
+ boost::shared_ptr wrapped;
+
+ void queue_bye_cmd(u8 phase);
+ bool process_bye_cmd(u8 phase, transaction_guard & guard);
+
+ bool handle_service_request();
+public:
+ session(options & opts, lua_hooks & lua, project_t & project,
+ key_store & keys,
+ protocol_voice voice,
+ std::string const & peer,
+ boost::shared_ptr sock,
+ bool use_transport_auth = true);
+ ~session();
+ void set_inner(boost::shared_ptr wrapped);
+
+ bool arm();
+ bool do_work(transaction_guard & guard);
+ void begin_service();
+
+ void write_netcmd(netcmd const & cmd);
+ u8 get_version() const;
+ protocol_voice get_voice() const;
+ std::string get_peer() const;
+
+ void request_netsync(protocol_role role,
+ globish const & our_include_pattern,
+ globish const & our_exclude_pattern);
+ void request_automate();
+
+ // This method triggers a special "error unwind" mode to netsync. In this
+ // mode, all received data is ignored, and no new data is queued. We simply
+ // stay connected long enough for the current write buffer to be flushed, to
+ // ensure that our peer receives the error message.
+ // Affects read_some, write_some, and process .
+ void error(int errcode, std::string const & message);
+};
+
+#endif
+
+// Local Variables:
+// mode: C++
+// fill-column: 76
+// c-file-style: "gnu"
+// indent-tabs-mode: nil
+// End:
+// vim: et:sw=2:sts=2:ts=2:cino=>2s,{s,\:s,+s,t0,g0,^-2,e-2,n-2,p2s,(0,=s:
============================================================
--- network/wrapped_session.cc 4441e6888f140109a33da720a6424b460d51e95d
+++ network/wrapped_session.cc 4441e6888f140109a33da720a6424b460d51e95d
@@ -0,0 +1,95 @@
+// Copyright (C) 2009 Timothy Brownawell
+//
+// This program is made available under the GNU GPL version 2.0 or
+// greater. See the accompanying file COPYING for details.
+//
+// This program is distributed WITHOUT ANY WARRANTY; without even the
+// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+// PURPOSE.
+
+#include "base.hh"
+#include "network/wrapped_session.hh"
+
+#include "network/session.hh"
+
+using std::string;
+
+wrapped_session::wrapped_session() :
+ owner(0)
+{ }
+
+wrapped_session::wrapped_session(session * owner) :
+ owner(owner)
+{ }
+
+void wrapped_session::set_owner(session * owner)
+{
+ I(!this->owner);
+ this->owner = owner;
+}
+
+void wrapped_session::write_netcmd(netcmd const & cmd) const
+{
+ owner->write_netcmd(cmd);
+}
+
+u8 wrapped_session::get_version() const
+{
+ return owner->get_version();
+}
+
+void wrapped_session::error(int errcode, string const & message)
+{
+ owner->error(errcode, message);
+}
+
+protocol_voice wrapped_session::get_voice() const
+{
+ return owner->get_voice();
+}
+
+bool wrapped_session::encountered_error() const
+{
+ return owner->encountered_error;
+}
+
+string wrapped_session::get_peer() const
+{
+ return owner->get_peer();
+}
+
+bool wrapped_session::output_overfull() const
+{
+ return owner->output_overfull();
+}
+
+bool wrapped_session::shutdown_confirmed() const
+{
+ return owner->protocol_state == session_base::confirmed_state;
+}
+
+void wrapped_session::request_netsync(protocol_role role,
+ globish const & include,
+ globish const & exclude)
+{
+ owner->request_netsync(role, include, exclude);
+}
+
+void wrapped_session::request_automate()
+{
+ owner->request_automate();
+}
+
+void wrapped_session::on_begin(size_t ident, key_identity_info const & remote_key)
+{ }
+
+void wrapped_session::on_end(size_t ident)
+{ }
+
+// Local Variables:
+// mode: C++
+// fill-column: 76
+// c-file-style: "gnu"
+// indent-tabs-mode: nil
+// End:
+// vim: et:sw=2:sts=2:ts=2:cino=>2s,{s,\:s,+s,t0,g0,^-2,e-2,n-2,p2s,(0,=s:
============================================================
--- network/wrapped_session.hh a280328f066580ee9cd81d5d2a194d6bc7e691da
+++ network/wrapped_session.hh a280328f066580ee9cd81d5d2a194d6bc7e691da
@@ -0,0 +1,72 @@
+// Copyright (C) 2009 Timothy Brownawell
+//
+// This program is made available under the GNU GPL version 2.0 or
+// greater. See the accompanying file COPYING for details.
+//
+// This program is distributed WITHOUT ANY WARRANTY; without even the
+// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+// PURPOSE.
+
+#ifndef __WRAPPED_SESSION_HH__
+#define __WRAPPED_SESSION_HH__
+
+#include "netcmd.hh" // for protocol_voice
+#include "numeric_vocab.hh"
+
+struct key_identity_info;
+class netcmd;
+class session;
+class transaction_guard;
+
+struct netsync_error
+{
+ std::string msg;
+ netsync_error(std::string const & s): msg(s) {}
+};
+
+
+class wrapped_session
+{
+ session * owner;
+protected:
+ void write_netcmd(netcmd const & cmd) const;
+ u8 get_version() const;
+ void error(int errcode, std::string const & message);
+ protocol_voice get_voice() const;
+ std::string get_peer() const;
+ bool output_overfull() const;
+ bool encountered_error() const;
+ bool shutdown_confirmed() const;
+
+ void request_netsync(protocol_role role,
+ globish const & include,
+ globish const & exclude);
+ void request_automate();
+public:
+ wrapped_session();
+ explicit wrapped_session(session * owner);
+ void set_owner(session * owner);
+
+ virtual bool do_work(transaction_guard & guard,
+ netcmd const * const in_cmd) = 0;
+ // Can I do anything without waiting for more input?
+ virtual bool have_work() const = 0;
+
+ virtual void request_service() = 0;
+ virtual void accept_service() = 0;
+ virtual std::string usher_reply_data() const = 0;
+ virtual bool finished_working() const = 0;
+
+ virtual void on_begin(size_t ident, key_identity_info const & remote_key);
+ virtual void on_end(size_t ident);
+};
+
+#endif
+
+// Local Variables:
+// mode: C++
+// fill-column: 76
+// c-file-style: "gnu"
+// indent-tabs-mode: nil
+// End:
+// vim: et:sw=2:sts=2:ts=2:cino=>2s,{s,\:s,+s,t0,g0,^-2,e-2,n-2,p2s,(0,=s:
============================================================
--- Makefile.am c6a102401efb2ea9778faf7a908371f682f4ace1
+++ Makefile.am aa109db4ce20ea27d2ae26d5e290132ec90a6d71
@@ -68,7 +68,9 @@ MOST_SOURCES = \
network/netsync_session.hh network/netsync_session.cc \
network/reactable.hh network/reactable.cc \
network/reactor.hh network/reactor.cc \
+ network/session.hh network/session.cc \
network/session_base.hh network/session_base.cc \
+ network/wrapped_session.hh network/wrapped_session.cc \
netxx_pipe.cc netxx_pipe.hh \
netcmd.cc netcmd.hh \
merkle_tree.cc merkle_tree.hh \
============================================================
--- enumerator.cc 89890d88770c8a1cffbfc36fe1f4b8471fc06477
+++ enumerator.cc fa0b2faee2460ab510375b0b25badc148b658f56
@@ -76,7 +76,7 @@ bool
}
bool
-revision_enumerator::done()
+revision_enumerator::done() const
{
return revs.empty() && items.empty();
}
============================================================
--- enumerator.hh 3c5c2ee377c33482e5442e9d59e8fdeb6facf182
+++ enumerator.hh 5e52b8fae270dcabc4ada249327adcd496521a73
@@ -77,7 +77,7 @@ public:
void note_cert(revision_id const & rid,
id const & cert_hash);
void step();
- bool done();
+ bool done() const;
};
#endif // __ENUMERATOR_H__
============================================================
--- netcmd.cc b8a5d8e0ca337c099e75f2e4957400b29077ae01
+++ netcmd.cc 2863e7b2aed2d4866ad80cf92f2e08266c3d94eb
@@ -53,7 +53,7 @@ netcmd::netcmd(u8 ver)
cmd_code(error_cmd)
{}
-size_t netcmd::encoded_size()
+size_t netcmd::encoded_size() const
{
string tmp;
insert_datum_uleb128(payload.size(), tmp);
@@ -719,25 +719,24 @@ netcmd::read_usher_reply_cmd(u8 & versio
void
netcmd::read_usher_reply_cmd(u8 & version_out,
- utf8 & server, globish & pattern) const
+ utf8 & server, string & pattern) const
{
version_out = this->version;
string str;
size_t pos = 0;
extract_variable_length_string(payload, str, pos, "usher_reply netcmd, server");
server = utf8(str, origin::network);
- extract_variable_length_string(payload, str, pos, "usher_reply netcmd, pattern");
- pattern = globish(str, origin::network);
+ extract_variable_length_string(payload, pattern, pos, "usher_reply netcmd, pattern");
assert_end_of_buffer(payload, pos, "usher_reply netcmd payload");
}
void
-netcmd::write_usher_reply_cmd(utf8 const & server, globish const & pattern)
+netcmd::write_usher_reply_cmd(utf8 const & server, string const & pattern)
{
cmd_code = usher_reply_cmd;
payload.clear();
insert_variable_length_string(server(), payload);
- insert_variable_length_string(pattern(), payload);
+ insert_variable_length_string(pattern, payload);
}
============================================================
--- netcmd.hh e6e9409fb9dae822a261493efd5084e544618db7
+++ netcmd.hh 224e57110c5007f2b551d59cf1d108fa26abc37a
@@ -31,6 +31,22 @@ class app_state;
class app_state;
+namespace error_codes {
+ static const int no_error = 200;
+ static const int partial_transfer = 211;
+ static const int no_transfer = 212;
+
+ static const int not_permitted = 412;
+ static const int unknown_key = 422;
+ static const int mixing_versions = 432;
+
+ static const int role_mismatch = 512;
+ static const int bad_command = 521;
+
+ static const int failed_identification = 532;
+ //static const int bad_data = 541;
+}
+
typedef enum
{
server_voice,
@@ -95,10 +111,10 @@ public:
netcmd_code cmd_code;
std::string payload;
public:
- netcmd(u8 ver);
+ explicit netcmd(u8 ver);
netcmd_code get_cmd_code() const {return cmd_code;}
u8 get_version() const { return version; }
- size_t encoded_size();
+ size_t encoded_size() const;
bool operator==(netcmd const & other) const;
@@ -206,8 +222,8 @@ public:
void read_usher_cmd(utf8 & greeting) const;
void write_usher_cmd(utf8 const & greeting);
- void read_usher_reply_cmd(u8 & version, utf8 & server, globish & pattern) const;
- void write_usher_reply_cmd(utf8 const & server, globish const & pattern);
+ void read_usher_reply_cmd(u8 & version, utf8 & server, std::string & pattern) const;
+ void write_usher_reply_cmd(utf8 const & server, std::string const & pattern);
};
============================================================
--- netsync.cc 1a2949a97db64c97c5a0a92768ae0e6d9c676013
+++ netsync.cc ed3a26c1a06ca9dd90261cbec4ba82bb57c79ef0
@@ -13,6 +13,7 @@
#include
#include "netxx/sockopt.h"
+#include "netxx/stream.h"
#include "app_state.hh"
#include "database.hh"
@@ -21,6 +22,7 @@
#include "network/netsync_listener.hh"
#include "network/netsync_session.hh"
#include "network/reactor.hh"
+#include "network/session.hh"
#include "options.hh"
#include "platform.hh"
#include "project.hh"
@@ -117,11 +119,14 @@ call_server(options & opts,
Netxx::SockOpt socket_options(server->get_socketfd(), false);
socket_options.set_non_blocking();
- shared_ptr sess(new netsync_session(opts, lua, project, keys,
- role, client_voice,
- info.client.include_pattern,
- info.client.exclude_pattern,
- info.client.unparsed(), server));
+ shared_ptr sess(new session(opts, lua, project, keys,
+ client_voice,
+ info.client.unparsed(), server));
+ shared_ptr wrapped(new netsync_session(opts, lua, project,
+ keys, role,
+ info.client.include_pattern,
+ info.client.exclude_pattern));
+ sess->set_inner(wrapped);
reactor react;
react.add(sess, guard);
@@ -141,7 +146,7 @@ call_server(options & opts,
// exception). We call these cases E() errors.
E(false, origin::network,
F("processing failure while talking to peer %s, disconnecting")
- % sess->peer_id);
+ % sess->get_peer());
return;
}
@@ -149,7 +154,7 @@ call_server(options & opts,
E(io_ok, origin::network,
F("timed out waiting for I/O with peer %s, disconnecting")
- % sess->peer_id);
+ % sess->get_peer());
if (react.size() == 0)
{
@@ -160,27 +165,27 @@ call_server(options & opts,
// user-reported error or a clean disconnect. See protocol
// state diagram in netsync_session::process_bye_cmd.
- if (sess->protocol_state == netsync_session::confirmed_state)
+ if (sess->protocol_state == session_base::confirmed_state)
{
P(F("successful exchange with %s")
- % sess->peer_id);
+ % sess->get_peer());
return;
}
else if (sess->encountered_error)
{
P(F("peer %s disconnected after we informed them of error")
- % sess->peer_id);
+ % sess->get_peer());
return;
}
else
E(false, origin::network,
(F("I/O failure while talking to peer %s, disconnecting")
- % sess->peer_id));
+ % sess->get_peer()));
}
}
}
-static shared_ptr
+static shared_ptr
session_from_server_sync_item(options & opts,
lua_hooks & lua,
project_t & project,
@@ -216,20 +221,23 @@ session_from_server_sync_item(options &
else if (request.what == "pull")
role = sink_role;
- shared_ptr sess(new netsync_session(opts, lua,
- project, keys,
- role, client_voice,
- info.client.include_pattern,
- info.client.exclude_pattern,
- info.client.unparsed(),
- server, true));
-
+ shared_ptr
+ sess(new session(opts, lua, project, keys,
+ client_voice,
+ info.client.unparsed(), server));
+ shared_ptr
+ wrapped(new netsync_session(opts, lua, project,
+ keys, role,
+ info.client.include_pattern,
+ info.client.exclude_pattern,
+ true));
+ sess->set_inner(wrapped);
return sess;
}
catch (Netxx::NetworkException & e)
{
P(F("Network error: %s") % e.what());
- return shared_ptr();
+ return shared_ptr();
}
}
@@ -278,14 +286,14 @@ serve_connections(app_state & app,
server_initiated_sync_request request
= server_initiated_sync_requests.front();
server_initiated_sync_requests.pop_front();
- shared_ptr sess
+ shared_ptr sess
= session_from_server_sync_item(opts, lua, project, keys,
request);
if (sess)
{
react.add(sess, *guard);
- L(FL("Opened connection to %s") % sess->peer_id);
+ L(FL("Opened connection to %s") % sess->get_peer());
}
}
@@ -304,10 +312,10 @@ serve_single_connection(project_t & proj
static void
serve_single_connection(project_t & project,
- shared_ptr sess)
+ shared_ptr sess)
{
sess->begin_service();
- P(F("beginning service on %s") % sess->peer_id);
+ P(F("beginning service on %s") % sess->get_peer());
transaction_guard guard(project.db);
@@ -356,11 +364,11 @@ run_netsync_protocol(app_state & app,
if (opts.bind_stdio)
{
shared_ptr str(new Netxx::PipeStream(0,1));
- shared_ptr sess(new netsync_session(opts, lua, project, keys,
- role, server_voice,
- globish("*", origin::internal),
- globish("", origin::internal),
- "stdio", str));
+
+ shared_ptr
+ sess(new session(opts, lua, project, keys,
+ server_voice,
+ "stdio", str));
serve_single_connection(project, sess);
}
else
============================================================
--- network/automate_listener.cc e3cc9c6951bd37f909ca6d1000475595d3f5c545
+++ network/automate_listener.cc b5556012c57bf1f2b22bf55a3042d1ade1ce133e
@@ -61,6 +61,7 @@ bool automate_listener::do_io(Netxx::Pro
(new Netxx::Stream(client.get_socketfd(), timeout));
shared_ptr sess(new automate_session(app,
+ server_voice,
lexical_cast(client),
str));
I(guard);
============================================================
--- network/automate_session.cc ebc7328238ef4bf127528a736a0ec9bf46efa340
+++ network/automate_session.cc 5e6e88a11cdcf49ee651351c5d7284755c953f54
@@ -133,9 +133,10 @@ automate_session::automate_session(app_s
}
automate_session::automate_session(app_state & app,
+ protocol_voice voice,
string const & peer_id,
shared_ptr str) :
- session_base(peer_id, str),
+ session_base(voice, peer_id, str),
app(app), armed(false),
os(oss, app.opts.automate_stdio_size)
{ }
============================================================
--- network/automate_session.hh c37e12bc01cccc8c1f91b1d7c0e51dcd54aff231
+++ network/automate_session.hh b9eb6d161bfb175f616ef8fd4cdc86b33778e077
@@ -37,6 +37,7 @@ public:
automate_ostream os;
public:
automate_session(app_state & app,
+ protocol_voice voice,
std::string const & peer_id,
boost::shared_ptr str);
bool arm();
============================================================
--- network/netsync_listener.cc 8dab9290b6c5fca7ecde13045a219bb971af9fb0
+++ network/netsync_listener.cc 3f5307dd5310d07c89da689808978710c101858d
@@ -19,6 +19,7 @@
#include "network/make_server.hh"
#include "network/netsync_session.hh"
#include "network/reactor.hh"
+#include "network/session.hh"
using std::list;
using std::string;
@@ -68,14 +69,13 @@ listener::do_io(Netxx::Probe::ready_type
socket_options.set_non_blocking();
shared_ptr str =
- shared_ptr
- (new Netxx::Stream(client.get_socketfd(), timeout));
+ shared_ptr(new Netxx::Stream(client.get_socketfd(),
+ timeout));
- shared_ptr sess(new netsync_session(opts, lua, project, keys,
- role, server_voice,
- globish("*", origin::internal),
- globish("", origin::internal),
- lexical_cast(client), str));
+ shared_ptr sess(new session(opts, lua, project, keys,
+ server_voice,
+ lexical_cast(client),
+ str));
sess->begin_service();
I(guard);
react.add(sess, *guard);
============================================================
--- network/netsync_session.cc 53d10a1f77135a9f4550aac3dc78df6f4123d9d6
+++ network/netsync_session.cc d29ead19e030ce5d2f3d9072f4302ad691b0e967
@@ -38,12 +38,6 @@ using boost::shared_ptr;
-struct netsync_error
-{
- string msg;
- netsync_error(string const & s): msg(s) {}
-};
-
static inline void
require(bool check, string const & context)
{
@@ -73,26 +67,15 @@ write_pubkey(key_name const & id,
insert_variable_length_string(pub(), out);
}
-
-size_t netsync_session::session_count = 0;
-
netsync_session::netsync_session(options & opts,
lua_hooks & lua,
project_t & project,
key_store & keys,
protocol_role role,
- protocol_voice voice,
globish const & our_include_pattern,
globish const & our_exclude_pattern,
- string const & peer,
- shared_ptr sock,
bool initiated_by_server) :
- session_base(peer, sock),
- version(opts.max_netsync_version),
- max_version(opts.max_netsync_version),
- min_version(opts.min_netsync_version),
role(role),
- voice(voice),
our_include_pattern(our_include_pattern),
our_exclude_pattern(our_exclude_pattern),
our_matcher(our_include_pattern, our_exclude_pattern),
@@ -101,8 +84,6 @@ netsync_session::netsync_session(options
lua(lua),
use_transport_auth(opts.use_transport_auth),
signing_key(keys.signing_key),
- cmd_in(opts.max_netsync_version),
- armed(false),
received_remote_key(false),
session_key(constants::netsync_key_initializer),
read_hmac(netsync_session_key(constants::netsync_key_initializer),
@@ -120,14 +101,12 @@ netsync_session::netsync_session(options
certs_in(0), certs_out(0),
revs_in(0), revs_out(0),
keys_in(0), keys_out(0),
- session_id(++session_count),
saved_nonce(""),
- error_code(no_transfer),
set_totals(false),
- epoch_refiner(epoch_item, voice, *this),
- key_refiner(key_item, voice, *this),
- cert_refiner(cert_item, voice, *this),
- rev_refiner(revision_item, voice, *this),
+ epoch_refiner(epoch_item, get_voice(), *this),
+ key_refiner(key_item, get_voice(), *this),
+ cert_refiner(cert_item, get_voice(), *this),
+ rev_refiner(revision_item, get_voice(), *this),
rev_enumerator(project, *this),
initiated_by_server(initiated_by_server)
{
@@ -141,14 +120,24 @@ netsync_session::~netsync_session()
}
netsync_session::~netsync_session()
+{ }
+
+void netsync_session::on_begin(size_t ident, key_identity_info const & remote_key)
{
- if (protocol_state == confirmed_state)
- error_code = no_error;
- else if (error_code == no_transfer &&
+ lua.hook_note_netsync_start(ident, "client", role,
+ get_peer(), remote_key,
+ our_include_pattern, our_exclude_pattern);
+}
+
+void netsync_session::on_end(size_t ident)
+{
+ if (shutdown_confirmed())
+ error_code = error_codes::no_error;
+ else if (error_code == error_codes::no_transfer &&
(revs_in || revs_out ||
certs_in || certs_out ||
keys_in || keys_out))
- error_code = partial_transfer;
+ error_code = error_codes::partial_transfer;
vector unattached_written_certs;
map > rev_written_certs;
@@ -178,7 +167,7 @@ netsync_session::~netsync_session()
key_identity_info identity;
identity.id = *i;
project.complete_key_identity(keys, lua, identity);
- lua.hook_note_netsync_pubkey_received(identity, session_id);
+ lua.hook_note_netsync_pubkey_received(identity, ident);
}
//Revisions
@@ -199,7 +188,7 @@ netsync_session::~netsync_session()
revision_data rdat;
project.db.get_revision(*i, rdat);
lua.hook_note_netsync_revision_received(*i, rdat, certs,
- session_id);
+ ident);
}
//Certs (not attached to a new revision)
@@ -210,7 +199,7 @@ netsync_session::~netsync_session()
identity.id = i->key;
project.complete_key_identity(keys, lua, identity);
lua.hook_note_netsync_cert_received(revision_id(i->ident), identity,
- i->name, i->value, session_id);
+ i->name, i->value, ident);
}
}
@@ -242,7 +231,7 @@ netsync_session::~netsync_session()
key_identity_info identity;
identity.id = *i;
project.complete_key_identity(keys, lua, identity);
- lua.hook_note_netsync_pubkey_sent(identity, session_id);
+ lua.hook_note_netsync_pubkey_sent(identity, ident);
}
//Revisions
@@ -263,7 +252,7 @@ netsync_session::~netsync_session()
revision_data rdat;
project.db.get_revision(*i, rdat);
lua.hook_note_netsync_revision_sent(*i, rdat, certs,
- session_id);
+ ident);
}
//Certs (not attached to a new revision)
@@ -274,17 +263,23 @@ netsync_session::~netsync_session()
identity.id = i->key;
project.complete_key_identity(keys, lua, identity);
lua.hook_note_netsync_cert_sent(revision_id(i->ident), identity,
- i->name, i->value, session_id);
+ i->name, i->value, ident);
}
}
- lua.hook_note_netsync_end(session_id, error_code,
+ lua.hook_note_netsync_end(ident, error_code,
bytes_in, bytes_out,
certs_in, certs_out,
revs_in, revs_out,
keys_in, keys_out);
}
+string
+netsync_session::usher_reply_data() const
+{
+ return our_include_pattern();
+}
+
bool
netsync_session::process_this_rev(revision_id const & rev)
{
@@ -351,7 +346,7 @@ netsync_session::note_cert(id const & i)
key_name keyname;
rsa_pub_key junk;
project.db.get_pubkey(c.key, keyname, junk);
- if (version >= 7)
+ if (get_version() >= 7)
{
c.marshal_for_netio(keyname, str);
}
@@ -364,45 +359,8 @@ netsync_session::note_cert(id const & i)
}
-id
-netsync_session::mk_nonce()
-{
- I(this->saved_nonce().empty());
- char buf[constants::merkle_hash_length_in_bytes];
-#if BOTAN_VERSION_CODE >= BOTAN_VERSION_CODE_FOR(1,7,7)
- keys.get_rng().randomize(reinterpret_cast(buf),
- constants::merkle_hash_length_in_bytes);
-#else
- Botan::Global_RNG::randomize(reinterpret_cast(buf),
- constants::merkle_hash_length_in_bytes);
-#endif
- this->saved_nonce = id(string(buf, buf + constants::merkle_hash_length_in_bytes),
- origin::internal);
- I(this->saved_nonce().size() == constants::merkle_hash_length_in_bytes);
- return this->saved_nonce;
-}
-
void
-netsync_session::set_session_key(string const & key)
-{
- session_key = netsync_session_key(key, origin::internal);
- read_hmac.set_key(session_key);
- write_hmac.set_key(session_key);
-}
-
-void
-netsync_session::set_session_key(rsa_oaep_sha_data const & hmac_key_encrypted)
-{
- if (use_transport_auth)
- {
- string hmac_key;
- keys.decrypt_rsa(signing_key, hmac_key_encrypted, hmac_key);
- set_session_key(hmac_key);
- }
-}
-
-void
netsync_session::setup_client_tickers()
{
// xgettext: please use short message and try to avoid multibytes chars
@@ -434,7 +392,7 @@ bool
}
bool
-netsync_session::done_all_refinements()
+netsync_session::done_all_refinements() const
{
bool all = rev_refiner.done
&& cert_refiner.done
@@ -443,7 +401,7 @@ netsync_session::done_all_refinements()
if (all && !set_totals)
{
- L(FL("All refinements done for peer %s") % peer_id);
+ L(FL("All refinements done for peer %s") % get_peer());
if (cert_out_ticker.get())
cert_out_ticker->set_total(cert_refiner.items_to_send.size());
@@ -464,7 +422,7 @@ bool
bool
-netsync_session::received_all_items()
+netsync_session::received_all_items() const
{
if (role == source_role)
return true;
@@ -476,7 +434,7 @@ bool
}
bool
-netsync_session::finished_working()
+netsync_session::finished_working() const
{
bool all = done_all_refinements()
&& received_all_items()
@@ -486,7 +444,7 @@ bool
}
bool
-netsync_session::queued_all_items()
+netsync_session::queued_all_items() const
{
if (role == sink_role)
return true;
@@ -512,12 +470,12 @@ netsync_session::maybe_note_epochs_finis
// If we ran into an error -- say a mismatched epoch -- don't do any
// further refinements.
- if (encountered_error)
+ if (encountered_error())
return;
// But otherwise, we're ready to go. Start the next
// set of refinements.
- if (voice == client_voice)
+ if (get_voice() == client_voice)
{
L(FL("epoch refinement finished; beginning other refinements"));
key_refiner.begin_refinement();
@@ -610,38 +568,14 @@ netsync_session::note_item_sent(netcmd_i
}
}
-void
-netsync_session::write_netcmd_and_try_flush(netcmd const & cmd)
-{
- if (!encountered_error)
- {
- string buf;
- cmd.write(buf, write_hmac);
- queue_output(buf);
- }
- else
- L(FL("dropping outgoing netcmd (because we're in error unwind mode)"));
-}
-// This method triggers a special "error unwind" mode to netsync. In this
-// mode, all received data is ignored, and no new data is queued. We simply
-// stay connected long enough for the current write buffer to be flushed, to
-// ensure that our peer receives the error message.
-// Affects read_some, write_some, and process .
-void
-netsync_session::error(int errcode, string const & errmsg)
-{
- error_code = errcode;
- throw netsync_error(errmsg);
-}
-
bool
-netsync_session::do_work(transaction_guard & guard)
+netsync_session::do_work(transaction_guard & guard,
+ netcmd const * const cmd_in)
{
- if (process(guard))
+ if (!cmd_in || process(guard, *cmd_in))
{
maybe_step();
- maybe_say_goodbye(guard);
return true;
}
else
@@ -667,25 +601,6 @@ void
// senders
void
-netsync_session::queue_error_cmd(string const & errmsg)
-{
- L(FL("queueing 'error' command"));
- netcmd cmd(version);
- cmd.write_error_cmd(errmsg);
- write_netcmd_and_try_flush(cmd);
-}
-
-void
-netsync_session::queue_bye_cmd(u8 phase)
-{
- L(FL("queueing 'bye' command, phase %d")
- % static_cast(phase));
- netcmd cmd(version);
- cmd.write_bye_cmd(phase);
- write_netcmd_and_try_flush(cmd);
-}
-
-void
netsync_session::queue_done_cmd(netcmd_item_type type,
size_t n_items)
{
@@ -693,9 +608,9 @@ netsync_session::queue_done_cmd(netcmd_i
netcmd_item_type_to_string(type, typestr);
L(FL("queueing 'done' command for %s (%d items)")
% typestr % n_items);
- netcmd cmd(version);
+ netcmd cmd(get_version());
cmd.write_done_cmd(type, n_items);
- write_netcmd_and_try_flush(cmd);
+ write_netcmd(cmd);
}
void
@@ -703,55 +618,42 @@ netsync_session::queue_hello_cmd(key_nam
rsa_pub_key const & pub,
id const & nonce)
{
- netcmd cmd(version);
+ netcmd cmd(get_version());
if (use_transport_auth)
cmd.write_hello_cmd(key_name, pub, nonce);
else
cmd.write_hello_cmd(key_name, rsa_pub_key(), nonce);
- write_netcmd_and_try_flush(cmd);
+ write_netcmd(cmd);
}
void
-netsync_session::queue_anonymous_cmd(protocol_role role,
- globish const & include_pattern,
- globish const & exclude_pattern,
- id const & nonce2)
+netsync_session::request_service()
{
- netcmd cmd(version);
- rsa_oaep_sha_data hmac_key_encrypted;
- if (use_transport_auth)
- project.db.encrypt_rsa(remote_peer_key_id, nonce2(), hmac_key_encrypted);
- cmd.write_anonymous_cmd(role, include_pattern, exclude_pattern,
- hmac_key_encrypted);
- write_netcmd_and_try_flush(cmd);
- set_session_key(nonce2());
-}
-void
-netsync_session::queue_auth_cmd(protocol_role role,
- globish const & include_pattern,
- globish const & exclude_pattern,
- key_id const & client,
- id const & nonce1,
- id const & nonce2,
- rsa_sha1_signature const & signature)
-{
- netcmd cmd(version);
- rsa_oaep_sha_data hmac_key_encrypted;
- I(use_transport_auth);
- project.db.encrypt_rsa(remote_peer_key_id, nonce2(), hmac_key_encrypted);
- cmd.write_auth_cmd(role, include_pattern, exclude_pattern, client,
- nonce1, hmac_key_encrypted, signature);
- write_netcmd_and_try_flush(cmd);
- set_session_key(nonce2());
+ // clients always include in the synchronization set, every branch that the
+ // user requested
+ set all_branches, ok_branches;
+ project.get_branch_list(all_branches);
+ for (set::const_iterator i = all_branches.begin();
+ i != all_branches.end(); i++)
+ {
+ if (our_matcher((*i)()))
+ ok_branches.insert(*i);
+ }
+ rebuild_merkle_trees(ok_branches);
+
+ if (!initiated_by_server)
+ setup_client_tickers();
+
+ request_netsync(role, our_include_pattern, our_exclude_pattern);
}
void
netsync_session::queue_confirm_cmd()
{
- netcmd cmd(version);
+ netcmd cmd(get_version());
cmd.write_confirm_cmd();
- write_netcmd_and_try_flush(cmd);
+ write_netcmd(cmd);
}
void
@@ -764,9 +666,9 @@ netsync_session::queue_refine_cmd(refine
L(FL("queueing refinement %s of %s node '%s', level %d")
% (ty == refinement_query ? "query" : "response")
% typestr % hpref() % static_cast(node.level));
- netcmd cmd(version);
+ netcmd cmd(get_version());
cmd.write_refine_cmd(ty, node);
- write_netcmd_and_try_flush(cmd);
+ write_netcmd(cmd);
}
void
@@ -791,17 +693,17 @@ netsync_session::queue_data_cmd(netcmd_i
L(FL("queueing %d bytes of data for %s item '%s'")
% dat.size() % typestr % hid());
- netcmd cmd(version);
+ netcmd cmd(get_version());
// TODO: This pair of functions will make two copies of a large
// file, the first in cmd.write_data_cmd, and the second in
- // write_netcmd_and_try_flush when the data is copied from the
+ // write_netcmd when the data is copied from the
// cmd.payload variable to the string buffer for output. This
// double copy should be collapsed out, it may be better to use
// a string_queue for output as well as input, as that will reduce
// the amount of mallocs that happen when the string queue is large
// enough to just store the data.
cmd.write_data_cmd(type, item, dat);
- write_netcmd_and_try_flush(cmd);
+ write_netcmd(cmd);
note_item_sent(type, item);
}
@@ -832,9 +734,9 @@ netsync_session::queue_delta_cmd(netcmd_
L(FL("queueing %s delta '%s' -> '%s'")
% typestr % base_hid() % ident_hid());
- netcmd cmd(version);
+ netcmd cmd(get_version());
cmd.write_delta_cmd(type, base, ident, del);
- write_netcmd_and_try_flush(cmd);
+ write_netcmd(cmd);
note_item_sent(type, ident);
}
@@ -842,413 +744,6 @@ bool
// processors
bool
-netsync_session::process_error_cmd(string const & errmsg)
-{
- // "xxx string" with xxx being digits means there's an error code
- if (errmsg.size() > 4 && errmsg.substr(3,1) == " ")
- {
- try
- {
- int err = boost::lexical_cast(errmsg.substr(0,3));
- if (err >= 100)
- {
- error_code = err;
- throw bad_decode(F("received network error: %s")
- % errmsg.substr(4));
- }
- }
- catch (boost::bad_lexical_cast)
- { // ok, so it wasn't a number
- }
- }
- throw bad_decode(F("received network error: %s") % errmsg);
-}
-
-static const var_domain known_servers_domain = var_domain("known-servers");
-
-bool
-netsync_session::process_hello_cmd(u8 server_version,
- key_name const & their_keyname,
- rsa_pub_key const & their_key,
- id const & nonce)
-{
- I(!this->received_remote_key);
- I(this->saved_nonce().empty());
-
- // version sanity has already been checked by netcmd::read()
- L(FL("received hello command; setting version from %d to %d")
- % widen(version)
- % widen(server_version));
- version = server_version;
-
- key_identity_info their_identity;
- if (use_transport_auth)
- {
- key_hash_code(their_keyname, their_key, their_identity.id);
- var_value printable_key_hash;
- {
- hexenc encoded_key_hash;
- encode_hexenc(their_identity.id.inner(), encoded_key_hash);
- printable_key_hash = typecast_vocab(encoded_key_hash);
- }
- L(FL("server key has name %s, hash %s")
- % their_keyname % printable_key_hash);
- var_key their_key_key(known_servers_domain,
- var_name(peer_id, origin::internal));
- if (project.db.var_exists(their_key_key))
- {
- var_value expected_key_hash;
- project.db.get_var(their_key_key, expected_key_hash);
- if (expected_key_hash != printable_key_hash)
- {
- P(F("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n"
- "@ WARNING: SERVER IDENTIFICATION HAS CHANGED @\n"
- "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n"
- "IT IS POSSIBLE THAT SOMEONE IS DOING SOMETHING NASTY\n"
- "it is also possible that the server key has just been changed\n"
- "remote host sent key %s\n"
- "I expected %s\n"
- "'%s unset %s %s' overrides this check")
- % printable_key_hash
- % expected_key_hash
- % prog_name % their_key_key.first % their_key_key.second);
- E(false, origin::network, F("server key changed"));
- }
- }
- else
- {
- P(F("first time connecting to server %s\n"
- "I'll assume it's really them, but you might want to double-check\n"
- "their key's fingerprint: %s")
- % peer_id
- % printable_key_hash);
- project.db.set_var(their_key_key, printable_key_hash);
- }
-
- if (!project.db.public_key_exists(their_identity.id))
- {
- // this should now always return true since we just checked
- // for the existence of this particular key
- I(project.db.put_key(their_keyname, their_key));
- W(F("saving public key for %s to database") % their_keyname);
- }
-
- {
- hexenc hnonce;
- encode_hexenc(nonce, hnonce);
- L(FL("received 'hello' netcmd from server '%s' with nonce '%s'")
- % printable_key_hash % hnonce);
- }
-
- I(project.db.public_key_exists(their_identity.id));
- project.complete_key_identity(keys, lua, their_identity);
-
- // save their identity
- this->received_remote_key = true;
- this->remote_peer_key_id = their_identity.id;
- }
-
- // clients always include in the synchronization set, every branch that the
- // user requested
- set all_branches, ok_branches;
- project.get_branch_list(all_branches);
- for (set::const_iterator i = all_branches.begin();
- i != all_branches.end(); i++)
- {
- if (our_matcher((*i)()))
- ok_branches.insert(*i);
- }
- rebuild_merkle_trees(ok_branches);
-
- if (!initiated_by_server)
- setup_client_tickers();
-
- if (use_transport_auth && signing_key.inner()() != "")
- {
- // get our key pair
- load_key_pair(keys, signing_key);
-
- // make a signature with it;
- // this also ensures our public key is in the database
- rsa_sha1_signature sig;
- keys.make_signature(project.db, signing_key, nonce(), sig);
-
- // make a new nonce of our own and send off the 'auth'
- queue_auth_cmd(this->role, our_include_pattern, our_exclude_pattern,
- signing_key, nonce, mk_nonce(), sig);
- }
- else
- {
- queue_anonymous_cmd(this->role, our_include_pattern,
- our_exclude_pattern, mk_nonce());
- }
-
- lua.hook_note_netsync_start(session_id, "client", this->role,
- peer_id, their_identity,
- our_include_pattern, our_exclude_pattern);
- return true;
-}
-
-bool
-netsync_session::process_anonymous_cmd(protocol_role their_role,
- globish const & their_include_pattern,
- globish const & their_exclude_pattern)
-{
- // Internally netsync thinks in terms of sources and sinks. Users like
- // thinking of repositories as "readonly", "readwrite", or "writeonly".
- //
- // We therefore use the read/write terminology when dealing with the UI:
- // if the user asks to run a "read only" service, this means they are
- // willing to be a source but not a sink.
- //
- // nb: The "role" here is the role the *client* wants to play
- // so we need to check that the opposite role is allowed for us,
- // in our this->role field.
- //
-
- lua.hook_note_netsync_start(session_id, "server", their_role,
- peer_id, key_identity_info(),
- their_include_pattern, their_exclude_pattern);
-
- // Client must be a sink and server must be a source (anonymous
- // read-only), unless transport auth is disabled.
- //
- // If running in no-transport-auth mode, we operate anonymously and
- // permit adoption of any role.
-
- if (use_transport_auth)
- {
- if (their_role != sink_role)
- {
- this->saved_nonce = id("");
- error(not_permitted,
- F("rejected attempt at anonymous connection for write").str());
- }
-
- if (this->role == sink_role)
- {
- this->saved_nonce = id("");
- error(role_mismatch,
- F("rejected attempt at anonymous connection while running as sink").str());
- }
- }
-
- set all_branches, ok_branches;
- project.get_branch_list(all_branches);
- globish_matcher their_matcher(their_include_pattern, their_exclude_pattern);
- for (set::const_iterator i = all_branches.begin();
- i != all_branches.end(); i++)
- {
- if (their_matcher((*i)()))
- {
- if (use_transport_auth &&
- !lua.hook_get_netsync_read_permitted((*i)()))
- {
- error(not_permitted,
- (F("anonymous access to branch '%s' denied by server")
- % *i).str());
- }
- else
- ok_branches.insert(*i);
- }
- }
-
- if (use_transport_auth)
- {
- P(F("allowed anonymous read permission for '%s' excluding '%s'")
- % their_include_pattern % their_exclude_pattern);
- this->role = source_role;
- }
- else
- {
- P(F("allowed anonymous read/write permission for '%s' excluding '%s'")
- % their_include_pattern % their_exclude_pattern);
- assume_corresponding_role(their_role);
- }
-
- rebuild_merkle_trees(ok_branches);
-
- this->remote_peer_key_id = key_id();
- this->authenticated = true;
- return true;
-}
-
-void
-netsync_session::assume_corresponding_role(protocol_role their_role)
-{
- // Assume the (possibly degraded) opposite role.
- switch (their_role)
- {
- case source_role:
- I(this->role != source_role);
- this->role = sink_role;
- break;
-
- case source_and_sink_role:
- I(this->role == source_and_sink_role);
- break;
-
- case sink_role:
- I(this->role != sink_role);
- this->role = source_role;
- break;
- }
-}
-
-bool
-netsync_session::process_auth_cmd(protocol_role their_role,
- globish const & their_include_pattern,
- globish const & their_exclude_pattern,
- key_id const & client,
- id const & nonce1,
- rsa_sha1_signature const & signature)
-{
- I(!this->received_remote_key);
- I(this->saved_nonce().size() == constants::merkle_hash_length_in_bytes);
-
- globish_matcher their_matcher(their_include_pattern, their_exclude_pattern);
-
- if (!project.db.public_key_exists(client))
- {
- // If it's not in the db, it still could be in the keystore if we
- // have the private key that goes with it.
- key_name their_key_id;
- keypair their_keypair;
- if (keys.maybe_get_key_pair(client, their_key_id, their_keypair))
- project.db.put_key(their_key_id, their_keypair.pub);
- else
- {
- return process_anonymous_cmd(their_role,
- their_include_pattern,
- their_exclude_pattern);
- /*
- this->saved_nonce = id("");
-
- lua.hook_note_netsync_start(session_id, "server", their_role,
- peer_id, key_name("-unknown-"),
- their_include_pattern,
- their_exclude_pattern);
- error(unknown_key,
- (F("remote public key hash '%s' is unknown")
- % client).str());
- */
- }
- }
-
- // Get their public key.
- key_name their_id;
- rsa_pub_key their_key;
- project.db.get_pubkey(client, their_id, their_key);
- key_identity_info client_identity;
- client_identity.id = client;
- project.complete_key_identity(keys, lua, client_identity);
-
- lua.hook_note_netsync_start(session_id, "server", their_role,
- peer_id, client_identity,
- their_include_pattern, their_exclude_pattern);
-
- // Check that they replied with the nonce we asked for.
- if (!(nonce1 == this->saved_nonce))
- {
- this->saved_nonce = id("");
- error(failed_identification,
- F("detected replay attack in auth netcmd").str());
- }
-
- // Internally netsync thinks in terms of sources and sinks. users like
- // thinking of repositories as "readonly", "readwrite", or "writeonly".
- //
- // We therefore use the read/write terminology when dealing with the UI:
- // if the user asks to run a "read only" service, this means they are
- // willing to be a source but not a sink.
- //
- // nb: The "their_role" here is the role the *client* wants to play
- // so we need to check that the opposite role is allowed for us,
- // in our this->role field.
-
- // Client as sink, server as source (reading).
-
- if (their_role == sink_role || their_role == source_and_sink_role)
- {
- if (this->role != source_role && this->role != source_and_sink_role)
- {
- this->saved_nonce = id("");
- error(not_permitted,
- (F("denied '%s' read permission for '%s' excluding '%s' while running as pure sink")
- % their_id % their_include_pattern % their_exclude_pattern).str());
- }
- }
-
- set all_branches, ok_branches;
- project.get_branch_list(all_branches);
- for (set::const_iterator i = all_branches.begin();
- i != all_branches.end(); i++)
- {
- if (their_matcher((*i)()))
- {
- if (!lua.hook_get_netsync_read_permitted((*i)(), client_identity))
- {
- error(not_permitted,
- (F("denied '%s' read permission for '%s' excluding '%s' because of branch '%s'")
- % their_id % their_include_pattern % their_exclude_pattern % *i).str());
- }
- else
- ok_branches.insert(*i);
- }
- }
-
- // If we're source_and_sink_role, continue even with no branches readable
- // eg. serve --db=empty.db
- P(F("allowed '%s' read permission for '%s' excluding '%s'")
- % their_id % their_include_pattern % their_exclude_pattern);
-
- // Client as source, server as sink (writing).
-
- if (their_role == source_role || their_role == source_and_sink_role)
- {
- if (this->role != sink_role && this->role != source_and_sink_role)
- {
- this->saved_nonce = id("");
- error(not_permitted,
- (F("denied '%s' write permission for '%s' excluding '%s' while running as pure source")
- % their_id % their_include_pattern % their_exclude_pattern).str());
- }
-
- if (!lua.hook_get_netsync_write_permitted(client_identity))
- {
- this->saved_nonce = id("");
- error(not_permitted,
- (F("denied '%s' write permission for '%s' excluding '%s'")
- % their_id % their_include_pattern % their_exclude_pattern).str());
- }
-
- P(F("allowed '%s' write permission for '%s' excluding '%s'")
- % their_id % their_include_pattern % their_exclude_pattern);
- }
-
- rebuild_merkle_trees(ok_branches);
-
- this->received_remote_key = true;
-
- // Check the signature.
- if (project.db.check_signature(client, nonce1(), signature) == cert_ok)
- {
- // Get our private key and sign back.
- L(FL("client signature OK, accepting authentication"));
- this->authenticated = true;
- this->remote_peer_key_id = client;
-
- assume_corresponding_role(their_role);
- return true;
- }
- else
- {
- error(failed_identification, (F("bad client signature")).str());
- }
- return false;
-}
-
-bool
netsync_session::process_refine_cmd(refinement_type ty, merkle_node const & node)
{
string typestr;
@@ -1281,87 +776,7 @@ netsync_session::process_refine_cmd(refi
return true;
}
-bool
-netsync_session::process_bye_cmd(u8 phase,
- transaction_guard & guard)
-{
-// Ideal shutdown
-// ~~~~~~~~~~~~~~~
-//
-// I/O events state transitions
-// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~
-// client: C_WORKING
-// server: S_WORKING
-// 0. [refinement, data, deltas, etc.]
-// client: C_SHUTDOWN
-// (client checkpoints here)
-// 1. client -> "bye 0"
-// 2. "bye 0" -> server
-// server: S_SHUTDOWN
-// (server checkpoints here)
-// 3. "bye 1" <- server
-// 4. client <- "bye 1"
-// client: C_CONFIRMED
-// 5. client -> "bye 2"
-// 6. "bye 2" -> server
-// server: S_CONFIRMED
-// 7. [server drops connection]
-//
-//
-// Affects of I/O errors or disconnections
-// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-// C_WORKING: report error and fault
-// S_WORKING: report error and recover
-// C_SHUTDOWN: report error and fault
-// S_SHUTDOWN: report success and recover
-// (and warn that client might falsely see error)
-// C_CONFIRMED: report success
-// S_CONFIRMED: report success
-
- switch (phase)
- {
- case 0:
- if (voice == server_voice &&
- protocol_state == working_state)
- {
- protocol_state = shutdown_state;
- guard.do_checkpoint();
- queue_bye_cmd(1);
- }
- else
- error(bad_command, "unexpected bye phase 0 received");
- break;
-
- case 1:
- if (voice == client_voice &&
- protocol_state == shutdown_state)
- {
- protocol_state = confirmed_state;
- queue_bye_cmd(2);
- }
- else
- error(bad_command, "unexpected bye phase 1 received");
- break;
-
- case 2:
- if (voice == server_voice &&
- protocol_state == shutdown_state)
- {
- protocol_state = confirmed_state;
- return false;
- }
- else
- error(bad_command, "unexpected bye phase 2 received");
- break;
-
- default:
- error(bad_command, (F("unknown bye phase %d received") % phase).str());
- }
-
- return true;
-}
-
bool
netsync_session::process_done_cmd(netcmd_item_type type, size_t n_items)
{
@@ -1401,7 +816,7 @@ void
}
void
-netsync_session::respond_to_confirm_cmd()
+netsync_session::accept_service()
{
epoch_refiner.begin_refinement();
}
@@ -1489,7 +904,7 @@ netsync_session::load_data(netcmd_item_t
key_name keyname;
rsa_pub_key junk;
project.db.get_pubkey(c.key, keyname, junk);
- if (version >= 7)
+ if (get_version() >= 7)
{
c.marshal_for_netio(keyname, out);
}
@@ -1559,12 +974,13 @@ netsync_session::process_data_cmd(netcmd
hexenc their_epoch;
encode_hexenc(i->second.inner(), my_epoch);
encode_hexenc(epoch.inner(), their_epoch);
- error(mixing_versions,
+ bool am_server = (get_voice() == server_voice);
+ error(error_codes::mixing_versions,
(F("Mismatched epoch on branch %s."
" Server has '%s', client has '%s'.")
% branch
- % (voice == server_voice ? my_epoch : their_epoch)()
- % (voice == server_voice ? their_epoch : my_epoch)()).str());
+ % (am_server ? my_epoch : their_epoch)()
+ % (am_server ? their_epoch : my_epoch)()).str());
}
}
maybe_note_epochs_finished();
@@ -1587,7 +1003,7 @@ netsync_session::process_data_cmd(netcmd
if (project.db.put_key(keyid, pub))
written_keys.push_back(key_id(item));
else
- error(partial_transfer,
+ error(error_codes::partial_transfer,
(F("Received duplicate key %s") % keyid).str());
}
break;
@@ -1597,7 +1013,7 @@ netsync_session::process_data_cmd(netcmd
cert c;
bool matched;
key_name keyname;
- if (version >= 7)
+ if (get_version() >= 7)
{
matched = cert::read_cert(project.db, dat, c, keyname);
if (!matched)
@@ -1694,25 +1110,7 @@ netsync_session::process_delta_cmd(netcm
return true;
}
-bool
-netsync_session::process_usher_cmd(utf8 const & msg)
-{
- if (msg().size())
- {
- if (msg()[0] == '!')
- P(F("Received warning from usher: %s") % msg().substr(1));
- else
- L(FL("Received greeting from usher: %s") % msg().substr(1));
- }
- netcmd cmdout(version);
- cmdout.write_usher_reply_cmd(utf8(peer_id, origin::internal),
- our_include_pattern);
- write_netcmd_and_try_flush(cmdout);
- L(FL("Sent reply."));
- return true;
-}
-
void
netsync_session::send_all_data(netcmd_item_type ty, set const & items)
{
@@ -1741,104 +1139,6 @@ netsync_session::dispatch_payload(netcmd
switch (cmd.get_cmd_code())
{
-
- case error_cmd:
- {
- string errmsg;
- cmd.read_error_cmd(errmsg);
- return process_error_cmd(errmsg);
- }
- break;
-
- case hello_cmd:
- require(! authenticated, "hello netcmd received when not authenticated");
- require(voice == client_voice, "hello netcmd received in client voice");
- {
- u8 server_version(0);
- key_name server_keyname;
- rsa_pub_key server_key;
- id nonce;
- cmd.read_hello_cmd(server_version, server_keyname, server_key, nonce);
- return process_hello_cmd(server_version, server_keyname, server_key, nonce);
- }
- break;
-
- case bye_cmd:
- require(authenticated, "bye netcmd received when not authenticated");
- {
- u8 phase;
- cmd.read_bye_cmd(phase);
- return process_bye_cmd(phase, guard);
- }
- break;
-
- case anonymous_cmd:
- require(! authenticated, "anonymous netcmd received when not authenticated");
- require(voice == server_voice, "anonymous netcmd received in server voice");
- require(role == source_role ||
- role == source_and_sink_role,
- "anonymous netcmd received in source or source/sink role");
- {
- protocol_role role;
- globish their_include_pattern, their_exclude_pattern;
- rsa_oaep_sha_data hmac_key_encrypted;
- cmd.read_anonymous_cmd(role, their_include_pattern, their_exclude_pattern, hmac_key_encrypted);
- L(FL("received 'anonymous' netcmd from client for pattern '%s' excluding '%s' "
- "in %s mode\n")
- % their_include_pattern % their_exclude_pattern
- % (role == source_and_sink_role ? _("source and sink") :
- (role == source_role ? _("source") : _("sink"))));
-
- set_session_key(hmac_key_encrypted);
- if (!process_anonymous_cmd(role, their_include_pattern, their_exclude_pattern))
- return false;
- queue_confirm_cmd();
- return true;
- }
- break;
-
- case auth_cmd:
- require(! authenticated, "auth netcmd received when not authenticated");
- require(voice == server_voice, "auth netcmd received in server voice");
- {
- protocol_role role;
- rsa_sha1_signature signature;
- globish their_include_pattern, their_exclude_pattern;
- key_id client;
- id nonce1, nonce2;
- rsa_oaep_sha_data hmac_key_encrypted;
- cmd.read_auth_cmd(role, their_include_pattern, their_exclude_pattern,
- client, nonce1, hmac_key_encrypted, signature);
-
- L(FL("received 'auth(hmac)' netcmd from client '%s' for pattern '%s' "
- "exclude '%s' in %s mode with nonce1 '%s'\n")
- % client % their_include_pattern % their_exclude_pattern
- % (role == source_and_sink_role ? _("source and sink") :
- (role == source_role ? _("source") : _("sink")))
- % nonce1);
-
- set_session_key(hmac_key_encrypted);
-
- if (!process_auth_cmd(role, their_include_pattern, their_exclude_pattern,
- client, nonce1, signature))
- return false;
- queue_confirm_cmd();
- return true;
- }
- break;
-
- case confirm_cmd:
- require(! authenticated, "confirm netcmd received when not authenticated");
- require(voice == client_voice, "confirm netcmd received in client voice");
- {
- string signature;
- cmd.read_confirm_cmd();
- this->authenticated = true;
- respond_to_confirm_cmd();
- return true;
- }
- break;
-
case refine_cmd:
require(authenticated, "refine netcmd received when authenticated");
{
@@ -1887,62 +1187,18 @@ netsync_session::dispatch_payload(netcmd
}
break;
- case usher_cmd:
- {
- utf8 greeting;
- cmd.read_usher_cmd(greeting);
- return process_usher_cmd(greeting);
- }
- break;
-
- case usher_reply_cmd:
- {
- u8 client_version;
- utf8 server;
- globish pattern;
- cmd.read_usher_reply_cmd(client_version, server, pattern);
- return process_usher_reply_cmd(client_version, server, pattern);
- }
- break;
+ default:
+ return false;
}
return false;
}
-// This kicks off the whole cascade starting from "hello".
-void
-netsync_session::begin_service()
-{
- queue_usher_cmd(utf8("", origin::internal));
-}
-
-void
-netsync_session::queue_usher_cmd(utf8 const & message)
-{
- L(FL("queueing 'usher' command"));
- netcmd cmd(0);
- cmd.write_usher_cmd(message);
- write_netcmd_and_try_flush(cmd);
-}
-
bool
-netsync_session::process_usher_reply_cmd(u8 client_version,
- utf8 const & server,
- globish const & pattern)
+netsync_session::have_work() const
{
- // netcmd::read() has already checked that the client isn't too old
- if (client_version < max_version)
- {
- version = client_version;
- }
- L(FL("client has maximum version %d, using %d")
- % widen(client_version) % widen(version));
-
- key_name name;
- keypair kp;
- if (use_transport_auth)
- keys.get_key_pair(signing_key, name, kp);
- queue_hello_cmd(name, kp.pub, mk_nonce());
- return true;
+ return done_all_refinements()
+ && !rev_enumerator.done()
+ && !output_overfull();
}
void
@@ -1950,9 +1206,7 @@ netsync_session::maybe_step()
{
date_t start_time = date_t::now();
- while (done_all_refinements()
- && !rev_enumerator.done()
- && !output_overfull())
+ while (have_work())
{
rev_enumerator.step();
@@ -1964,83 +1218,34 @@ netsync_session::maybe_step()
}
}
-void
-netsync_session::maybe_say_goodbye(transaction_guard & guard)
+bool netsync_session::process(transaction_guard & guard,
+ netcmd const & cmd_in)
{
- if (voice == client_voice
- && protocol_state == working_state
- && finished_working())
- {
- protocol_state = shutdown_state;
- guard.do_checkpoint();
- queue_bye_cmd(0);
- }
-}
-
-bool
-netsync_session::arm()
-{
- if (!armed)
- {
- // Don't pack the buffer unnecessarily.
- if (output_overfull())
- return false;
-
- if (cmd_in.read(min_version, max_version, inbuf, read_hmac))
- {
- armed = true;
- }
- }
- return armed;
-}
-
-bool netsync_session::process(transaction_guard & guard)
-{
- if (encountered_error)
- return true;
try
{
- if (!arm())
- return true;
-
- armed = false;
- L(FL("processing %d byte input buffer from peer %s")
- % inbuf.size() % peer_id);
-
size_t sz = cmd_in.encoded_size();
bool ret = dispatch_payload(cmd_in, guard);
- if (inbuf.size() >= constants::netcmd_maxsz)
- W(F("input buffer for peer %s is overfull "
- "after netcmd dispatch") % peer_id);
-
guard.maybe_checkpoint(sz);
if (!ret)
L(FL("peer %s finishing processing with '%d' packet")
- % peer_id % cmd_in.get_cmd_code());
+ % get_peer() % cmd_in.get_cmd_code());
return ret;
}
catch (bad_decode & bd)
{
W(F("protocol error while processing peer %s: '%s'")
- % peer_id % bd.what);
+ % get_peer() % bd.what);
return false;
}
catch (recoverable_failure & rf)
{
W(F("recoverable '%s' error while processing peer %s: '%s'")
% origin::type_to_string(rf.caused_by())
- % peer_id % rf.what());
+ % get_peer() % rf.what());
return false;
}
- catch (netsync_error & err)
- {
- W(F("error: %s") % err.msg);
- queue_error_cmd(boost::lexical_cast(error_code) + " " + err.msg);
- encountered_error = true;
- return true; // Don't terminate until we've send the error_cmd.
- }
}
static void
============================================================
--- network/netsync_session.hh c4c43a8ff350ed599508742258a53b9c52a765ff
+++ network/netsync_session.hh e848afaf9518a827735efc982ebe5ca6f3ed1d57
@@ -22,7 +22,7 @@
#include "refiner.hh"
#include "ui.hh"
-#include "network/session_base.hh"
+#include "network/wrapped_session.hh"
class cert;
@@ -30,13 +30,9 @@ netsync_session:
netsync_session:
public refiner_callbacks,
public enumerator_callbacks,
- public session_base
+ public wrapped_session
{
- u8 version;
- u8 max_version;
- u8 min_version;
protocol_role role;
- protocol_voice const voice;
globish our_include_pattern;
globish our_exclude_pattern;
globish_matcher our_matcher;
@@ -48,12 +44,6 @@ netsync_session:
key_id const & signing_key;
std::vector keys_to_push;
- netcmd cmd_in;
- bool armed;
-public:
- bool arm();
-private:
-
bool received_remote_key;
key_id remote_peer_key_id;
netsync_session_key session_key;
@@ -71,11 +61,6 @@ private:
size_t certs_in, certs_out;
size_t revs_in, revs_out;
size_t keys_in, keys_out;
- // used to identify this session to the netsync hooks.
- // We can't just use saved_nonce, because that's blank for all
- // anonymous connections and could lead to confusion.
- size_t session_id;
- static size_t session_count;
// These are read from the server, written to the local database
std::vector written_revisions;
@@ -89,23 +74,9 @@ private:
id saved_nonce;
- static const int no_error = 200;
- static const int partial_transfer = 211;
- static const int no_transfer = 212;
-
- static const int not_permitted = 412;
- static const int unknown_key = 422;
- static const int mixing_versions = 432;
-
- static const int role_mismatch = 512;
- static const int bad_command = 521;
-
- static const int failed_identification = 532;
- //static const int bad_data = 541;
-
int error_code;
- bool set_totals;
+ mutable bool set_totals;
// Interface to refinement.
refiner epoch_refiner;
@@ -132,62 +103,44 @@ public:
project_t & project,
key_store & keys,
protocol_role role,
- protocol_voice voice,
globish const & our_include_pattern,
globish const & our_exclude_pattern,
- std::string const & peer,
- boost::shared_ptr sock,
bool initiated_by_server = false);
virtual ~netsync_session();
-private:
- id mk_nonce();
+ std::string usher_reply_data() const;
+ bool have_work() const;
+ void accept_service();
+ void request_service();
- void set_session_key(std::string const & key);
- void set_session_key(rsa_oaep_sha_data const & key_encrypted);
+ void on_begin(size_t ident, key_identity_info const & remote_key);
+ void on_end(size_t ident);
+private:
void setup_client_tickers();
- bool done_all_refinements();
- bool queued_all_items();
- bool received_all_items();
- bool finished_working();
+ bool done_all_refinements() const;
+ bool queued_all_items() const;
+ bool received_all_items() const;
+ bool finished_working() const;
void maybe_step();
- void maybe_say_goodbye(transaction_guard & guard);
void note_item_arrived(netcmd_item_type ty, id const & i);
void maybe_note_epochs_finished();
void note_item_sent(netcmd_item_type ty, id const & i);
public:
- bool do_work(transaction_guard & guard);
+ bool do_work(transaction_guard & guard,
+ netcmd const * const cmd_in);
private:
void note_bytes_in(int count);
void note_bytes_out(int count);
- void error(int errcode, std::string const & errmsg);
-
- void write_netcmd_and_try_flush(netcmd const & cmd);
-
// Outgoing queue-writers.
- void queue_usher_cmd(utf8 const & message);
- void queue_bye_cmd(u8 phase);
- void queue_error_cmd(std::string const & errmsg);
void queue_done_cmd(netcmd_item_type type, size_t n_items);
void queue_hello_cmd(key_name const & key_name,
rsa_pub_key const & pub_encoded,
id const & nonce);
- void queue_anonymous_cmd(protocol_role role,
- globish const & include_pattern,
- globish const & exclude_pattern,
- id const & nonce2);
- void queue_auth_cmd(protocol_role role,
- globish const & include_pattern,
- globish const & exclude_pattern,
- key_id const & client,
- id const & nonce1,
- id const & nonce2,
- rsa_sha1_signature const & signature);
void queue_confirm_cmd();
void queue_refine_cmd(refinement_type ty, merkle_node const & node);
void queue_data_cmd(netcmd_item_type type,
@@ -199,12 +152,10 @@ private:
delta const & del);
// Incoming dispatch-called methods.
- bool process_error_cmd(std::string const & errmsg);
bool process_hello_cmd(u8 server_version,
key_name const & server_keyname,
rsa_pub_key const & server_key,
id const & nonce);
- bool process_bye_cmd(u8 phase, transaction_guard & guard);
bool process_anonymous_cmd(protocol_role role,
globish const & their_include_pattern,
globish const & their_exclude_pattern);
@@ -223,18 +174,12 @@ private:
id const & base,
id const & ident,
delta const & del);
- bool process_usher_cmd(utf8 const & msg);
- bool process_usher_reply_cmd(u8 client_version,
- utf8 const & server,
- globish const & pattern);
// The incoming dispatcher.
bool dispatch_payload(netcmd const & cmd,
transaction_guard & guard);
// Various helpers.
- void assume_corresponding_role(protocol_role their_role);
- void respond_to_confirm_cmd();
bool data_exists(netcmd_item_type type,
id const & item);
void load_data(netcmd_item_type type,
@@ -244,10 +189,9 @@ private:
void rebuild_merkle_trees(std::set const & branches);
void send_all_data(netcmd_item_type ty, std::set const & items);
-public:
- void begin_service();
private:
- bool process(transaction_guard & guard);
+ bool process(transaction_guard & guard,
+ netcmd const & cmd_in);
bool initiated_by_server;
};
============================================================
--- network/session_base.cc 157c8a8f35cea36433729d2876f0a4bfb4dd5937
+++ network/session_base.cc 66e0f6b594d20aab21677ad43045d695ae83ae57
@@ -19,12 +19,14 @@ using boost::shared_ptr;
using boost::shared_ptr;
-session_base::session_base(string const & peer_id,
+session_base::session_base(protocol_voice voice,
+ string const & peer_id,
shared_ptr str) :
outbuf_bytes(0),
peer_id(peer_id), str(str),
last_io_time(::time(NULL)),
protocol_state(working_state),
+ voice(voice),
encountered_error(false)
{ }
============================================================
--- network/session_base.hh 0ffdf1741c214d0144014df839a534dc06a4b23e
+++ network/session_base.hh 300d67536599a4a9b92927c54ad7def5415301ac
@@ -17,6 +17,7 @@
#include "netxx/stream.h"
+#include "netcmd.hh" // for protocol_voice
#include "network/reactable.hh"
#include "string_queue.hh"
@@ -34,6 +35,7 @@ protected:
size_t outbuf_bytes; // so we can avoid queueing up too much stuff
protected:
void queue_output(std::string const & s);
+public:
bool output_overfull() const;
bool output_empty() const;
public:
@@ -52,9 +54,12 @@ public:
}
protocol_state;
+ protocol_voice const voice;
+
bool encountered_error;
- session_base(std::string const & peer_id,
+ session_base(protocol_voice voice,
+ std::string const & peer_id,
boost::shared_ptr str);
virtual ~session_base();
virtual bool arm() = 0;