# # # patch "netxx/probe.h" # from [c90c28bd503c99be473993312dc8e595a8fb13fa] # to [b0fee59ffe0278c9a5a764ea53cfd21acda4d2b8] # # patch "netxx_pipe.cc" # from [c737854c6aca119d29cfdb4ec6a33a9c9c070922] # to [6517b7610819328a1a4062fe419f9438968267e6] # # patch "netxx_pipe.hh" # from [3bd86ee5c96447657c531ea6383c63d543a8ace8] # to [2086562743c62a48b6062c2e7ece72ad89d49872] # # patch "netxx_pipe_stdio_main.cc" # from [74b05a9fe5dfa0a1a0e7f816b13de0cdb241c6f1] # to [c2233bd6294a7c92d2e41fc6fad15157862a5f87] # # patch "platform.hh" # from [aa3302a71c7da264137d9b154902763fc4c3f349] # to [c10818ff8131c08c9d23f11fb5d1914140c3a40a] # # patch "win32/process.cc" # from [097e18737531f5b702b62c55a7b98899bab4f7ac] # to [9a58e8a4fbbc41f44d83d3bce65434a155a2f7a3] # ============================================================ --- netxx/probe.h c90c28bd503c99be473993312dc8e595a8fb13fa +++ netxx/probe.h b0fee59ffe0278c9a5a764ea53cfd21acda4d2b8 @@ -48,8 +48,6 @@ namespace Netxx { namespace Netxx { -class PipeCompatibleProbe; - /** * The Netxx::Probe class is a wrapper around one of the Netxx probe * classes. The reason that we have a wrapper is because most operating @@ -58,7 +56,7 @@ class Probe { **/ class Probe { /* - * Probe has no public way to probe stdio/stdin, so grant + * Probe has no public way to probe stdio/stdin, so allow * StdioProbe to use add_socket */ friend class StdioProbe; ============================================================ --- netxx_pipe.cc c737854c6aca119d29cfdb4ec6a33a9c9c070922 +++ netxx_pipe.cc 6517b7610819328a1a4062fe419f9438968267e6 @@ -12,6 +12,7 @@ #include #include "sanity.hh" #include "platform.hh" +#include #include #include // for operator<< @@ -45,9 +46,10 @@ Netxx::StdioStream::StdioStream(void) writefd (stdout) #endif { - // This allows netxx to call select() on these file descriptors. Unless - // they are actually a socket (ie, we are spawned from 'mtn sync - // file:...'), this will fail on Win32. + // This allows netxx to call select() on these file descriptors. On Win32, + // this will fail unless they are actually a socket (ie, we are spawned + // from 'mtn sync file:...', or some other program that sets stdin, stdout + // to a socket). probe_info.add_socket (readfd); probe_info.add_socket (writefd); @@ -58,25 +60,125 @@ Netxx::StdioStream::StdioStream(void) L(FL("failed to load WinSock")); } - if (_setmode(readfd, _O_BINARY) == -1) - L(FL("failed to set input file descriptor to binary")); - - if (_setmode(writefd, _O_BINARY) == -1) - L(FL("failed to set output file descriptor to binary")); - #endif } Netxx::signed_size_type Netxx::StdioStream::read (void *buffer, size_type length) { - return ::read(readfd, buffer, length); + // based on netxx/socket.cxx read + signed_size_type rc; + char *buffer_ptr = static_cast(buffer); + + for (;;) + { +#ifdef WIN32 + // readfd must be a socket, and 'read' doesn't work on sockets + rc = recv(readfd, buffer_ptr, length, 0); +#else + // On Unix, this works for sockets as well as files and pipes. + rc = ::read(readfd, buffer, length); +#endif + if (rc < 0) + { + error_type error_code = get_last_error(); + if (error_code == EWOULDBLOCK) error_code = EAGAIN; + + switch (error_code) { + case ECONNRESET: + return 0; + + case EINTR: + continue; + + case EAGAIN: + return -1; + +#ifdef WIN32 + + case WSAEMSGSIZE: + return length; + + case WSAENETRESET: + case WSAESHUTDOWN: + case WSAECONNABORTED: + case WSAETIMEDOUT: // timed out shouldn't happen + return 0; + +#endif + + default: + { + std::string error("recv failure: "); + error += str_error(error_code); + throw Exception(error); + } + } + } + + break; + } + + return rc; } Netxx::signed_size_type Netxx::StdioStream::write (const void *buffer, size_type length) { - return ::write(writefd, buffer, length); + // based on netxx/socket.cxx write + const char *buffer_ptr = static_cast(buffer); + signed_size_type rc, bytes_written=0; + + while (length) + { + +#ifdef WIN32 + // writefd must be a socket, and 'write' doesn't work on sockets + rc = send(writefd, buffer_ptr, length, 0); +#else + // On Unix, this works for sockets as well as files and pipes. + rc = ::write(writefd, buffer, length); +#endif + if (rc < 0) + { + Netxx::error_type error_code = get_last_error(); + if (error_code == EWOULDBLOCK) error_code = EAGAIN; + + switch (error_code) { + case EPIPE: + case ECONNRESET: + return 0; + + case EINTR: + continue; + + case EAGAIN: + return -1; + +#if defined(WIN32) + case WSAENETRESET: + case WSAESHUTDOWN: + case WSAEHOSTUNREACH: + case WSAECONNABORTED: + case WSAETIMEDOUT: + return 0; +#endif + + default: + { + std::string error("send failed: "); + error += str_error(error_code); + throw Exception(error); + } + } + } + + buffer_ptr += rc; + bytes_written += rc; + length -= rc; + } + + return bytes_written; } void @@ -88,7 +190,7 @@ Netxx::StdioStream::get_socketfd (void) Netxx::socket_type Netxx::StdioStream::get_socketfd (void) const { - return writefd; // only used to register session + return writefd; // only used to register session in netsync } const Netxx::ProbeInfo* @@ -97,17 +199,19 @@ Netxx::StdioStream::get_probe_info (void return &probe_info; } -#ifdef WIN32 -static string -err_msg() +void +Netxx::StdioStream::set_socketfd (socket_type sock) { - char buf[1024]; - I(FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, - NULL, GetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), - (LPSTR) &buf, sizeof(buf) / sizeof(TCHAR), NULL) != 0); - return string(buf); + readfd = sock; + writefd = sock; + + probe_info.clear(); + probe_info.add_socket (readfd); + probe_info.add_socket (writefd); + + // We don't set binary on WIN32, because it is not necessary for send/recv + // on a socket. } -#endif Netxx::SpawnedStream::SpawnedStream (const string & cmd, const vector & args) : @@ -176,7 +280,7 @@ Netxx::SpawnedStream::SpawnedStream (con E(started, F("CreateProcess(%s,...) call failed: %s") - % cmdline % err_msg()); + % cmdline % win32_last_err_msg()); child = piProcInfo.hProcess; @@ -283,19 +387,100 @@ Netxx::StdioProbe::add(const StreamServe void Netxx::StdioProbe::add(const StreamServer &ss, ready_type rt) { - try - { - Probe::add(ss,rt); - } - catch (...) - { - I(0); // Should not be a StdioStream here - } + // Should not be a StdioStream here + Probe::add(ss,rt); } #ifdef BUILD_UNIT_TESTS #include "unit_tests.hh" +namespace Netxx +{ + class StdioStreamTest : public StdioStream + { + public: + StdioStreamTest (void) : StdioStream (1) {}; + void set_socket (socket_type sock); + }; +} + +void +Netxx::StdioStreamTest::set_socket (socket_type sock) +{ + this->set_socketfd (sock); +} + +UNIT_TEST(pipe, stdio_stream) +{ + Netxx::StdioStreamTest stream; + Netxx::StdioProbe probe; + Netxx::Probe::result_type probe_result; + Netxx::Timeout short_time(0,1000); + Netxx::Timeout timeout(2L); + + char write_buffer[2]; + char stream_read_buffer[2]; + char parent_read_buffer[2]; + Netxx::signed_size_type bytes_read; + Netxx::signed_size_type bytes_written; + + Netxx::socket_type socks[2]; + +#ifdef WIN32 + { + WSADATA wsdata; + if (WSAStartup(MAKEWORD(2,2), &wsdata) != 0) + L(FL("failed to load WinSock")); + } +#endif + + E(0 == dumb_socketpair (socks, 0), F("socketpair failed")); + + // Test StdioStream read and write + stream.set_socket (socks[0]); + + // test read time out + probe.clear(); + probe.add(stream, Netxx::Probe::ready_read); + probe_result = probe.ready(short_time); + I(probe_result.first == -1); // timeout + I(probe_result.second == Netxx::Probe::ready_none); + + // test StdioStream read + write_buffer[0] = 42; + write_buffer[1] = 43; + + bytes_written = ::send (socks[1], write_buffer, 2, 0); + I(bytes_written == 2); + + probe.clear(); + probe.add(stream, Netxx::Probe::ready_read); + probe_result = probe.ready(short_time); + I(probe_result.second == Netxx::Probe::ready_read); + I(probe_result.first == stream.get_socketfd()); + + bytes_read = stream.read (stream_read_buffer, sizeof(stream_read_buffer)); + I(bytes_read == 2); + I(stream_read_buffer[0] == 42); + I(stream_read_buffer[1] == 43); + + // test StdioStream write + probe.clear(); + probe.add(stream, Netxx::Probe::ready_write); + probe_result = probe.ready(short_time); + I(probe_result.second & Netxx::Probe::ready_write); + I(probe_result.first == stream.get_socketfd()); + + bytes_written = stream.write (write_buffer, 2); + I(bytes_written == 2); + + bytes_read = ::recv (socks[1], parent_read_buffer, sizeof(parent_read_buffer), 0); + + I(bytes_read == 2); + I(parent_read_buffer[0] == 42); + I(parent_read_buffer[1] == 43); +} + void unit_test_spawn (char *cmd) { try { @@ -319,13 +504,14 @@ void unit_test_spawn (char *cmd) I(res.second & Netxx::Probe::ready_write); I(res.first==spawned.get_socketfd()); - // try binary transparency + // test binary transparency, lots of cycles for (int c = 0; c < 256; ++c) { - char buf[1024]; - buf[0] = c; - buf[1] = 255 - c; - spawned.write(buf, 2); + char write_buf[1024]; + char read_buf[1024]; + write_buf[0] = c; + write_buf[1] = 255 - c; + spawned.write(write_buf, 2); string result; while (result.size() < 2) @@ -336,8 +522,8 @@ void unit_test_spawn (char *cmd) E(res.second & Netxx::Probe::ready_read, F("timeout reading data %d") % c); I(res.first == spawned.get_socketfd()); - int bytes = spawned.read(buf, sizeof(buf)); - result += string(buf, bytes); + int bytes = spawned.read(read_buf, sizeof(read_buf)); + result += string(read_buf, bytes); } I(result.size() == 2); I(static_cast(result[0]) == c); @@ -347,9 +533,7 @@ void unit_test_spawn (char *cmd) spawned.close(); } -catch (informative_failure &e) - // for some reason boost does not provide - // enough information + catch (informative_failure &e) { W(F("Failure %s") % e.what()); throw; ============================================================ --- netxx_pipe.hh 3bd86ee5c96447657c531ea6383c63d543a8ace8 +++ netxx_pipe.hh 2086562743c62a48b6062c2e7ece72ad89d49872 @@ -35,20 +35,27 @@ StdioStream. In 'mtn sync ssh:...' the local mtn spawns ssh, connecting to it via - SpawnedStream. On the server, ssh spawns 'mtn serve stdio', which uses + SpawnedStream. On the server, ssh spawns 'mtn serve --stdio', which uses StdioStream. We also need StdioProbe objects that work with StdioStream objects, since - Netxx::Probe doesn't. Netxx does not provide for child classes of Probe. - We handle this by having the netsync code create the appropriate Probe - object whenever it creates a StreamBase object. + Netxx::Probe doesn't. Netxx does not provide for child classes of Probe, + nor of ProbeInfo; none of the methods are virtual. We handle this by + having the netsync code always use a StdioProbe object when the StreamBase + object might be a StdioStream. StdioProbe acts like Probe unless the + stream is StdioStream. IMPROVEME: we only need a StdioProbe in + serve_single_connection; should fix Netxx to allow for derived classes. - We use socket pairs to implement these objects. On Unix and Win32, a + We use socket pairs to implement SpawnedStream. On Unix and Win32, a socket can serve as stdin and stdout for a spawned process. The sockets in the pair must be connected to each other; socketpair() does that nicely. + On Win32, select works on stdin only if it is actually a socket. We just + live with that restriction. It may mean Win32 can't be an ssh server for + mtn. + An earlier implementation (a single class named PipeStream) tried to use Win32 overlapped IO via named pipes on Win32, and Unix select with Unix pipes on unix, but we couldn't make it work, because the semantics of the @@ -60,6 +67,7 @@ namespace Netxx { class StdioProbe; class StreamServer; + class StdioStreamTest; class SpawnedStream : public StreamBase { @@ -104,6 +112,16 @@ namespace Netxx virtual void close (void); virtual socket_type get_socketfd (void) const; virtual const ProbeInfo* get_probe_info (void) const; + + private: + friend class StdioStreamTest; + // Unit test facilities + + // noop constructor + explicit StdioStream (int test) {}; + + // Set socket for unit testing + void set_socketfd (socket_type sock); }; struct StdioProbe : Probe ============================================================ --- netxx_pipe_stdio_main.cc 74b05a9fe5dfa0a1a0e7f816b13de0cdb241c6f1 +++ netxx_pipe_stdio_main.cc c2233bd6294a7c92d2e41fc6fad15157862a5f87 @@ -15,6 +15,7 @@ #include "base.hh" #include "netxx_pipe.hh" +#include "platform.hh" #include @@ -40,22 +41,13 @@ int main (int argc, char *argv[]) int main (int argc, char *argv[]) { - int fid = STDIN_FILENO; - global_sanity.initialize(argc, argv, 0); - // If an argument is given, it is a file to read instead of stdin, for debugging - if (argc == 2) - { - fprintf (stderr, "opening %s\n", argv[1]); - fid = open (argv[1], 0); - } - { - Netxx::StdioStream stream (fid, STDOUT_FILENO); + Netxx::StdioStream stream; Netxx::StdioProbe probe; Netxx::Probe::result_type probe_result; - Netxx::Timeout short_time(0,1000); + Netxx::Timeout timeout(0, 1000); char buffer[256]; Netxx::signed_size_type bytes_read; @@ -63,14 +55,27 @@ int main (int argc, char *argv[]) int quit = 0; probe.add (stream, Netxx::Probe::ready_read); + stream.set_timeout (timeout); - // Exit when ready returns none - // But that never happens when reading a file, so exit on a count in that case - for (i = 0; (!quit) && ((argc == 1) || (i < 100)); i++) + // Exit when ready times out; socket has been closed + for (;!quit;) try { - probe_result = probe.ready(short_time); + probe_result = probe.ready(timeout, Netxx::Probe::ready_read); + if (-1 == probe_result.first) + { + // timeout; assume we're running the probe:spawn_stdio unit test, and it's done (the socket closed) + quit = 1; + continue; + } + else if (stream.get_socketfd() != probe_result.first) + { + fprintf (stderr, "ready returned other socket\n"); + quit = 1; + continue; + } + switch (probe_result.second) { case Netxx::Probe::ready_none: @@ -79,13 +84,30 @@ int main (int argc, char *argv[]) case Netxx::Probe::ready_read: bytes_read = stream.read (buffer, sizeof (buffer)); - stream.write (buffer, bytes_read); + if (-1 == bytes_read) + { + fprintf (stderr, "read timed out\n"); + quit = 1; + } + else if (0 == bytes_read) + { + fprintf (stderr, "socket closed\n"); + quit = 1; + } + else + { + stream.write (buffer, bytes_read); + } break; case Netxx::Probe::ready_write: + fprintf (stderr, "ready write\n", bytes_read); + quit = 1; break; case Netxx::Probe::ready_oobd: + fprintf (stderr, "ready oobd\n", bytes_read); + quit = 1; break; } } @@ -97,9 +119,6 @@ int main (int argc, char *argv[]) stream.close(); - if (fid != STDIN_FILENO) - close (fid); - } return 1; } // end main ============================================================ --- platform.hh aa3302a71c7da264137d9b154902763fc4c3f349 +++ platform.hh c10818ff8131c08c9d23f11fb5d1914140c3a40a @@ -41,7 +41,10 @@ std::string munge_argv_into_cmdline(cons #ifdef WIN32 std::string munge_argv_into_cmdline(const char* const argv[]); + +std::string win32_last_err_msg(void); #endif + // for term selection bool have_smart_terminal(); // this function cannot call W/P/L, because it is called by the tick printing ============================================================ --- win32/process.cc 097e18737531f5b702b62c55a7b98899bab4f7ac +++ win32/process.cc 9a58e8a4fbbc41f44d83d3bce65434a155a2f7a3 @@ -92,6 +92,15 @@ std::string munge_argv_into_cmdline(cons return cmdline; } +std::string win32_last_err_msg(void) +{ + char buf[1024]; + I(FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, + NULL, GetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), + (LPSTR) &buf, sizeof(buf) / sizeof(TCHAR), NULL) != 0); + return std::string(buf); +} // last_err_msg + int existsonpath(const char *exe) { if (SearchPath(NULL, exe, ".exe", 0, NULL, NULL)==0) @@ -170,7 +179,7 @@ redir::redir(int which, char const * fil sa.nLength = sizeof(SECURITY_ATTRIBUTES); sa.lpSecurityDescriptor = 0; sa.bInheritHandle = true; - + file = CreateFile(filename, (which==0?GENERIC_READ:GENERIC_WRITE), FILE_SHARE_READ,