# # # patch "ChangeLog" # from [2daf508ddd9169fe6ae37c06b59eeb656d9fd484] # to [f0261047015b1ef6e5984007c32a6e5c7c4df4e4] # # patch "Makefile.am" # from [ad1680df7c76acbdaef7ad40c4e32e01d89daf7f] # to [eeeba6b1fdbe46f7152547cb8a6bd8442a8ecd86] # # patch "app_state.cc" # from [69cb4754f67caebae4a18a1a030ddd3f24e85e4a] # to [fa17a6dd38712e92a302c86d85c9ce7833184a25] # # patch "app_state.hh" # from [7c7f7cb855dc4fa643d6ba62c8d4fac8c5c6075f] # to [da132f3bf31a985a1a26945ded175b81e7bf1925] # # patch "cmd_netsync.cc" # from [624019425573cc5ac20ad0628032d32b9085d718] # to [c84c1664d00926c4079d2c4b9643a9d6e42c6cc6] # # patch "monotone.cc" # from [f955c2c8d4ec5dd4de832ce3fd300b61a6640860] # to [44be693267e1ec6d4071b3b09396f1b603ccfb34] # # patch "netsync.cc" # from [838ea8e645fbc5694a6ac0e81ee0e5c8dde36ede] # to [e9bedced32c6a4474898d3f9bf4f02ccbf2f9c48] # # patch "netxx/probe.h" # from [ab8ffcf0b226a90c7f79c56a7d41346c23b0d664] # to [7dbd28f284875bb3af6e59db3be0fb9378f8e7bb] # # patch "options.hh" # from [db7002b03fc93c0456a360c5cc212c6b796950c9] # to [6ea8ce2a5b90b4bd1fffc77df22160eec911e985] # # patch "platform.hh" # from [02cce907829ab54b8a3a6e4338913f33ddb826d7] # to [4f4eca4a2cb78f93f2c202231951d182237914ca] # # patch "unit_tests.cc" # from [a9f5606a66179106bd461d4a34d8444e142a8ad2] # to [4b2815d5146151bd9263997f2d3b4760047e9ac2] # # patch "unit_tests.hh" # from [1f0dbf6a811b8bcb25b0a0071239cf7809867037] # to [dca619b7a7c6cda898fa0fc98e0bc906b746a614] # # patch "visualc/monotone.vcproj" # from [c27688d937c7336023ce0d6136e83ba0e65797cc] # to [e52b7ab761938efb46bf8692f961b2a7a7314158] # # patch "win32/process.cc" # from [79552c8593362854390fe3560b13ab682a31c434] # to [a39d79c6786719f2f126f3059020150749ca80ec] # ============================================================ --- ChangeLog 2daf508ddd9169fe6ae37c06b59eeb656d9fd484 +++ ChangeLog f0261047015b1ef6e5984007c32a6e5c7c4df4e4 @@ -1,3 +1,30 @@ +2006-05-13 Graydon Hoare + + * netsync.cc, netxx/probe.h, netxx_pipe.{cc,hh}: Adaptation of + Christof Petig's PipeStream work, with heavy modifications; seems + to work under win32 / visualc8 now. + + * options.hh (OPT_STDIO): New option. + * app_state.{cc,hh} (bind_stdio): Store it. + * monotone.cc (cpp_main): Parse it. + * cmd_netsync.cc (serve): Accept --stdio option for 'serve'. + * platform.hh (munge_argv_into_cmdline): Export on win32. + * win32/process.cc (munge_argv_into_cmdline): Make non-static. + * unit_tests.{cc,hh}: Add tests for pipes. + * visualc/monotone.vcproj: Add netxx_pipe.{cc,hh} + * Makefile.am: Likewise. + +2006-05-13 Christof Petig + + (Originally from 2004-06-18, on a different branch) + + * netsync.cc: + (struct session): use PipeStream, pair of sockets + (find_wordend,parse_ssh_url): helper functions to parse an URL + (call_server): recognize ssh: and file: URLs + (serve_stdio): variant of serve_connections for stdio + (run_netsync_protocol): call serve_stdio if address is - + 2006-04-27 Timothy Brownawell * hash_map.hh: Do things a little differently. Should make ============================================================ --- Makefile.am ad1680df7c76acbdaef7ad40c4e32e01d89daf7f +++ Makefile.am eeeba6b1fdbe46f7152547cb8a6bd8442a8ecd86 @@ -30,6 +30,7 @@ refiner.cc refiner.hh \ enumerator.cc enumerator.hh \ netsync.cc netsync.hh \ + netxx_pipe.cc netxx_pipe.hh \ netcmd.cc netcmd.hh \ merkle_tree.cc merkle_tree.hh \ basic_io.cc basic_io.hh \ ============================================================ --- app_state.cc 69cb4754f67caebae4a18a1a030ddd3f24e85e4a +++ app_state.cc fa17a6dd38712e92a302c86d85c9ce7833184a25 @@ -35,7 +35,7 @@ no_merges(false), set_default(false), verbose(false), date_set(false), search_root("/"), depth(-1), last(-1), next(-1), diff_format(unified_diff), diff_args_provided(false), - use_lca(false), execute(false), bind_address(""), bind_port(""), + use_lca(false), execute(false), bind_address(""), bind_port(""), bind_stdio(false), missing(false), unknown(false), confdir(get_default_confdir()), have_set_key_dir(false), no_files(false) { ============================================================ --- app_state.hh 7c7f7cb855dc4fa643d6ba62c8d4fac8c5c6075f +++ app_state.hh da132f3bf31a985a1a26945ded175b81e7bf1925 @@ -68,6 +68,7 @@ bool execute; utf8 bind_address; utf8 bind_port; + bool bind_stdio; bool missing; bool unknown; std::vector keys_to_push; ============================================================ --- cmd_netsync.cc 624019425573cc5ac20ad0628032d32b9085d718 +++ cmd_netsync.cc c84c1664d00926c4079d2c4b9643a9d6e42c6cc6 @@ -168,7 +168,7 @@ CMD_NO_WORKSPACE(serve, N_("network"), N_("PATTERN ..."), N_("serve the branches specified by PATTERNs to connecting clients"), - OPT_BIND % OPT_PIDFILE % OPT_EXCLUDE) + OPT_BIND % OPT_STDIO % OPT_PIDFILE % OPT_EXCLUDE) { if (args.size() < 1) throw usage(name); ============================================================ --- monotone.cc f955c2c8d4ec5dd4de832ce3fd300b61a6640860 +++ monotone.cc 44be693267e1ec6d4071b3b09396f1b603ccfb34 @@ -78,6 +78,7 @@ {"missing", 0, POPT_ARG_NONE, NULL, OPT_MISSING, gettext_noop("perform the operations for files missing from workspace"), NULL}, {"unknown", 0, POPT_ARG_NONE, NULL, OPT_UNKNOWN, gettext_noop("perform the operations for unknown files from workspace"), NULL}, {"key-to-push", 0, POPT_ARG_STRING, &argstr, OPT_KEY_TO_PUSH, gettext_noop("push the specified key even if it hasn't signed anything"), NULL}, + {"stdio", 0, POPT_ARG_NONE, NULL, OPT_STDIO, gettext_noop("serve netsync on stdio"), NULL}, {"drop-attr", 0, POPT_ARG_STRING, &argstr, OPT_DROP_ATTR, gettext_noop("when rosterifying, drop attrs entries with the given key"), NULL}, {"no-files", 0, POPT_ARG_NONE, NULL, OPT_NO_FILES, gettext_noop("exclude files when printing logs"), NULL}, {"recursive", 'R', POPT_ARG_NONE, NULL, OPT_RECURSIVE, gettext_noop("also operate on the contents of any listed directories"), NULL}, @@ -495,6 +496,10 @@ app.execute = true; break; + case OPT_STDIO: + app.bind_stdio = true; + break; + case OPT_BIND: { std::string arg(argstr); @@ -526,6 +531,7 @@ port_part = ""; } } + app.bind_stdio = false; app.bind_address = utf8(addr_part); app.bind_port = utf8(port_part); } ============================================================ --- netsync.cc 838ea8e645fbc5694a6ac0e81ee0e5c8dde36ede +++ netsync.cc e9bedced32c6a4474898d3f9bf4f02ccbf2f9c48 @@ -50,7 +50,7 @@ #include "netxx/stream.h" #include "netxx/streamserver.h" #include "netxx/timeout.h" - +#include "netxx_pipe.hh" // TODO: things to do that will break protocol compatibility // -- need some way to upgrade anonymous to keyed pull, without user having // to explicitly specify which they want @@ -264,8 +264,7 @@ app_state & app; string peer_id; - Netxx::socket_type fd; - Netxx::Stream str; + shared_ptr str; string_queue inbuf; // deque of pair @@ -336,8 +335,7 @@ utf8 const & our_exclude_pattern, app_state & app, string const & peer, - Netxx::socket_type sock, - Netxx::Timeout const & to); + shared_ptr sock); virtual ~session(); @@ -448,8 +446,7 @@ utf8 const & our_exclude_pattern, app_state & app, string const & peer, - Netxx::socket_type sock, - Netxx::Timeout const & to) : + shared_ptr sock) : role(role), voice(voice), our_include_pattern(our_include_pattern), @@ -457,8 +454,7 @@ our_matcher(our_include_pattern, our_exclude_pattern), app(app), peer_id(peer), - fd(sock), - str(sock, to), + str(sock), inbuf(), outbuf_size(0), armed(false), @@ -924,7 +920,6 @@ throw netsync_error(errmsg); } - Netxx::Probe::ready_type session::which_events() const { @@ -950,10 +945,10 @@ { I(inbuf.size() < constants::netcmd_maxsz); char tmp[constants::bufsz]; - Netxx::signed_size_type count = str.read(tmp, sizeof(tmp)); + Netxx::signed_size_type count = str->read(tmp, sizeof(tmp)); if (count > 0) { - L(FL("read %d bytes from fd %d (peer %s)\n") % count % fd % peer_id); + L(FL("read %d bytes from fd %d (peer %s)\n") % count % str->get_socketfd() % peer_id); if (encountered_error) { L(FL("in error unwind mode, so throwing them into the bit bucket\n")); @@ -974,7 +969,7 @@ { I(!outbuf.empty()); size_t writelen = outbuf.front().first.size() - outbuf.front().second; - Netxx::signed_size_type count = str.write(outbuf.front().first.data() + outbuf.front().second, + Netxx::signed_size_type count = str->write(outbuf.front().first.data() + outbuf.front().second, std::min(writelen, constants::bufsz)); if (count > 0) @@ -989,7 +984,7 @@ outbuf.front().second += count; } L(FL("wrote %d bytes to fd %d (peer %s)\n") - % count % fd % peer_id); + % count % str->get_socketfd() % peer_id); mark_recent_io(); if (byte_out_ticker.get() != NULL) (*byte_out_ticker) += count; @@ -2252,6 +2247,159 @@ } +static bool +parse_ssh_url(const std::string & address, + std::string & host, + std::string & user, + std::string & port, + std::string & dbpath) +{ + std::string::size_type + wordbegin = 0, + wordend = std::string::npos; + + if (address.size() >= 2 && + address.substr(wordbegin, 2) == "//") + wordbegin += 2; + + wordend = address.find_first_of("@:/", wordbegin); + + if (wordend == string::npos) + return false; + + if (address.at(wordend) == '@') + { + user = address.substr(wordbegin, wordend - wordbegin); + wordbegin = wordend + 1; + wordend = address.find_first_of(":/", wordbegin); + if (wordend == string::npos) + return false; + } + + if (address.at(wordend) == ':') + { + host = address.substr(wordbegin, wordend-wordbegin); + wordbegin = wordend + 1; + wordend = address.find_first_of("/", wordbegin); + if (wordend == string::npos) + return false; + } + + if (address.at(wordend) != '/') + return false; + + if (wordbegin == wordend) + return false; // empty port/host + + if (host.empty()) + host = address.substr(wordbegin, wordend-wordbegin); + else + port = address.substr(wordbegin, wordend-wordbegin); + + dbpath = address.substr(wordend); // with leading '/' ! + return true; +} + +static shared_ptr +build_stream_to_server(utf8 const & include_pattern, + utf8 const & exclude_pattern, + utf8 const & address, + Netxx::port_type default_port, + Netxx::Timeout timeout) +{ + shared_ptr server; + + if (address().size() > 5 && + address().substr(0,5)=="file:") + { + std::vector args; + std::string db_path = address().substr(5); + if (global_sanity.debug) + args.push_back("--debug"); + else + args.push_back("--quiet"); + + args.push_back("--db"); + args.push_back(db_path); + + if (exclude_pattern().size()) + { + args.push_back("--exclude"); + args.push_back(exclude_pattern()); + } + + args.push_back("serve"); + args.push_back("--stdio"); + args.push_back(include_pattern()); + + return shared_ptr + (new Netxx::PipeStream("mtn", args)); + } + + else if (address().size() > 4 && + address().substr(0,4)=="ssh:") + { + std::vector args; + std::string user, host, port, db_path; + + if (!parse_ssh_url(address().substr(4), + host, user, port, db_path)) + { + N(false, + F("url %s is not of form " + "ssh:[//address@hidden:port/dbpath\n") % address()); + } + + if (!port.empty()) + { + args.push_back("-p"); + args.push_back(port); + } + + if (!user.empty()) + { + args.push_back("-l"); + args.push_back(user); + } + + args.push_back(host); + args.push_back("mtn"); + + if (global_sanity.debug) + args.push_back("--debug"); + else + args.push_back("--quiet"); + + args.push_back("--db"); + args.push_back(db_path); + + if (exclude_pattern().size()) + { + args.push_back("--exclude"); + args.push_back(exclude_pattern()); + } + + args.push_back("--stdio"); + args.push_back("serve"); + args.push_back(include_pattern()); + + return shared_ptr + (new Netxx::PipeStream("ssh", args)); + } + else + { +#ifdef USE_IPV6 + bool use_ipv6=true; +#else + bool use_ipv6=false; +#endif + Netxx::Address addr(address().c_str(), + default_port, use_ipv6); + return shared_ptr + (new Netxx::Stream(addr, timeout)); + } +} + static void call_server(protocol_role role, utf8 const & include_pattern, @@ -2261,30 +2409,31 @@ Netxx::port_type default_port, unsigned long timeout_seconds) { - + Netxx::PipeCompatibleProbe probe; transaction_guard guard(app.db); - Netxx::Probe probe; Netxx::Timeout timeout(static_cast(timeout_seconds)), instant(0,1); -#ifdef USE_IPV6 - bool use_ipv6=true; -#else - bool use_ipv6=false; -#endif // FIXME: split into labels and convert to ace here. P(F("connecting to %s\n") % address()); - Netxx::Address addr(address().c_str(), default_port, use_ipv6); - Netxx::Stream server(addr, timeout); + shared_ptr server + = build_stream_to_server(include_pattern, + exclude_pattern, + address, default_port, + timeout); + + // 'false' here means not to revert changes when the SockOpt // goes out of scope. - Netxx::SockOpt socket_options(server.get_socketfd(), false); + Netxx::SockOpt socket_options(server->get_socketfd(), false); socket_options.set_non_blocking(); - session sess(role, client_voice, include_pattern, exclude_pattern, - app, address(), server.get_socketfd(), timeout); + session sess(role, client_voice, + include_pattern, + exclude_pattern, + app, address(), server); while (true) { @@ -2303,7 +2452,7 @@ sess.maybe_say_goodbye(guard); probe.clear(); - probe.add(sess.str, sess.which_events()); + probe.add(*(sess.str), sess.which_events()); Netxx::Probe::result_type res = probe.ready(armed ? instant : timeout); Netxx::Probe::ready_type event = res.second; Netxx::socket_type fd = res.first; @@ -2315,20 +2464,13 @@ % sess.peer_id)); } - bool all_io_clean = true; + bool all_io_clean = (event != Netxx::Probe::ready_oobd); if (event & Netxx::Probe::ready_read) all_io_clean = all_io_clean && sess.read_some(); if (event & Netxx::Probe::ready_write) all_io_clean = all_io_clean && sess.write_some(); - - if (event & Netxx::Probe::ready_oobd) - { - E(false, (F("got OOB data from " - "peer %s, disconnecting\n") - % sess.peer_id)); - } if (armed) if (!sess.process(guard)) @@ -2376,7 +2518,7 @@ } static void -arm_sessions_and_calculate_probe(Netxx::Probe & probe, +arm_sessions_and_calculate_probe(Netxx::PipeCompatibleProbe & probe, map > & sessions, set & armed_sessions) { @@ -2393,7 +2535,7 @@ L(FL("fd %d is armed\n") % i->first); armed_sessions.insert(i->first); } - probe.add(i->second->str, i->second->which_events()); + probe.add(*i->second->str, i->second->which_events()); } catch (bad_decode & bd) { @@ -2436,12 +2578,15 @@ // goes out of scope. Netxx::SockOpt socket_options(client.get_socketfd(), false); socket_options.set_non_blocking(); + + shared_ptr str = + shared_ptr + (new Netxx::Stream(client.get_socketfd(), timeout)); shared_ptr sess(new session(role, server_voice, include_pattern, exclude_pattern, app, - lexical_cast(client), - client.get_socketfd(), timeout)); + lexical_cast(client), str)); sess->begin_service(); sessions.insert(make_pair(client.get_socketfd(), sess)); } @@ -2588,7 +2733,7 @@ unsigned long timeout_seconds, unsigned long session_limit) { - Netxx::Probe probe; + Netxx::PipeCompatibleProbe probe; Netxx::Timeout forever, @@ -2761,6 +2906,92 @@ } } while(try_again); + } + +static void +serve_single_connection(shared_ptr sess, + unsigned long timeout_seconds) +{ + Netxx::PipeCompatibleProbe probe; + + Netxx::Timeout + forever, + timeout(static_cast(timeout_seconds)), + instant(0,1); + + P(F("beginning service on %s\n") % sess->peer_id); + + sess->begin_service(); + + transaction_guard guard(sess->app.db); + + map > sessions; + set armed_sessions; + + if (sess->str->get_socketfd() == -1) + { + // Unix pipes are non-duplex, have two filedescriptors + Netxx::PipeStream *pipe=dynamic_cast(&*sess->str); + if (pipe) + { + sessions[pipe->get_writefd()]=sess; + sessions[pipe->get_readfd()]=sess; + } + } + else + sessions[sess->str->get_socketfd()]=sess; + + while (!sessions.empty()) + { + probe.clear(); + armed_sessions.clear(); + + arm_sessions_and_calculate_probe(probe, sessions, armed_sessions); + + L(FL("i/o probe with %d armed\n") % armed_sessions.size()); + Netxx::Probe::result_type res = probe.ready((armed_sessions.empty() ? timeout + : instant)); + Netxx::Probe::ready_type event = res.second; + Netxx::socket_type fd = res.first; + + if (fd == -1) + { + if (armed_sessions.empty()) + L(FL("timed out waiting for I/O (listening on %s)\n") + % sess->peer_id); + } + + // an existing session woke up + else + { + map >::iterator i; + i = sessions.find(fd); + if (i == sessions.end()) + { + L(FL("got woken up for action on unknown fd %d\n") % fd); + } + else + { + shared_ptr sess = i->second; + bool live_p = true; + + if (event & Netxx::Probe::ready_read) + handle_read_available(fd, sess, sessions, armed_sessions, live_p); + + if (live_p && (event & Netxx::Probe::ready_write)) + handle_write_available(fd, sess, sessions, live_p); + + if (live_p && (event & Netxx::Probe::ready_oobd)) + { + P(F("got some OOB data on fd %d (peer %s), disconnecting\n") + % fd % sess->peer_id); + sessions.erase(i); + } + } + } + process_armed_sessions(sessions, armed_sessions, guard); + reap_dead_sessions(sessions, timeout_seconds); + } } @@ -2971,10 +3202,19 @@ { if (voice == server_voice) { - serve_connections(role, include_pattern, exclude_pattern, app, - addr, static_cast(constants::netsync_default_port), - static_cast(constants::netsync_timeout_seconds), - static_cast(constants::netsync_connection_limit)); + if (app.bind_stdio) + { + shared_ptr str(new Netxx::PipeStream(0,1)); + shared_ptr sess(new session(role, server_voice, + include_pattern, exclude_pattern, + app, "stdio", str)); + serve_single_connection(sess,constants::netsync_timeout_seconds); + } + else + serve_connections(role, include_pattern, exclude_pattern, app, + addr, static_cast(constants::netsync_default_port), + static_cast(constants::netsync_timeout_seconds), + static_cast(constants::netsync_connection_limit)); } else { ============================================================ --- netxx/probe.h ab8ffcf0b226a90c7f79c56a7d41346c23b0d664 +++ netxx/probe.h 7dbd28f284875bb3af6e59db3be0fb9378f8e7bb @@ -48,6 +48,8 @@ namespace Netxx { +class PipeCompatibleProbe; + /** * The Netxx::Probe class is a wrapper around one of the Netxx probe * classes. The reason that we have a wrapper is because most operating @@ -55,6 +57,11 @@ * kqueue(2) or /dev/poll. **/ class Probe { + /* + * Probe has no public way to select read only and write only sockets + * needed for probing pipes, so grant PipeCompatibleProbe to use add_socket + */ + friend class PipeCompatibleProbe; public: /* * Bitmask for telling Probe exactly what you want and for testing the ============================================================ --- options.hh db7002b03fc93c0456a360c5cc212c6b796950c9 +++ options.hh 6ea8ce2a5b90b4bd1fffc77df22160eec911e985 @@ -54,3 +54,4 @@ #define OPT_NO_FILES 45 #define OPT_LOG 46 #define OPT_RECURSIVE 47 +#define OPT_STDIO 48 ============================================================ --- platform.hh 02cce907829ab54b8a3a6e4338913f33ddb826d7 +++ platform.hh 4f4eca4a2cb78f93f2c202231951d182237914ca @@ -31,6 +31,9 @@ // stop "\n"->"\r\n" from breaking automate on Windows void make_io_binary(); +#ifdef WIN32 +std::string munge_argv_into_cmdline(const char* const argv[]); +#endif // for term selection bool have_smart_terminal(); // this function cannot call W/P/L, because it is called by the tick printing ============================================================ --- unit_tests.cc a9f5606a66179106bd461d4a34d8444e142a8ad2 +++ unit_tests.cc 4b2815d5146151bd9263997f2d3b4760047e9ac2 @@ -80,6 +80,8 @@ if (t.empty() || t.find("crypto") != t.end()) add_crypto_tests(suite); + if (t.empty() || t.find("pipe") != t.end()) + add_pipe_tests(suite); if (t.empty() || t.find("string_queue") != t.end()) add_string_queue_tests(suite); ============================================================ --- unit_tests.hh 1f0dbf6a811b8bcb25b0a0071239cf7809867037 +++ unit_tests.hh dca619b7a7c6cda898fa0fc98e0bc906b746a614 @@ -34,6 +34,7 @@ void add_globish_tests(test_suite * suite); void add_crypto_tests(test_suite * suite); void add_string_queue_tests(test_suite * suite); +void add_pipe_tests(test_suite * suite); void add_paths_tests(test_suite * suite); void add_roster_tests(test_suite * suite); void add_roster_merge_tests(test_suite * suite); ============================================================ --- visualc/monotone.vcproj c27688d937c7336023ce0d6136e83ba0e65797cc +++ visualc/monotone.vcproj e52b7ab761938efb46bf8692f961b2a7a7314158 @@ -320,10 +320,6 @@ > - - + + @@ -402,6 +402,10 @@ > + + + + ============================================================ --- win32/process.cc 79552c8593362854390fe3560b13ab682a31c434 +++ win32/process.cc a39d79c6786719f2f126f3059020150749ca80ec @@ -76,7 +76,7 @@ return munge_inner_argument(arg); } -static std::string munge_argv_into_cmdline(const char* const argv[]) +std::string munge_argv_into_cmdline(const char* const argv[]) { std::string cmdline;