# # # patch "netsync.cc" # from [228c905f8c8b1d41bb30455ebfc4e805dddbd231] # to [e7e33910da65973573d29cc2fc69c5bff5aec228] # ============================================================ --- netsync.cc 228c905f8c8b1d41bb30455ebfc4e805dddbd231 +++ netsync.cc e7e33910da65973573d29cc2fc69c5bff5aec228 @@ -1108,18 +1108,51 @@ session::do_io(Netxx::Probe::ready_type session::do_io(Netxx::Probe::ready_type what) { bool ok = true; - if (what & Netxx::Probe::ready_read) + try { - if (!read_some()) - ok = false; + if (what & Netxx::Probe::ready_read) + { + if (!read_some()) + ok = false; + } + if (what & Netxx::Probe::ready_write) + { + if (!write_some()) + ok = false; + } + + if (what & Netxx::Probe::ready_oobd) + { + P(F("got OOB from peer %s, disconnecting") + % peer_id); + ok = false; + } + else if (!ok) + { + switch (protocol_state) + { + case working_state: + P(F("peer %s IO failed in working state (error)") + % peer_id); + break; + + case shutdown_state: + P(F("peer %s IO failed in shutdown state " + "(possibly client misreported error)") + % peer_id); + break; + + case confirmed_state: + P(F("peer %s IO failed in confirmed state (success)") + % peer_id); + break; + } + } } - if (what & Netxx::Probe::ready_write) + catch (Netxx::Exception & e) { - if (!write_some()) - ok = false; - } - if (what & Netxx::Probe::ready_oobd) - { + P(F("Network error on peer %s, disconnecting") + % peer_id); ok = false; } return ok; @@ -2484,7 +2517,373 @@ bool session::process(transaction_guard } } +static shared_ptr +make_server(std::list const & addresses, + Netxx::port_type default_port, + Netxx::Timeout timeout, + bool use_ipv6, + Netxx::Address & addr) +{ + try + { + addr = Netxx::Address(use_ipv6); + if (addresses.empty()) + addr.add_all_addresses(default_port); + else + { + for (std::list::const_iterator it = addresses.begin(); + it != addresses.end(); ++it) + { + const utf8 & address = *it; + if (!address().empty()) + { + size_t l_colon = address().find(':'); + size_t r_colon = address().rfind(':'); + + if (l_colon == r_colon && l_colon == 0) + { + // can't be an IPv6 address as there is only one colon + // must be a : followed by a port + string port_str = address().substr(1); + addr.add_all_addresses(std::atoi(port_str.c_str())); + } + else + addr.add_address(address().c_str(), default_port); + } + } + } + shared_ptr ret(new Netxx::StreamServer(addr, timeout)); + + char const * name; + name = addr.get_name(); + P(F("beginning service on %s : %s") + % (name != NULL ? name : _("")) + % lexical_cast(addr.get_port())); + + return ret; + } + // If we use IPv6 and the initialisation of server fails, we want + // to try again with IPv4. The reason is that someone may have + // downloaded a IPv6-enabled monotone on a system that doesn't + // have IPv6, and which might fail therefore. + catch(Netxx::NetworkException & e) + { + if (use_ipv6) + return make_server(addresses, default_port, timeout, false, addr); + else + throw; + } + catch(Netxx::Exception & e) + { + if (use_ipv6) + return make_server(addresses, default_port, timeout, false, addr); + else + throw; + } +} + +class reactor; + +class listener +{ + options & opts; + lua_hooks & lua; + project_t & project; + key_store & keys; + + reactor & react; + + protocol_role role; + Netxx::Timeout timeout; + + shared_ptr &guard; + Netxx::Address addr; +public: + shared_ptr srv; + + listener(options & opts, + lua_hooks & lua, + project_t & project, + key_store & keys, + reactor & react, + protocol_role role, + std::list const & addresses, + shared_ptr &guard, + bool use_ipv6) + : opts(opts), lua(lua), project(project), keys(keys), + react(react), role(role), + timeout(static_cast(constants::netsync_timeout_seconds)), + guard(guard), + addr(use_ipv6), + srv(make_server(addresses, constants::netsync_default_port, + timeout, use_ipv6, addr)) + { + } + + bool do_io(Netxx::Probe::ready_type event); +}; + +class reactor +{ + bool have_pipe; + Netxx::Timeout forever, timeout, instant; + + Netxx::PipeCompatibleProbe probe; + set > sessions; + set > listeners; + + map > session_lookup; + map > listener_lookup; + + bool readying; + int have_armed; + void ready_for_io(shared_ptr sess, transaction_guard & guard) + { + if (sess->do_work(guard)) + { + try + { + if (sess->arm()) + { + ++have_armed; + } + probe.add(*sess->str, sess->which_events()); + if (sess->str->get_socketfd() == -1) + { + shared_ptr pipe = + boost::dynamic_pointer_cast(sess->str); + I(pipe); + session_lookup.insert(make_pair(pipe->get_readfd(), + sess)); + session_lookup.insert(make_pair(pipe->get_writefd(), + sess)); + } + else + { + session_lookup.insert(make_pair(sess->str->get_socketfd(), + sess)); + } + } + catch (bad_decode & bd) + { + W(F("protocol error while processing peer %s: '%s'") + % sess->peer_id % bd.what); + remove(sess); + } + } + else + { + remove(sess); + } + } + void ready_for_io(shared_ptr listen, transaction_guard & guard) + { + if (sessions.size() >= constants::netsync_connection_limit) + { + W(F("session limit %d reached, some connections " + "will be refused") % constants::netsync_connection_limit); + } + else + { + probe.add(*listen->srv); + } + vector ss = listen->srv->get_probe_info()->get_sockets(); + for (vector::iterator i = ss.begin(); i != ss.end(); ++i) + { + listener_lookup.insert(make_pair(*i, listen)); + } + } +public: + reactor() + : have_pipe(false), + timeout(static_cast(constants::netsync_timeout_seconds)), + instant(0,1), + readying(false), + have_armed(0) + { } + void add(shared_ptr sess, transaction_guard & guard) + { + I(!have_pipe); + if (sess->str->get_socketfd() == -1) + { + I(sessions.size() == 0); + I(listeners.size() == 0); + have_pipe = true; + } + sessions.insert(sess); + if (readying) + ready_for_io(sess, guard); + } + void add(shared_ptr listen, transaction_guard & guard) + { + I(!have_pipe); + listeners.insert(listen); + if (readying) + ready_for_io(listen, guard); + } + void remove(shared_ptr sess) + { + set >::iterator i = sessions.find(sess); + if (i != sessions.end()) + { + sessions.erase(i); + have_pipe = false; + } + } + void remove(shared_ptr listen) + { + set >::iterator i = listeners.find(listen); + if (i != listeners.end()) + listeners.erase(i); + } + + int size() const + { + return sessions.size() + listeners.size(); + } + + void ready(transaction_guard & guard) + { + readying = true; + have_armed = 0; + + probe.clear(); + session_lookup.clear(); + set > s_todo = sessions; + for (set >::iterator i = s_todo.begin(); + i != s_todo.end(); ++i) + { + ready_for_io(*i, guard); + } + + listener_lookup.clear(); + set > l_todo = listeners; + for (set >::iterator i = l_todo.begin(); + i != l_todo.end(); ++i) + { + ready_for_io(*i, guard); + } + } + bool do_io() + { + // so it doesn't get reset under us if we drop the session + bool pipe = have_pipe; + readying = false; + bool timed_out = true; + Netxx::Timeout how_long; + if (sessions.empty()) + how_long = forever; + else if (have_armed > 0) + { + how_long = instant; + timed_out = false; + } + else + how_long = timeout; + + L(FL("i/o probe with %d armed") % have_armed); + Netxx::socket_type fd; + do + { + Netxx::Probe::result_type res = probe.ready(how_long); + how_long = instant; + fd = res.first; + Netxx::Probe::ready_type event = res.second; + + if (fd == -1) + break; + + timed_out = false; + + map >::iterator s + = session_lookup.find(fd); + map >::iterator l + = listener_lookup.find(fd); + if (s != session_lookup.end()) + { + if (sessions.find(s->second) != sessions.end()) + { + if (!s->second->do_io(event)) + { + remove(s->second); + } + } + else + { + L(FL("Got i/o on dead peer %s") % s->second->peer_id); + } + if (!pipe) + probe.remove(*s->second->str); + } + else if (l != listener_lookup.end()) + { + l->second->do_io(event); + probe.remove(*l->second->srv); + } + else + { + L(FL("got woken up for action on unknown fd %d") % fd); + } + } + while (fd != -1 && !pipe); + return !timed_out; + } + void prune() + { + time_t now = ::time(NULL); + set > s_todo = sessions; + for (set >::iterator i = s_todo.begin(); + i != s_todo.end(); ++i) + { + if (static_cast((*i)->last_io_time + constants::netsync_timeout_seconds) + < static_cast(now)) + { + P(F("peer %s has been idle too long, disconnecting") + % (*i)->peer_id); + remove(*i); + } + } + } +}; + +bool +listener::do_io(Netxx::Probe::ready_type event) +{ + L(FL("accepting new connection on %s : %s") + % (addr.get_name()?addr.get_name():"") % lexical_cast(addr.get_port())); + Netxx::Peer client = srv->accept_connection(); + + if (!client) + { + L(FL("accept() returned a dead client")); + } + else + { + P(F("accepted new client connection from %s : %s") + % client.get_address() % lexical_cast(client.get_port())); + + // 'false' here means not to revert changes when the SockOpt + // goes out of scope. + Netxx::SockOpt socket_options(client.get_socketfd(), false); + socket_options.set_non_blocking(); + + shared_ptr str = + shared_ptr + (new Netxx::Stream(client.get_socketfd(), timeout)); + + shared_ptr sess(new session(opts, lua, project, keys, + role, server_voice, + globish("*"), globish(""), + lexical_cast(client), str)); + sess->begin_service(); + I(guard); + react.add(sess, *guard); + } + return true; +} + + static shared_ptr build_stream_to_server(options & opts, lua_hooks & lua, netsync_connection_info info, @@ -2526,20 +2925,19 @@ call_server(options & opts, project_t & project, key_store & keys, protocol_role role, - netsync_connection_info const & info, - Netxx::port_type default_port, - unsigned long timeout_seconds) + netsync_connection_info const & info) { Netxx::PipeCompatibleProbe probe; transaction_guard guard(project.db); - Netxx::Timeout timeout(static_cast(timeout_seconds)), instant(0,1); + Netxx::Timeout timeout(static_cast(constants::netsync_timeout_seconds)), + instant(0,1); P(F("connecting to %s") % info.client.unparsed); shared_ptr server = build_stream_to_server(opts, lua, - info, default_port, + info, constants::netsync_default_port, timeout); @@ -2548,26 +2946,20 @@ call_server(options & opts, Netxx::SockOpt socket_options(server->get_socketfd(), false); socket_options.set_non_blocking(); - session sess(opts, lua, project, keys, - role, client_voice, - info.client.include_pattern, - info.client.exclude_pattern, - info.client.unparsed(), server); + shared_ptr sess(new session(opts, lua, project, keys, + role, client_voice, + info.client.include_pattern, + info.client.exclude_pattern, + info.client.unparsed(), server)); + reactor react; + react.add(sess, guard); + while (true) { - bool armed = false; - try - { - armed = sess.arm(); - } - catch (bad_decode & bd) - { - E(false, F("protocol error while processing peer %s: '%s'") - % sess.peer_id % bd.what); - } + react.ready(guard); - if (!sess.do_work(guard)) + if (react.size() == 0) { // Commit whatever work we managed to accomplish anyways. guard.commit(); @@ -2578,24 +2970,17 @@ call_server(options & opts, // exception). We call these cases E() errors. E(false, F("processing failure while talking to " "peer %s, disconnecting") - % sess.peer_id); + % sess->peer_id); return; } - probe.clear(); - probe.add(*(sess.str), sess.which_events()); - Netxx::Probe::result_type res = probe.ready(armed ? instant : timeout); - Netxx::Probe::ready_type event = res.second; - Netxx::socket_type fd = res.first; + bool io_ok = react.do_io(); - if (fd == -1 && !armed) - { - E(false, (F("timed out waiting for I/O with " - "peer %s, disconnecting") - % sess.peer_id)); - } + E(io_ok, (F("timed out waiting for I/O with " + "peer %s, disconnecting") + % sess->peer_id)); - if (!sess.do_io(event)) + if (react.size() == 0) { // Commit whatever work we managed to accomplish anyways. guard.commit(); @@ -2604,22 +2989,22 @@ call_server(options & opts, // user-reported error or a clean disconnect. See protocol // state diagram in session::process_bye_cmd. - if (sess.protocol_state == session::confirmed_state) + if (sess->protocol_state == session::confirmed_state) { P(F("successful exchange with %s") - % sess.peer_id); + % sess->peer_id); return; } - else if (sess.encountered_error) + else if (sess->encountered_error) { P(F("peer %s disconnected after we informed them of error") - % sess.peer_id); + % sess->peer_id); return; } else E(false, (F("I/O failure while talking to " "peer %s, disconnecting") - % sess.peer_id)); + % sess->peer_id)); } } } @@ -2629,8 +3014,6 @@ session_from_server_sync_item(options & lua_hooks & lua, project_t & project, key_store & keys, - Netxx::Timeout const & timeout, - Netxx::port_type const & default_port, server_initiated_sync_request const & request) { netsync_connection_info info; @@ -2645,8 +3028,8 @@ session_from_server_sync_item(options & P(F("connecting to %s") % info.client.unparsed); shared_ptr server = build_stream_to_server(opts, lua, - info, default_port, - timeout); + info, constants::netsync_default_port, + Netxx::Timeout(constants::netsync_timeout_seconds)); // 'false' here means not to revert changes when // the SockOpt goes out of scope. @@ -2678,341 +3061,37 @@ session_from_server_sync_item(options & } } -static shared_ptr -make_server(std::list const & addresses, - Netxx::port_type default_port, - Netxx::Timeout timeout, - bool use_ipv6, - Netxx::Address & addr) -{ - try - { - addr = Netxx::Address(use_ipv6); - - if (addresses.empty()) - addr.add_all_addresses(default_port); - else - { - for (std::list::const_iterator it = addresses.begin(); - it != addresses.end(); ++it) - { - const utf8 & address = *it; - if (!address().empty()) - { - size_t l_colon = address().find(':'); - size_t r_colon = address().rfind(':'); - - if (l_colon == r_colon && l_colon == 0) - { - // can't be an IPv6 address as there is only one colon - // must be a : followed by a port - string port_str = address().substr(1); - addr.add_all_addresses(std::atoi(port_str.c_str())); - } - else - addr.add_address(address().c_str(), default_port); - } - } - } - shared_ptr ret(new Netxx::StreamServer(addr, timeout)); - - char const * name; - name = addr.get_name(); - P(F("beginning service on %s : %s") - % (name != NULL ? name : _("")) - % lexical_cast(addr.get_port())); - - return ret; - } - // If we use IPv6 and the initialisation of server fails, we want - // to try again with IPv4. The reason is that someone may have - // downloaded a IPv6-enabled monotone on a system that doesn't - // have IPv6, and which might fail therefore. - catch(Netxx::NetworkException & e) - { - if (use_ipv6) - return make_server(addresses, default_port, timeout, false, addr); - else - throw; - } - catch(Netxx::Exception & e) - { - if (use_ipv6) - return make_server(addresses, default_port, timeout, false, addr); - else - throw; - } -} - static void -drop_session_associated_with_fd(map > & sessions, - Netxx::socket_type fd) -{ - // This is a bit of a hack. Initially all "file descriptors" in - // netsync were full duplex, so we could get away with indexing - // sessions by their file descriptor. - // - // When using pipes in unix, it's no longer true: a session gets - // entered in the session map under its read pipe fd *and* its write - // pipe fd. When we're in such a situation the socket fd is "-1" and - // we downcast to a PipeStream and use its read+write fds. - // - // When using pipes in windows, we use a full duplex pipe (named - // pipe) so the socket-like abstraction holds. - - I(fd != -1); - map >::const_iterator i = sessions.find(fd); - I(i != sessions.end()); - shared_ptr sess = i->second; - fd = sess->str->get_socketfd(); - if (fd != -1) - { - sessions.erase(fd); - } - else - { - shared_ptr pipe = - boost::dynamic_pointer_cast(sess->str); - I(static_cast(pipe)); - I(pipe->get_writefd() != -1); - I(pipe->get_readfd() != -1); - sessions.erase(pipe->get_readfd()); - sessions.erase(pipe->get_writefd()); - } -} - -static bool -session_is_in_fd_set(set const & fds, - shared_ptr sess) -{ - Netxx::socket_type fd = sess->str->get_socketfd(); - - if (fd == -1) - { - shared_ptr pipe = - boost::dynamic_pointer_cast(sess->str); - I(pipe); - if (fds.find(pipe->get_readfd()) != fds.end()) - return true; - if (fds.find(pipe->get_writefd()) != fds.end()) - return true; - } - else if (fds.find(fd) != fds.end()) - return true; - - return false; -} - -static int -arm_sessions_and_calculate_probe(Netxx::PipeCompatibleProbe & probe, - map > & sessions, - set & armed_sessions, - transaction_guard & guard) -{ - int probing = 0; - set arm_failed; - for (map >::const_iterator i = sessions.begin(); - i != sessions.end(); ++i) - { - // stdio sessions are entered twice - if (session_is_in_fd_set(arm_failed, i->second)) - continue; - if (!i->second->do_work(guard)) - { - arm_failed.insert(i->first); - } - else - { - try - { - if (i->second->arm()) - { - L(FL("fd %d is armed") % i->first); - armed_sessions.insert(i->first); - } - probe.add(*i->second->str, i->second->which_events()); - ++probing; - } - catch (bad_decode & bd) - { - W(F("protocol error while processing peer %s: '%s', marking as bad") - % i->second->peer_id % bd.what); - arm_failed.insert(i->first); - } - } - } - for (set::const_iterator i = arm_failed.begin(); - i != arm_failed.end(); ++i) - { - drop_session_associated_with_fd(sessions, *i); - } - return probing; -} - -static void -handle_new_connection(options & opts, - lua_hooks & lua, - project_t & project, - key_store & keys, - Netxx::Address & addr, - Netxx::StreamServer & server, - Netxx::Timeout & timeout, - protocol_role role, - map > & sessions) -{ - L(FL("accepting new connection on %s : %s") - % (addr.get_name()?addr.get_name():"") % lexical_cast(addr.get_port())); - Netxx::Peer client = server.accept_connection(); - - if (!client) - { - L(FL("accept() returned a dead client")); - } - else - { - P(F("accepted new client connection from %s : %s") - % client.get_address() % lexical_cast(client.get_port())); - - // 'false' here means not to revert changes when the SockOpt - // goes out of scope. - Netxx::SockOpt socket_options(client.get_socketfd(), false); - socket_options.set_non_blocking(); - - shared_ptr str = - shared_ptr - (new Netxx::Stream(client.get_socketfd(), timeout)); - - shared_ptr sess(new session(opts, lua, project, keys, - role, server_voice, - globish("*"), globish(""), - lexical_cast(client), str)); - sess->begin_service(); - sessions.insert(make_pair(client.get_socketfd(), sess)); - } -} - -static void -handle_io(Netxx::socket_type fd, - shared_ptr sess, - map > & sessions, - Netxx::Probe::ready_type what) -{ - if (!sess->do_io(what)) - { - if (what & Netxx::Probe::ready_oobd) - { - P(F("got OOB from peer %s, disconnecting") - % sess->peer_id); - } - else - { - switch (sess->protocol_state) - { - case session::working_state: - P(F("peer %s IO failed in working state (error)") - % sess->peer_id); - break; - - case session::shutdown_state: - P(F("peer %s IO failed in shutdown state " - "(possibly client misreported error)") - % sess->peer_id); - break; - - case session::confirmed_state: - P(F("peer %s IO failed in confirmed state (success)") - % sess->peer_id); - break; - } - } - - drop_session_associated_with_fd(sessions, fd); - } -} - -static void -reap_dead_sessions(map > & sessions, - unsigned long timeout_seconds) -{ - // Kill any clients which haven't done any i/o inside the timeout period - // or who have exchanged all items and flushed their output buffers. - set dead_clients; - time_t now = ::time(NULL); - for (map >::const_iterator - i = sessions.begin(); i != sessions.end(); ++i) - { - if (static_cast(i->second->last_io_time + timeout_seconds) - < static_cast(now)) - { - P(F("fd %d (peer %s) has been idle too long, disconnecting") - % i->first % i->second->peer_id); - dead_clients.insert(i->first); - } - } - for (set::const_iterator i = dead_clients.begin(); - i != dead_clients.end(); ++i) - { - drop_session_associated_with_fd(sessions, *i); - } -} - -static void serve_connections(options & opts, lua_hooks & lua, project_t & project, key_store & keys, protocol_role role, - std::list const & addresses, - Netxx::port_type default_port, - unsigned long timeout_seconds, - unsigned long session_limit) + std::list const & addresses) { - Netxx::PipeCompatibleProbe probe; - - Netxx::Timeout - forever, - timeout(static_cast(timeout_seconds)), - instant(0,1); - #ifdef USE_IPV6 bool use_ipv6=true; #else bool use_ipv6=false; #endif - Netxx::Address addr(use_ipv6); - shared_ptr server = make_server(addresses, - default_port, - timeout, - use_ipv6, - addr); + shared_ptr guard(new transaction_guard(project.db)); - map > sessions; - set armed_sessions; + reactor react; + shared_ptr listen(new listener(opts, lua, project, keys, + react, role, addresses, + guard, use_ipv6)); + react.add(listen, *guard); - shared_ptr guard; while (true) { - probe.clear(); - armed_sessions.clear(); - - if (sessions.size() >= session_limit) - W(F("session limit %d reached, some connections " - "will be refused") % session_limit); - else - probe.add(*server); - if (!guard) guard = shared_ptr (new transaction_guard(project.db)); - I(guard); - int probing = arm_sessions_and_calculate_probe(probe, sessions, - armed_sessions, - *guard); + react.ready(*guard); while (!server_initiated_sync_requests.empty()) { @@ -3021,79 +3100,20 @@ serve_connections(options & opts, server_initiated_sync_requests.pop_front(); shared_ptr sess = session_from_server_sync_item(opts, lua, project, keys, - timeout, default_port, request); if (sess) { - sessions.insert(make_pair(sess->str->get_socketfd(), sess)); - probe.add(*sess->str, sess->which_events()); - ++probing; + react.add(sess, *guard); L(FL("Opened connection to %s") % sess->peer_id); } } - L(FL("i/o probe with %d armed, %d probing") - % armed_sessions.size() % probing); - Netxx::socket_type fd; - Netxx::Timeout how_long; - if (sessions.empty()) - how_long = forever; - else if (armed_sessions.empty()) - how_long = timeout; - else - how_long = instant; - do - { - Netxx::Probe::result_type res = probe.ready(how_long); - how_long = instant; - Netxx::Probe::ready_type event = res.second; - fd = res.first; + react.do_io(); - if (fd == -1) - { - if (armed_sessions.empty()) - L(FL("timed out waiting for I/O (listening on %s : %s)") - % addr.get_name() % lexical_cast(addr.get_port())); - } + react.prune(); - // we either got a new connection - else if (fd == *server) - handle_new_connection(opts, lua, project, keys, - addr, *server, timeout, role, - sessions); - - // or an existing session woke up - else - { - map >::iterator i; - i = sessions.find(fd); - if (i == sessions.end()) - { - L(FL("got woken up for action on unknown fd %d") % fd); - } - else - { - probe.remove(*(i->second->str)); - shared_ptr sess = i->second; - - try - { - handle_io(fd, sess, sessions, event); - } - catch (Netxx::Exception &) - { - P(F("Network error on peer %s, disconnecting") - % sess->peer_id); - drop_session_associated_with_fd(sessions, fd); - } - } - } - } - while (fd != -1); - reap_dead_sessions(sessions, timeout_seconds); - - if (sessions.empty()) + if (react.size() == 1 /* 1 listener + 0 sessions */) { // Let the guard die completely if everything's gone quiet. guard->commit(); @@ -3103,78 +3123,23 @@ static void } static void -serve_single_connection(shared_ptr sess, - unsigned long timeout_seconds) +serve_single_connection(shared_ptr sess) { - Netxx::PipeCompatibleProbe probe; - - Netxx::Timeout - forever, - timeout(static_cast(timeout_seconds)), - instant(0,1); - + sess->begin_service(); P(F("beginning service on %s") % sess->peer_id); - sess->begin_service(); - transaction_guard guard(sess->project.db); - map > sessions; - set armed_sessions; + reactor react; + react.add(sess, guard); - if (sess->str->get_socketfd() == -1) + while (react.size() > 0) { - // Unix pipes are non-duplex, have two filedescriptors - shared_ptr pipe = - boost::dynamic_pointer_cast(sess->str); - I(pipe); - sessions[pipe->get_writefd()]=sess; - sessions[pipe->get_readfd()]=sess; + react.ready(guard); + react.do_io(); + react.prune(); } - else - sessions[sess->str->get_socketfd()]=sess; - - while (!sessions.empty()) - { - probe.clear(); - armed_sessions.clear(); - - int probing = arm_sessions_and_calculate_probe(probe, sessions, - armed_sessions, - guard); - - L(FL("i/o probe with %d armed, %d probing") - % armed_sessions.size() % probing); - Netxx::Probe::result_type res = probe.ready((armed_sessions.empty() ? timeout - : instant)); - Netxx::Probe::ready_type event = res.second; - Netxx::socket_type fd = res.first; - - if (fd == -1) - { - if (armed_sessions.empty()) - L(FL("timed out waiting for I/O (listening on %s)") - % sess->peer_id); - } - - // an existing session woke up - else - { - map >::iterator i; - i = sessions.find(fd); - if (i == sessions.end()) - { - L(FL("got woken up for action on unknown fd %d") % fd); - } - else - { - shared_ptr sess = i->second; - - handle_io(fd, sess, sessions, event); - } - } - reap_dead_sessions(sessions, timeout_seconds); - } + guard.commit(); } @@ -3386,23 +3351,17 @@ run_netsync_protocol(options & opts, lua role, server_voice, globish("*"), globish(""), "stdio", str)); - serve_single_connection(sess,constants::netsync_timeout_seconds); + serve_single_connection(sess); } else serve_connections(opts, lua, project, keys, - role, - info.server.addrs, - static_cast(constants::netsync_default_port), - static_cast(constants::netsync_timeout_seconds), - static_cast(constants::netsync_connection_limit)); + role, info.server.addrs); } else { I(voice == client_voice); call_server(opts, lua, project, keys, - role, info, - static_cast(constants::netsync_default_port), - static_cast(constants::netsync_timeout_seconds)); + role, info); } } catch (Netxx::NetworkException & e)