# # # patch "netsync.cc" # from [e7e33910da65973573d29cc2fc69c5bff5aec228] # to [9b3abe03331c5a761b92cf0537c739a3057caed7] # ============================================================ --- netsync.cc e7e33910da65973573d29cc2fc69c5bff5aec228 +++ netsync.cc 9b3abe03331c5a761b92cf0537c739a3057caed7 @@ -312,10 +312,45 @@ struct netsync_error netsync_error(string const & s): msg(s) {} }; +class session_base +{ +public: + string peer_id; + shared_ptr str; + time_t last_io_time; + + enum + { + working_state, + shutdown_state, + confirmed_state + } + protocol_state; + + bool encountered_error; + + session_base(string const & peer_id, + shared_ptr str) : + peer_id(peer_id), str(str), + last_io_time(::time(NULL)), + protocol_state(working_state), + encountered_error(false) + { } + virtual ~session_base() + { } + virtual bool arm() = 0; + virtual Netxx::Probe::ready_type which_events() = 0; + virtual bool do_io(Netxx::Probe::ready_type) = 0; + virtual bool do_work(transaction_guard & guard) = 0; + + //virtual void begin_service(); +}; + class session: public refiner_callbacks, - public enumerator_callbacks + public enumerator_callbacks, + public session_base { protocol_role role; protocol_voice const voice; @@ -323,20 +358,13 @@ session: globish our_exclude_pattern; globish_matcher our_matcher; -public: project_t & project; -private: key_store & keys; lua_hooks & lua; bool use_transport_auth; rsa_keypair_id const & signing_key; vector const & keys_to_push; -public: - string peer_id; - shared_ptr str; -private: - string_queue inbuf; // deque of pair deque< pair > outbuf; @@ -358,9 +386,6 @@ private: chained_hmac write_hmac; bool authenticated; -public: - time_t last_io_time; -private: auto_ptr byte_in_ticker; auto_ptr byte_out_ticker; auto_ptr cert_in_ticker; @@ -389,18 +414,6 @@ private: id saved_nonce; -public: - enum - { - working_state, - shutdown_state, - confirmed_state - } - protocol_state; - - bool encountered_error; -private: - static const int no_error = 200; static const int partial_transfer = 211; static const int no_transfer = 212; @@ -574,6 +587,7 @@ session::session(options & opts, string const & peer, shared_ptr sock, bool initiated_by_server) : + session_base(peer, sock), role(role), voice(voice), our_include_pattern(our_include_pattern), @@ -585,8 +599,6 @@ session::session(options & opts, use_transport_auth(opts.use_transport_auth), signing_key(opts.signing_key), keys_to_push(opts.keys_to_push), - peer_id(peer), - str(sock), inbuf(), outbuf_size(0), armed(false), @@ -598,7 +610,6 @@ session::session(options & opts, write_hmac(netsync_session_key(constants::netsync_key_initializer), use_transport_auth), authenticated(false), - last_io_time(::time(NULL)), byte_in_ticker(NULL), byte_out_ticker(NULL), cert_in_ticker(NULL), @@ -611,8 +622,6 @@ session::session(options & opts, keys_in(0), keys_out(0), session_id(++session_count), saved_nonce(""), - protocol_state(working_state), - encountered_error(false), error_code(no_transfer), set_totals(false), epoch_refiner(epoch_item, voice, *this), @@ -2585,8 +2594,22 @@ class reactor; class reactor; -class listener +class listener_base { +public: + shared_ptr srv; + listener_base(shared_ptr srv) + : srv(srv) + { + } + virtual ~listener_base() + { + } + virtual bool do_io(Netxx::Probe::ready_type event) = 0; +}; + +class listener : public listener_base +{ options & opts; lua_hooks & lua; project_t & project; @@ -2600,7 +2623,6 @@ public: shared_ptr &guard; Netxx::Address addr; public: - shared_ptr srv; listener(options & opts, lua_hooks & lua, @@ -2611,14 +2633,15 @@ public: std::list const & addresses, shared_ptr &guard, bool use_ipv6) - : opts(opts), lua(lua), project(project), keys(keys), + : listener_base(shared_ptr()), + opts(opts), lua(lua), project(project), keys(keys), react(react), role(role), timeout(static_cast(constants::netsync_timeout_seconds)), guard(guard), - addr(use_ipv6), - srv(make_server(addresses, constants::netsync_default_port, - timeout, use_ipv6, addr)) + addr(use_ipv6) { + srv = make_server(addresses, constants::netsync_default_port, + timeout, use_ipv6, addr); } bool do_io(Netxx::Probe::ready_type event); @@ -2630,15 +2653,15 @@ class reactor Netxx::Timeout forever, timeout, instant; Netxx::PipeCompatibleProbe probe; - set > sessions; - set > listeners; + set > sessions; + set > listeners; - map > session_lookup; - map > listener_lookup; + map > session_lookup; + map > listener_lookup; bool readying; int have_armed; - void ready_for_io(shared_ptr sess, transaction_guard & guard) + void ready_for_io(shared_ptr sess, transaction_guard & guard) { if (sess->do_work(guard)) { @@ -2677,7 +2700,7 @@ class reactor remove(sess); } } - void ready_for_io(shared_ptr listen, transaction_guard & guard) + void ready_for_io(shared_ptr listen, transaction_guard & guard) { if (sessions.size() >= constants::netsync_connection_limit) { @@ -2702,7 +2725,7 @@ public: readying(false), have_armed(0) { } - void add(shared_ptr sess, transaction_guard & guard) + void add(shared_ptr sess, transaction_guard & guard) { I(!have_pipe); if (sess->str->get_socketfd() == -1) @@ -2715,25 +2738,25 @@ public: if (readying) ready_for_io(sess, guard); } - void add(shared_ptr listen, transaction_guard & guard) + void add(shared_ptr listen, transaction_guard & guard) { I(!have_pipe); listeners.insert(listen); if (readying) ready_for_io(listen, guard); } - void remove(shared_ptr sess) + void remove(shared_ptr sess) { - set >::iterator i = sessions.find(sess); + set >::iterator i = sessions.find(sess); if (i != sessions.end()) { sessions.erase(i); have_pipe = false; } } - void remove(shared_ptr listen) + void remove(shared_ptr listen) { - set >::iterator i = listeners.find(listen); + set >::iterator i = listeners.find(listen); if (i != listeners.end()) listeners.erase(i); } @@ -2750,16 +2773,16 @@ public: probe.clear(); session_lookup.clear(); - set > s_todo = sessions; - for (set >::iterator i = s_todo.begin(); + set > s_todo = sessions; + for (set >::iterator i = s_todo.begin(); i != s_todo.end(); ++i) { ready_for_io(*i, guard); } listener_lookup.clear(); - set > l_todo = listeners; - for (set >::iterator i = l_todo.begin(); + set > l_todo = listeners; + for (set >::iterator i = l_todo.begin(); i != l_todo.end(); ++i) { ready_for_io(*i, guard); @@ -2796,9 +2819,9 @@ public: timed_out = false; - map >::iterator s + map >::iterator s = session_lookup.find(fd); - map >::iterator l + map >::iterator l = listener_lookup.find(fd); if (s != session_lookup.end()) { @@ -2832,8 +2855,8 @@ public: void prune() { time_t now = ::time(NULL); - set > s_todo = sessions; - for (set >::iterator i = s_todo.begin(); + set > s_todo = sessions; + for (set >::iterator i = s_todo.begin(); i != s_todo.end(); ++i) { if (static_cast((*i)->last_io_time + constants::netsync_timeout_seconds) @@ -2878,7 +2901,7 @@ listener::do_io(Netxx::Probe::ready_type lexical_cast(client), str)); sess->begin_service(); I(guard); - react.add(sess, *guard); + react.add(shared_ptr(sess), *guard); } return true; } @@ -2953,7 +2976,7 @@ call_server(options & opts, info.client.unparsed(), server)); reactor react; - react.add(sess, guard); + react.add(shared_ptr(sess), guard); while (true) { @@ -3081,7 +3104,7 @@ serve_connections(options & opts, shared_ptr listen(new listener(opts, lua, project, keys, react, role, addresses, guard, use_ipv6)); - react.add(listen, *guard); + react.add(shared_ptr(listen), *guard); while (true) @@ -3104,7 +3127,7 @@ serve_connections(options & opts, if (sess) { - react.add(sess, *guard); + react.add(shared_ptr(sess), *guard); L(FL("Opened connection to %s") % sess->peer_id); } } @@ -3123,15 +3146,16 @@ static void } static void -serve_single_connection(shared_ptr sess) +serve_single_connection(project_t & project, + shared_ptr sess) { sess->begin_service(); P(F("beginning service on %s") % sess->peer_id); - transaction_guard guard(sess->project.db); + transaction_guard guard(project.db); reactor react; - react.add(sess, guard); + react.add(shared_ptr(sess), guard); while (react.size() > 0) { @@ -3351,7 +3375,7 @@ run_netsync_protocol(options & opts, lua role, server_voice, globish("*"), globish(""), "stdio", str)); - serve_single_connection(sess); + serve_single_connection(project, sess); } else serve_connections(opts, lua, project, keys,