# # # patch "netcmd.cc" # from [aa9c61e301fc6c077007dd9008f49765f839ff81] # to [5bab83a6a802053edeb3ae2338c023b1172b6bef] # # patch "netcmd.hh" # from [c8c14900e822a49270dfa3da37a2fc44323f66e6] # to [872635bda584c800fb0a4b91470d0c57e3443934] # # patch "netsync.cc" # from [9d4e651046c00670ed02483a52838e0b8d7dd460] # to [e50bb25879786008a2e41c5775791a44e3aab193] # ============================================================ --- netcmd.cc aa9c61e301fc6c077007dd9008f49765f839ff81 +++ netcmd.cc 5bab83a6a802053edeb3ae2338c023b1172b6bef @@ -49,7 +49,7 @@ netcmd::netcmd() : version(constants::ne cmd_code(error_cmd) {} -size_t netcmd::encoded_size() +size_t netcmd::encoded_size() const { string tmp; insert_datum_uleb128(payload.size(), tmp); ============================================================ --- netcmd.hh c8c14900e822a49270dfa3da37a2fc44323f66e6 +++ netcmd.hh 872635bda584c800fb0a4b91470d0c57e3443934 @@ -75,7 +75,7 @@ public: public: netcmd(); netcmd_code get_cmd_code() const {return cmd_code;} - size_t encoded_size(); + size_t encoded_size() const; bool operator==(netcmd const & other) const; ============================================================ --- netsync.cc 9d4e651046c00670ed02483a52838e0b8d7dd460 +++ netsync.cc e50bb25879786008a2e41c5775791a44e3aab193 @@ -277,33 +277,85 @@ public: public: app_state & app; - string peer_id; + string const peer_id; shared_ptr str; private: - string_queue inbuf; - // deque of pair - deque< pair > outbuf; - // the total data stored in outbuf - this is - // used as a valve to stop too much data - // backing up - size_t outbuf_size; + class input_manager + { + string_queue buffer; + netcmd cmd; + bool have_cmd; - netcmd cmd; - bool armed; + chained_hmac read_hmac; + + public: + input_manager(bool use_transport_auth) + : have_cmd(false), + read_hmac(constants::netsync_key_initializer, use_transport_auth) + {} + inline bool full() const { return buffer.size() >= constants::netcmd_maxsz; } + inline bool have_netcmd() + { + if (!have_cmd) + { + have_cmd = cmd.read(buffer, read_hmac); + } + return have_cmd; + } + inline void get_netcmd(netcmd & c) + { + I(have_cmd); + c = cmd; + have_cmd = false; + } + inline size_t size() const + { + return buffer.size() + (have_cmd ? cmd.encoded_size() : 0); + } + inline void set_hmac_key(netsync_session_key const & key) + { read_hmac.set_key(key); } + Netxx::signed_size_type read_some_from(shared_ptr str); + }; + input_manager input; + + class output_manager + { + // deque of pair + deque< pair > buffer; + // the total data stored in outbuf - this is + // used as a valve to stop too much data + // backing up + size_t buffer_size; + + chained_hmac write_hmac; + + public: + output_manager(bool use_transport_auth) + : buffer_size(0), + write_hmac(constants::netsync_key_initializer, use_transport_auth) + {} + inline bool full() const { return buffer_size > constants::bufsz * 10; } + inline bool empty() const { return buffer_size == 0; } + inline void set_hmac_key(netsync_session_key const & key) + { write_hmac.set_key(key); } + void queue_netcmd(netcmd const & cmd); + Netxx::signed_size_type write_some_to(shared_ptr str); + }; + output_manager output; + public: - bool arm(); + bool armed(); private: id remote_peer_key_hash; rsa_keypair_id remote_peer_key_name; - netsync_session_key session_key; - chained_hmac read_hmac; - chained_hmac write_hmac; bool authenticated; + time_t last_io_time; public: - time_t last_io_time; + bool io_idle_timeout(time_t now, unsigned long timeout); + private: auto_ptr byte_in_ticker; auto_ptr byte_out_ticker; @@ -403,9 +455,11 @@ private: bool queued_all_items(); bool received_all_items(); bool finished_working(); -public: + + bool can_step(); void maybe_step(); +public: void maybe_say_goodbye(transaction_guard & guard); private: @@ -414,7 +468,7 @@ public: void note_item_sent(netcmd_item_type ty, id const & i); public: - Netxx::Probe::ready_type which_events() const; + Netxx::Probe::ready_type which_events(); bool read_some(); bool write_some(); private: @@ -521,14 +575,10 @@ session::session(protocol_role role, app(app), peer_id(peer), str(sock), - inbuf(), - outbuf_size(0), - armed(false), + input(app.opts.use_transport_auth), + output(app.opts.use_transport_auth), remote_peer_key_hash(""), remote_peer_key_name(""), - session_key(constants::netsync_key_initializer), - read_hmac(constants::netsync_key_initializer, app.opts.use_transport_auth), - write_hmac(constants::netsync_key_initializer, app.opts.use_transport_auth), authenticated(false), last_io_time(::time(NULL)), byte_in_ticker(NULL), @@ -750,12 +800,19 @@ session::mark_recent_io() last_io_time = ::time(NULL); } +bool +session::io_idle_timeout(time_t now, unsigned long timeout) +{ + return static_cast(last_io_time + timeout) + < static_cast(now); +} + void session::set_session_key(string const & key) { - session_key = netsync_session_key(key); - read_hmac.set_key(session_key); - write_hmac.set_key(session_key); + netsync_session_key session_key = netsync_session_key(key); + input.set_hmac_key(session_key); + output.set_hmac_key(session_key); } void @@ -979,14 +1036,20 @@ void } void +session::output_manager::queue_netcmd(netcmd const & cmd) +{ + string buf; + cmd.write(buf, write_hmac); + buffer.push_back(make_pair(buf, 0)); + buffer_size += buf.size(); +} + +void session::write_netcmd_and_try_flush(netcmd const & cmd) { if (!encountered_error) { - string buf; - cmd.write(buf, write_hmac); - outbuf.push_back(make_pair(buf, 0)); - outbuf_size += buf.size(); + output.queue_netcmd(cmd); } else L(FL("dropping outgoing netcmd (because we're in error unwind mode)")); @@ -1009,31 +1072,54 @@ Netxx::Probe::ready_type } Netxx::Probe::ready_type -session::which_events() const +session::which_events() { - // Only ask to read if we're not armed. - if (outbuf.empty()) + Netxx::Probe::ready_type which = Netxx::Probe::ready_oobd; + + // Don't ask to read if we still have unprocessed input. + if (!input.full() && !input.have_netcmd()) { - if (inbuf.size() < constants::netcmd_maxsz && !armed) - return Netxx::Probe::ready_read | Netxx::Probe::ready_oobd; - else - return Netxx::Probe::ready_oobd; + which = which | Netxx::Probe::ready_read; } - else + + if (!output.empty()) { - if (inbuf.size() < constants::netcmd_maxsz && !armed) - return Netxx::Probe::ready_write | Netxx::Probe::ready_read | Netxx::Probe::ready_oobd; - else - return Netxx::Probe::ready_write | Netxx::Probe::ready_oobd; + which = which | Netxx::Probe::ready_write; } + + return which; } +Netxx::signed_size_type +session::input_manager::read_some_from(shared_ptr str) +{ + I(!full()); + char tmp[constants::bufsz]; + Netxx::signed_size_type count = str->read(tmp, sizeof(tmp)); + + if (count > 0) + { + buffer.append(tmp,count); + } + + return count; +} + bool session::read_some() { - I(inbuf.size() < constants::netcmd_maxsz); - char tmp[constants::bufsz]; - Netxx::signed_size_type count = str->read(tmp, sizeof(tmp)); + I(!input.full()); + Netxx::signed_size_type count; + if (!encountered_error) + { + count = input.read_some_from(str); + } + else + { + char bit_bucket[constants::bufsz]; + count = str->read(bit_bucket, sizeof(bit_bucket)); + } + if (count > 0) { L(FL("read %d bytes from fd %d (peer %s)") % count % str->get_socketfd() % peer_id); @@ -1042,7 +1128,6 @@ session::read_some() L(FL("in error unwind mode, so throwing them into the bit bucket")); return true; } - inbuf.append(tmp,count); mark_recent_io(); if (byte_in_ticker.get() != NULL) (*byte_in_ticker) += count; @@ -1053,32 +1138,45 @@ session::read_some() return false; } -bool -session::write_some() +Netxx::signed_size_type +session::output_manager::write_some_to(shared_ptr str) { - I(!outbuf.empty()); - size_t writelen = outbuf.front().first.size() - outbuf.front().second; - Netxx::signed_size_type count = str->write(outbuf.front().first.data() + outbuf.front().second, + I(!buffer.empty()); + string & to_write(buffer.front().first); + size_t & writepos(buffer.front().second); + size_t writelen = to_write.size() - writepos; + Netxx::signed_size_type count = str->write(to_write.data() + writepos, min(writelen, constants::bufsz)); if (count > 0) { if ((size_t)count == writelen) { - outbuf_size -= outbuf.front().first.size(); - outbuf.pop_front(); + buffer_size -= to_write.size(); + buffer.pop_front(); } else { - outbuf.front().second += count; + writepos += count; } + } + return count; +} + +bool +session::write_some() +{ + I(!output.empty()); + Netxx::signed_size_type count = output.write_some_to(str); + if (count > 0) + { L(FL("wrote %d bytes to fd %d (peer %s)") % count % str->get_socketfd() % peer_id); mark_recent_io(); if (byte_out_ticker.get() != NULL) (*byte_out_ticker) += count; bytes_out += count; - if (encountered_error && outbuf.empty()) + if (encountered_error && output.empty()) { // we've flushed our error message, so it's time to get out. L(FL("finished flushing output queue in error unwind mode, disconnecting")); @@ -1132,6 +1230,7 @@ session::queue_hello_cmd(rsa_keypair_id rsa_pub_key pub; if (app.opts.use_transport_auth) decode_base64(pub_encoded, pub); + netcmd cmd; cmd.write_hello_cmd(key_name, pub, nonce); write_netcmd_and_try_flush(cmd); } @@ -2262,12 +2361,18 @@ session::begin_service() queue_hello_cmd(app.opts.signing_key, kp.pub, mk_nonce()); } +bool +session::can_step() +{ + return done_all_refinements() + && !rev_enumerator.done() + && !output.full(); +} + void session::maybe_step() { - while (done_all_refinements() - && !rev_enumerator.done() - && outbuf_size < constants::bufsz * 10) + while (can_step()) { rev_enumerator.step(); } @@ -2287,20 +2392,19 @@ bool } bool -session::arm() +session::armed() { - if (!armed) - { - // Don't pack the buffer unnecessarily. - if (outbuf_size > constants::bufsz * 10) - return false; + // Don't pack the buffer unnecessarily. + if (output.full()) + return false; - if (cmd.read(inbuf, read_hmac)) - { - armed = true; - } - } - return armed; + if (input.have_netcmd()) + return true; + + if (can_step()) + return true; + + return false; } bool session::process(transaction_guard & guard) @@ -2309,17 +2413,24 @@ bool session::process(transaction_guard return true; try { - if (!arm()) + if (!armed()) return true; - armed = false; + maybe_step(); + + if (!input.have_netcmd()) + return true; + L(FL("processing %d byte input buffer from peer %s") - % inbuf.size() % peer_id); + % input.size() % peer_id); + netcmd cmd; + input.get_netcmd(cmd); + size_t sz = cmd.encoded_size(); bool ret = dispatch_payload(cmd, guard); - if (inbuf.size() >= constants::netcmd_maxsz) + if (input.full()) W(F("input buffer for peer %s is overfull " "after netcmd dispatch") % peer_id); @@ -2427,7 +2538,7 @@ call_server(protocol_role role, bool armed = false; try { - armed = sess.arm(); + armed = sess.armed(); } catch (bad_decode & bd) { @@ -2435,7 +2546,6 @@ call_server(protocol_role role, % sess.peer_id % bd.what); } - sess.maybe_step(); sess.maybe_say_goodbye(guard); probe.clear(); @@ -2551,10 +2661,9 @@ arm_sessions_and_calculate_probe(Netxx:: shared_ptr >::const_iterator i = sessions.begin(); i != sessions.end(); ++i) { - i->second->maybe_step(); try { - if (i->second->arm()) + if (i->second->armed()) { L(FL("fd %d is armed") % i->first); armed_sessions.insert(i->first); @@ -2627,7 +2736,7 @@ handle_read_available(Netxx::socket_type { try { - if (sess->arm()) + if (sess->armed()) armed_sessions.insert(fd); } catch (bad_decode & bd) @@ -2732,8 +2841,7 @@ reap_dead_sessions(map >::const_iterator i = sessions.begin(); i != sessions.end(); ++i) { - if (static_cast(i->second->last_io_time + timeout_seconds) - < static_cast(now)) + if (i->second->io_idle_timeout(now, timeout_seconds)) { P(F("fd %d (peer %s) has been idle too long, disconnecting") % i->first % i->second->peer_id);