# # patch "ChangeLog" # from [ca7af199e3ec7cbe2e7333e3a4c107ed23161796] # to [7d63b013470e09c70dd05b9da8f899ec63ed2201] # # patch "netsync.cc" # from [356f622b68dcfef9265dad15a8c988f2ea62741f] # to [0aef99065fae04d90bbf3ae8bcf2df03097539cd] # # patch "netxx_pipe.hh" # from [7f13e757f0721fccd410751bdcc10f2d604de2a3] # to [74bdced617276bc40eca3e34f8be2b6ca8d390a9] # ======================================================================== --- ChangeLog ca7af199e3ec7cbe2e7333e3a4c107ed23161796 +++ ChangeLog 7d63b013470e09c70dd05b9da8f899ec63ed2201 @@ -1,3 +1,17 @@ +2005-08-13 Christof Petig + + use portable PipeStream implementation to reimplement changes on the + ssh branch + +2004-06-18 Christof Petig + + * netsync.cc: + (struct session): use PipeStream, pair of sockets + (find_wordend,parse_ssh_url): helper functions to parse an URL + (call_server): recognize ssh: and file: URLs + (serve_stdio): variant of serve_connections for stdio + (run_netsync_protocol): call serve_stdio if address is - + 2005-07-26 Nathaniel Smith * change_set.cc (dump): Add state_renumbering dumper. ======================================================================== --- netsync.cc 356f622b68dcfef9265dad15a8c988f2ea62741f +++ netsync.cc 0aef99065fae04d90bbf3ae8bcf2df03097539cd @@ -45,6 +45,7 @@ #include "netxx/stream.h" #include "netxx/streamserver.h" #include "netxx/timeout.h" +#include "netxx_pipe.h" // // this is the "new" network synchronization (netsync) system in @@ -234,8 +235,8 @@ app_state & app; string peer_id; - Netxx::socket_type fd; - Netxx::Stream str; +// Netxx::socket_type fd; + shared_ptr str; string inbuf; // deque of pair @@ -292,8 +293,7 @@ utf8 const & our_exclude_pattern, app_state & app, string const & peer, - Netxx::socket_type sock, - Netxx::Timeout const & to); + shared_ptr sock); virtual ~session(); @@ -458,8 +458,7 @@ utf8 const & our_exclude_pattern, app_state & app, string const & peer, - Netxx::socket_type sock, - Netxx::Timeout const & to) : + shared_ptr sock) : role(role), voice(voice), our_include_pattern(our_include_pattern), @@ -467,8 +466,7 @@ our_matcher(our_include_pattern, our_exclude_pattern), app(app), peer_id(peer), - fd(sock), - str(sock, to), + str(sock), inbuf(""), outbuf_size(0), armed(false), @@ -1327,10 +1325,10 @@ { I(inbuf.size() < constants::netcmd_maxsz); char tmp[constants::bufsz]; - Netxx::signed_size_type count = str.read(tmp, sizeof(tmp)); + Netxx::signed_size_type count = str->read(tmp, sizeof(tmp)); if (count > 0) { - L(F("read %d bytes from fd %d (peer %s)\n") % count % fd % peer_id); + L(F("read %d bytes from fd %d (peer %s)\n") % count % str->get_socketfd() % peer_id); if (encountered_error) { L(F("in error unwind mode, so throwing them into the bit bucket\n")); @@ -1351,7 +1349,7 @@ { I(!outbuf.empty()); size_t writelen = outbuf.front().first.size() - outbuf.front().second; - Netxx::signed_size_type count = str.write(outbuf.front().first.data() + outbuf.front().second, + Netxx::signed_size_type count = str->write(outbuf.front().first.data() + outbuf.front().second, std::min(writelen, constants::bufsz)); if (count > 0) @@ -1366,7 +1364,7 @@ outbuf.front().second += count; } L(F("wrote %d bytes to fd %d (peer %s)\n") - % count % fd % peer_id); + % count % str->get_socketfd() % peer_id); mark_recent_io(); if (byte_out_ticker.get() != NULL) (*byte_out_ticker) += count; @@ -3215,7 +3213,42 @@ } } +static std::string::size_type find_wordend(const std::string &address, + std::string::size_type begin, std::string::size_type end, + const std::string &breaks) +{ for (;begin server; + if (address().substr(0,5)=="file:") + { int fd1[2],fd2[2]; + pid_t child=pipe_and_fork(fd1,fd2); + if (child<0) + { L(F("pipe/fork failed")); + return; + } + else if (!child) + { std::string db_path=address().substr(5); + const unsigned newsize=64; + const char *newargv[newsize]; + unsigned newargc=0; + newargv[newargc++]="monotone"; + if (global_sanity.debug) newargv[newargc++]="--debug"; + newargv[newargc++]="--db"; + newargv[newargc++]=db_path.c_str(); + newargv[newargc++]="--"; + newargv[newargc++]="serve"; + newargv[newargc++]="-"; + for (unsigned i=0; i str(new Netxx::Stream(client.get_socketfd(), + timeout)); shared_ptr sess(new session(role, server_voice, include_pattern, exclude_pattern, app, - lexical_cast(client), - client.get_socketfd(), timeout)); + lexical_cast(client), str)); sess->begin_service(); sessions.insert(make_pair(client.get_socketfd(), sess)); } @@ -3583,7 +3694,87 @@ } } +static void +serve_single_connection(//protocol_role role, + shared_ptr sess, + unsigned long timeout_seconds) +{ + Netxx::PipeCompatibleProbe probe; + Netxx::Timeout + forever, + timeout(static_cast(timeout_seconds)), + instant(0,1); + + P(F("beginning service on %s\n") % sess->peer_id); + +// Netxx::StreamServer server(addr, timeout); + sess->begin_service(); + + map > sessions; + set armed_sessions; + + sessions[sess->str.get_socketfd()]=sess; + PipeStream *pipe=dynamic_cast(&*sess->str); + if (pipe) sessions[pipe->get_writefd()]=sess; + + // no addr, no server + +// bool live_p = true; + while (!sessions.empty()) + { + probe.clear(); + armed_sessions.clear(); + + arm_sessions_and_calculate_probe(probe, sessions, armed_sessions); + + L(F("i/o probe with %d armed\n") % armed_sessions.size()); + Netxx::Probe::result_type res = probe.ready(/*sessions.empty() ? forever */ + : (armed_sessions.empty() ? timeout + : instant)); + Netxx::Probe::ready_type event = res.second; + Netxx::socket_type fd = res.first; + + if (fd == -1) + { + if (armed_sessions.empty()) + L(F("timed out waiting for I/O (listening on %s : %d)\n") + % addr.get_name() % addr.get_port()); + } + + // an existing session woke up + else + { + map >::iterator i; + i = sessions.find(fd); + if (i == sessions.end()) + { + L(F("got woken up for action on unknown fd %d\n") % fd); + } + else + { + shared_ptr sess = i->second; + bool live_p = true; + + if (event & Netxx::Probe::ready_read) + handle_read_available(fd, sess, sessions, armed_sessions, live_p); + + if (live_p && (event & Netxx::Probe::ready_write)) + handle_write_available(fd, sess, sessions, live_p); + + if (live_p && (event & Netxx::Probe::ready_oobd)) + { + P(F("got some OOB data on fd %d (peer %s), disconnecting\n") + % fd % sess->peer_id); + sessions.erase(i); + } + } + } + process_armed_sessions(sessions, armed_sessions); + reap_dead_sessions(sessions, timeout_seconds); + } +} + ///////////////////////////////////////////////// // // layer 4: monotone interface layer @@ -3765,7 +3956,15 @@ start_platform_netsync(); if (voice == server_voice) { - serve_connections(role, include_pattern, exclude_pattern, app, + if (addr==utf8("-")) + { shared_ptr str(new PipeStream(0,1)); + shared_ptr sess(new session(role, server_voice, + include_pattern, exclude_pattern, + app, "stdio", str)); + serve_single_connection(sess,constants::netsync_timeout_seconds) + } + else + serve_connections(role, include_pattern, exclude_pattern, app, addr, static_cast(constants::netsync_default_port), static_cast(constants::netsync_timeout_seconds), static_cast(constants::netsync_connection_limit)); ======================================================================== --- netxx_pipe.hh 7f13e757f0721fccd410751bdcc10f2d604de2a3 +++ netxx_pipe.hh 74bdced617276bc40eca3e34f8be2b6ca8d390a9 @@ -29,7 +29,7 @@ friend class PipeCompatibleProbe; #endif public: - explicit PipeStream (int readfd, int writefd); + explicit PipeStream (int readfd, int writefd); // Timeout? explicit PipeStream (const std::string &cmd, const std::vector &args); virtual signed_size_type read (void *buffer, size_type length); virtual signed_size_type write (const void *buffer, size_type length);