# # # patch "ChangeLog" # from [f0261047015b1ef6e5984007c32a6e5c7c4df4e4] # to [7436f3f8621a686bbf9e1352bd88831c00b41e3e] # # patch "netsync.cc" # from [e9bedced32c6a4474898d3f9bf4f02ccbf2f9c48] # to [e6680e393c344a1614e0f247d4ea351237549a41] # # patch "netxx_pipe.cc" # from [257cde413eb1573139f1400d764f3c54651c6c24] # to [2eabfc5fa0a4f5696e5951b13cfbad91d423a206] # # patch "netxx_pipe.hh" # from [9b4e1e3543138f5e784814446299bce571ea5e18] # to [720992e155fa3f850ed0c46aaa1d224e25c95efe] # ============================================================ --- ChangeLog f0261047015b1ef6e5984007c32a6e5c7c4df4e4 +++ ChangeLog 7436f3f8621a686bbf9e1352bd88831c00b41e3e @@ -1,3 +1,9 @@ +2006-05-14 Graydon Hoare + + * netxx_pipe.{cc,hh}: Various cleanups to build on unix. + * netsync.cc (drop_session_associated_with_fd): New helper, + and use it everywhere we did sessions.erase() before. + 2006-05-13 Graydon Hoare * netsync.cc, netxx/probe.h, netxx_pipe.{cc,hh}: Adaptation of ============================================================ --- netsync.cc e9bedced32c6a4474898d3f9bf4f02ccbf2f9c48 +++ netsync.cc e6680e393c344a1614e0f247d4ea351237549a41 @@ -2517,6 +2517,43 @@ } } +static void +drop_session_associated_with_fd(map > & sessions, + Netxx::socket_type fd) +{ + // This is a bit of a hack. Initially all "file descriptors" in + // netsync were full duplex, so we could get away with indexing + // sessions by their file descriptor. + // + // When using pipes in unix, it's no longer true: a session gets + // entered in the session map under its read pipe fd *and* its write + // pipe fd. When we're in such a situation the socket fd is "-1" and + // we downcast to a PipeStream and use its read+write fds. + // + // When using pipes in windows, we use a full duplex pipe (named + // pipe) so the socket-like abstraction holds. + + I(fd != -1); + map >::const_iterator i = sessions.find(fd); + I(i != sessions.end()); + shared_ptr sess = i->second; + fd = sess->str->get_socketfd(); + if (fd != -1) + { + sessions.erase(fd); + } + else + { + shared_ptr pipe = + boost::dynamic_pointer_cast(sess->str); + I(static_cast(pipe)); + I(static_cast(pipe->get_writefd()) != -1); + I(static_cast(pipe->get_readfd()) != -1); + sessions.erase(pipe->get_readfd()); + sessions.erase(pipe->get_writefd()); + } +} + static void arm_sessions_and_calculate_probe(Netxx::PipeCompatibleProbe & probe, map > & sessions, @@ -2547,7 +2584,7 @@ for (set::const_iterator i = arm_failed.begin(); i != arm_failed.end(); ++i) { - sessions.erase(*i); + drop_session_associated_with_fd(sessions, *i); } } @@ -2610,7 +2647,7 @@ { W(F("protocol error while processing peer %s: '%s', disconnecting\n") % sess->peer_id % bd.what); - sessions.erase(fd); + drop_session_associated_with_fd(sessions, fd); live_p = false; } } @@ -2634,7 +2671,7 @@ % sess->peer_id); break; } - sessions.erase(fd); + drop_session_associated_with_fd(sessions, fd); live_p = false; } } @@ -2667,7 +2704,7 @@ break; } - sessions.erase(fd); + drop_session_associated_with_fd(sessions, fd); live_p = false; } } @@ -2691,7 +2728,7 @@ { P(F("peer %s processing finished, disconnecting\n") % sess->peer_id); - sessions.erase(j); + drop_session_associated_with_fd(sessions, *i); } } } @@ -2719,7 +2756,7 @@ for (set::const_iterator i = dead_clients.begin(); i != dead_clients.end(); ++i) { - sessions.erase(*i); + drop_session_associated_with_fd(sessions, *i); } } @@ -2846,7 +2883,7 @@ } else { - probe.remove(i->second->str); + probe.remove(*(i->second->str)); shared_ptr sess = i->second; bool live_p = true; @@ -2861,7 +2898,7 @@ { P(F("got OOB from peer %s, disconnecting\n") % sess->peer_id); - sessions.erase(i); + drop_session_associated_with_fd(sessions, fd); } } } @@ -2931,12 +2968,11 @@ if (sess->str->get_socketfd() == -1) { // Unix pipes are non-duplex, have two filedescriptors - Netxx::PipeStream *pipe=dynamic_cast(&*sess->str); - if (pipe) - { - sessions[pipe->get_writefd()]=sess; - sessions[pipe->get_readfd()]=sess; - } + shared_ptr pipe = + boost::dynamic_pointer_cast(sess->str); + I(pipe); + sessions[pipe->get_writefd()]=sess; + sessions[pipe->get_readfd()]=sess; } else sessions[sess->str->get_socketfd()]=sess; @@ -2985,7 +3021,7 @@ { P(F("got some OOB data on fd %d (peer %s), disconnecting\n") % fd % sess->peer_id); - sessions.erase(i); + drop_session_associated_with_fd(sessions, fd); } } } ============================================================ --- netxx_pipe.cc 257cde413eb1573139f1400d764f3c54651c6c24 +++ netxx_pipe.cc 2eabfc5fa0a4f5696e5951b13cfbad91d423a206 @@ -21,8 +21,16 @@ #endif Netxx::PipeStream::PipeStream(int _readfd, int _writefd) - : child(INVALID_HANDLE_VALUE), - bytes_available(0) + : +#ifdef WIN32 + child(INVALID_HANDLE_VALUE), + bytes_available(0), + read_in_progress(false) +#else + readfd(_readfd), + writefd(_writefd), + child(0) +#endif { #ifdef WIN32 if (_setmode(_readfd, _O_BINARY) == -1) @@ -106,6 +114,7 @@ } #endif +#ifdef WIN32 static std::string err_msg() { @@ -115,13 +124,20 @@ (LPSTR) &buf, sizeof(buf) / sizeof(TCHAR), NULL) != 0); return std::string(buf); } +#endif + Netxx::PipeStream::PipeStream (const std::string & cmd, const std::vector & args) - : child(INVALID_HANDLE_VALUE), - bytes_available(0) + : #ifdef WIN32 - ,read_in_progress(false) + child(INVALID_HANDLE_VALUE), + bytes_available(0), + read_in_progress(false) +#else + readfd(-1), + writefd(-1), + child(0) #endif { // Unfortunately neither munge_argv_into_cmdline nor execvp do take ============================================================ --- netxx_pipe.hh 9b4e1e3543138f5e784814446299bce571ea5e18 +++ netxx_pipe.hh 720992e155fa3f850ed0c46aaa1d224e25c95efe @@ -112,7 +112,6 @@ void add(PipeStream &ps, ready_type rt=ready_none); void add(const StreamBase &sb, ready_type rt=ready_none); void add(const StreamServer &ss, ready_type rt=ready_none); - void remove(const PipeStream &ps); }; #else