# # # add_file "unix/socketpair.c" # content [20cce52deb8de9cdc77a6aea47332319a7364778] # # add_file "win32/socketpair.c" # content [515facee7bc221dada5653b23751e3e133706b99] # # patch ".mtn-ignore" # from [8d76d2540f0c737a188b670a78eb969e0baa57a4] # to [05a89eb8ac4d4c87ee661d69c3a54e6d65da407e] # # patch "netsync.cc" # from [50b0ee5fa1808bdcf31ff87cf3c5716c9d2ad009] # to [6b00f6271a8eb7d72d3234ef738691fcbf6574c4] # # patch "netxx/probe.h" # from [7dbd28f284875bb3af6e59db3be0fb9378f8e7bb] # to [c90c28bd503c99be473993312dc8e595a8fb13fa] # # patch "netxx_pipe.cc" # from [3fa376f09fb6fe19cb49562d8cbe71b79b95eeb3] # to [d4195cd958d6f394b611aed34bd01754133cf482] # # patch "netxx_pipe.hh" # from [873bc7d111d59db057cba5ea5f9ce31ad7608e1f] # to [41d912dba5cf1c70862cff97b8e20bb199de2281] # # patch "platform.hh" # from [b5cb961e19daf9302d866c2f066d8338db7c79c3] # to [aa3302a71c7da264137d9b154902763fc4c3f349] # ============================================================ --- unix/socketpair.c 20cce52deb8de9cdc77a6aea47332319a7364778 +++ unix/socketpair.c 20cce52deb8de9cdc77a6aea47332319a7364778 @@ -0,0 +1,34 @@ +/* socketpair.c + * Copyright 2007 by Nathan C. Myers ; all rights reserved. + * This code is Free Software. It may be copied freely, in original or + * modified form, subject only to the restrictions that (1) the author is + * relieved from all responsibilities for any use for any purpose, and (2) + * this copyright notice must be retained, unchanged, in its entirety. If + * for any reason the author might be held responsible for any consequences + * of copying or use, license is withheld. + */ + +/* Changes: + * 2007-12-08: split into two files for monotone + + * 2007-04-25: + * preserve value of WSAGetLastError() on all error returns. + * 2007-04-22: (Thanks to Matthew Gregan ) + * s/EINVAL/WSAEINVAL/ fix trivial compile failure + * s/socket/WSASocket/ enable creation of sockets suitable as stdin/stdout + * of a child process. 
+ * add argument make_overlapped + */ + +#include + +include +include + +int dumb_socketpair(int socks[2], int dummy) +{ + (void) dummy; + return socketpair(AF_LOCAL, SOCK_STREAM, 0, socks); +} + +/* end of file */ ============================================================ --- win32/socketpair.c 515facee7bc221dada5653b23751e3e133706b99 +++ win32/socketpair.c 515facee7bc221dada5653b23751e3e133706b99 @@ -0,0 +1,92 @@ +/* socketpair.c + * Copyright 2007 by Nathan C. Myers ; all rights reserved. + * This code is Free Software. It may be copied freely, in original or + * modified form, subject only to the restrictions that (1) the author is + * relieved from all responsibilities for any use for any purpose, and (2) + * this copyright notice must be retained, unchanged, in its entirety. If + * for any reason the author might be held responsible for any consequences + * of copying or use, license is withheld. + */ + +/* Changes: + * 2007-12-08: split into two files for monotone + + * 2007-04-25: + * preserve value of WSAGetLastError() on all error returns. + * 2007-04-22: (Thanks to Matthew Gregan ) + * s/EINVAL/WSAEINVAL/ fix trivial compile failure + * s/socket/WSASocket/ enable creation of sockets suitable as stdin/stdout + * of a child process. + * add argument make_overlapped + */ + +#include + +#include +#include +#include + +/* dumb_socketpair: + * If make_overlapped is nonzero, both sockets created will be usable for + * "overlapped" operations via WSASend etc. If make_overlapped is zero, + * socks[0] (only) will be usable with regular ReadFile etc., and thus + * suitable for use as stdin or stdout of a child process. Note that the + * sockets must be closed with closesocket() regardless. + */ + +int dumb_socketpair(signed int socks[2], int make_overlapped) +{ + struct sockaddr_in addr; + SOCKET listener; + int e; + int addrlen = sizeof(addr); + DWORD flags = (make_overlapped ? 
WSA_FLAG_OVERLAPPED : 0); + + if (socks == 0) { + WSASetLastError(WSAEINVAL); + return SOCKET_ERROR; + } + + socks[0] = socks[1] = INVALID_SOCKET; + if ((listener = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET) + return SOCKET_ERROR; + + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = htonl(0x7f000001); + addr.sin_port = 0; + + e = bind(listener, (const struct sockaddr*) &addr, sizeof(addr)); + if (e == SOCKET_ERROR) { + e = WSAGetLastError(); + closesocket(listener); + WSASetLastError(e); + return SOCKET_ERROR; + } + e = getsockname(listener, (struct sockaddr*) &addr, &addrlen); + if (e == SOCKET_ERROR) { + e = WSAGetLastError(); + closesocket(listener); + WSASetLastError(e); + return SOCKET_ERROR; + } + + do { + if (listen(listener, 1) == SOCKET_ERROR) break; + if ((socks[0] = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, flags)) + == INVALID_SOCKET) break; + if (connect(socks[0], (const struct sockaddr*) &addr, + sizeof(addr)) == SOCKET_ERROR) break; + if ((socks[1] = accept(listener, NULL, NULL)) + == INVALID_SOCKET) break; + closesocket(listener); + return 0; + } while (0); + e = WSAGetLastError(); + closesocket(listener); + closesocket(socks[0]); + closesocket(socks[1]); + WSASetLastError(e); + return SOCKET_ERROR; +} +/* end of file */ ============================================================ --- .mtn-ignore 8d76d2540f0c737a188b670a78eb969e0baa57a4 +++ .mtn-ignore 05a89eb8ac4d4c87ee661d69c3a54e6d65da407e @@ -1,11 +1,20 @@ tester_dir testsuite\.dir tester_dir +^testlib\.c$ +^tester_tests\.status$ +^test_hooks\.c$ +^std_hooks\.c$ +^schema\.c$ +^.*\.exe$ ^build$ ^build/ \.dirstamp ^txt2c$ -^unit_tests$ -^unit_tests\.log$ +^lua_tests\.status$ +^unit-tests$ +^unit-tests/ +^unit-tests\.log$ +^unit_tests\.status$ ^mtn$ ^testsuite$ ^testsuite\.log$ @@ -13,6 +22,7 @@ tester_dir ^tester\.h$ ^run_lua_tests$ ^run_tester_tests$ +^run_unit_tests$ ^package_full_revision(\.c|\.txt|_dist\.txt|_raw\.txt)$ ^package_revision_raw.txt$ 
^config\. @@ -66,3 +76,5 @@ tester_dir ^m4/stdint_h\.m4$ ^m4/uintmax_t\.m4$ ^m4/ulonglong\.m4$ +^STATUS$ +^tester\.log$ ============================================================ --- netsync.cc 50b0ee5fa1808bdcf31ff87cf3c5716c9d2ad009 +++ netsync.cc 6b00f6271a8eb7d72d3234ef738691fcbf6574c4 @@ -1,4 +1,4 @@ -// Copyright (C) 2004 Graydon Hoare +// Copyright (C) 2004, 2007 Graydon Hoare // // This program is made available under the GNU GPL version 2.0 or // greater. See the accompanying file COPYING for details. @@ -2354,7 +2354,7 @@ build_stream_to_server(app_state & app, argv.erase(argv.begin()); app.opts.use_transport_auth = app.lua.hook_use_transport_auth(u); return shared_ptr - (new Netxx::PipeStream(cmd, argv)); + (new Netxx::SpawnedStream(cmd, argv)); } else @@ -2380,7 +2380,7 @@ call_server(protocol_role role, Netxx::port_type default_port, unsigned long timeout_seconds) { - Netxx::PipeCompatibleProbe probe; + Netxx::Probe probe; transaction_guard guard(app.db); I(addresses.size() == 1); utf8 address(*addresses.begin()); @@ -2493,41 +2493,15 @@ drop_session_associated_with_fd(map > & sessions, Netxx::socket_type fd) { - // This is a bit of a hack. Initially all "file descriptors" in - // netsync were full duplex, so we could get away with indexing - // sessions by their file descriptor. - // - // When using pipes in unix, it's no longer true: a session gets - // entered in the session map under its read pipe fd *and* its write - // pipe fd. When we're in such a situation the socket fd is "-1" and - // we downcast to a PipeStream and use its read+write fds. - // - // When using pipes in windows, we use a full duplex pipe (named - // pipe) so the socket-like abstraction holds. 
- - I(fd != -1); map >::const_iterator i = sessions.find(fd); I(i != sessions.end()); shared_ptr sess = i->second; fd = sess->str->get_socketfd(); - if (fd != -1) - { - sessions.erase(fd); - } - else - { - shared_ptr pipe = - boost::dynamic_pointer_cast(sess->str); - I(static_cast(pipe)); - I(pipe->get_writefd() != -1); - I(pipe->get_readfd() != -1); - sessions.erase(pipe->get_readfd()); - sessions.erase(pipe->get_writefd()); - } + sessions.erase(fd); } static void -arm_sessions_and_calculate_probe(Netxx::PipeCompatibleProbe & probe, +arm_sessions_and_calculate_probe(Netxx::Probe & probe, map > & sessions, set & armed_sessions, transaction_guard & guard) @@ -2744,7 +2718,7 @@ serve_connections(protocol_role role, unsigned long timeout_seconds, unsigned long session_limit) { - Netxx::PipeCompatibleProbe probe; + Netxx::Probe probe; Netxx::Timeout forever, @@ -2779,7 +2753,7 @@ serve_connections(protocol_role role, { size_t l_colon = address().find(':'); size_t r_colon = address().rfind(':'); - + if (l_colon == r_colon && l_colon == 0) { // can't be an IPv6 address as there is only one colon @@ -2846,14 +2820,15 @@ serve_connections(protocol_role role, try { P(F("connecting to %s") % addr()); - shared_ptr server + shared_ptr other_server = build_stream_to_server(app, inc, exc, addr, default_port, timeout); + // 'false' here means not to revert changes when // the SockOpt goes out of scope. 
- Netxx::SockOpt socket_options(server->get_socketfd(), false); + Netxx::SockOpt socket_options(other_server->get_socketfd(), false); socket_options.set_non_blocking(); protocol_role role = source_and_sink_role; @@ -2866,9 +2841,9 @@ serve_connections(protocol_role role, shared_ptr sess(new session(role, client_voice, inc, exc, - app, addr(), server, true)); + app, addr(), other_server, true)); - sessions.insert(make_pair(server->get_socketfd(), sess)); + sessions.insert(make_pair(other_server->get_socketfd(), sess)); } catch (Netxx::NetworkException & e) { @@ -2989,10 +2964,17 @@ static void } static void -serve_single_connection(shared_ptr sess, +serve_single_connection(protocol_role role, + globish const & include_pattern, + globish const & exclude_pattern, + app_state & app, unsigned long timeout_seconds) { - Netxx::PipeCompatibleProbe probe; + shared_ptr str(new Netxx::StdioStream(0,1)); + shared_ptr sess (new session (role, server_voice, + include_pattern, exclude_pattern, + app, "stdio", str)); + Netxx::StdioProbe probe; Netxx::Timeout forever, @@ -3008,17 +2990,7 @@ serve_single_connection(shared_ptr > sessions; set armed_sessions; - if (sess->str->get_socketfd() == -1) - { - // Unix pipes are non-duplex, have two filedescriptors - shared_ptr pipe = - boost::dynamic_pointer_cast(sess->str); - I(pipe); - sessions[pipe->get_writefd()]=sess; - sessions[pipe->get_readfd()]=sess; - } - else - sessions[sess->str->get_socketfd()]=sess; + sessions[sess->str->get_socketfd()]=sess; while (!sessions.empty()) { @@ -3287,11 +3259,7 @@ run_netsync_protocol(protocol_voice voic { if (app.opts.bind_stdio) { - shared_ptr str(new Netxx::PipeStream(0,1)); - shared_ptr sess(new session(role, server_voice, - include_pattern, exclude_pattern, - app, "stdio", str)); - serve_single_connection(sess,constants::netsync_timeout_seconds); + serve_single_connection(role, include_pattern, exclude_pattern, app, constants::netsync_timeout_seconds); } else serve_connections(role, 
include_pattern, exclude_pattern, app, ============================================================ --- netxx/probe.h 7dbd28f284875bb3af6e59db3be0fb9378f8e7bb +++ netxx/probe.h c90c28bd503c99be473993312dc8e595a8fb13fa @@ -1,11 +1,11 @@ /* * Copyright (C) 2001-2004 Peter J Jones (address@hidden) * All Rights Reserved - * + * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: - * + * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright @@ -15,7 +15,7 @@ * 3. Neither the name of the Author nor the names of its contributors * may be used to endorse or promote products derived from this software * without specific prior written permission. - * + * * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A @@ -57,11 +57,11 @@ class Probe { * kqueue(2) or /dev/poll. **/ class Probe { - /* - * Probe has no public way to select read only and write only sockets - * needed for probing pipes, so grant PipeCompatibleProbe to use add_socket + /* + * Probe has no public way to probe stdio/stdin, so grant + * StdioProbe to use add_socket */ - friend class PipeCompatibleProbe; + friend class StdioProbe; public: /* * Bitmask for telling Probe exactly what you want and for testing the @@ -81,7 +81,7 @@ public: typedef std::pair result_type; //#################################################################### - /** + /** * Construct a new Netxx::Probe object. * * @author Peter Jones @@ -90,7 +90,7 @@ public: Probe (void); //#################################################################### - /** + /** * Netxx::Probe copy constructor. * * @param other The other Probe object to copy from. 
@@ -100,7 +100,7 @@ public: Probe (const Probe &other); //#################################################################### - /** + /** * Netxx::Probe assignment operator. * * @param other The other Probe object to copy from. @@ -111,7 +111,7 @@ public: Probe& operator= (const Probe &other); //#################################################################### - /** + /** * Swap this Probe and another one. Similar to std::swap(). * * @param other The other Probe to swap with. @@ -121,7 +121,7 @@ public: void swap(Probe &other); //#################################################################### - /** + /** * Netxx::Probe destructor. * * @author Peter Jones @@ -130,7 +130,7 @@ public: ~Probe (void); //#################################################################### - /** + /** * Clear the Probe. All objects will be removed from the Probe and it * will be in a brand-new like state. * @@ -140,7 +140,7 @@ public: void clear (void); //#################################################################### - /** + /** * Preform the probe. This function will block until either some data is * ready or the given timeout expires. You may also supply a bitmask for * the type of data you want in this probe. @@ -155,7 +155,7 @@ public: result_type ready (const Timeout &timeout=Timeout(), ready_type rt=ready_none); //#################################################################### - /** + /** * Add an object to the Probe. The object must support the * Netxx::ProbeInfo class. All Netxx classes such as Stream and Datagram * support the ProbeInfo class. 
@@ -169,7 +169,7 @@ public: * @author Peter Jones **/ //#################################################################### - template void add (const T &t, ready_type rt=ready_none) + template void add (const T &t, ready_type rt=ready_none) { // implemented inline to work around bug in MSVC const ProbeInfo *pi = t.get_probe_info(); @@ -178,14 +178,14 @@ public: } //#################################################################### - /** + /** * Remove the given object from the Probe. * * @param t The object to remove from the Probe. * @author Peter Jones **/ //#################################################################### - template void remove (const T &t) + template void remove (const T &t) { // implemented inline to work around bug in MSVC const ProbeInfo *pi = t.get_probe_info(); ============================================================ --- netxx_pipe.cc 3fa376f09fb6fe19cb49562d8cbe71b79b95eeb3 +++ netxx_pipe.cc d4195cd958d6f394b611aed34bd01754133cf482 @@ -1,3 +1,4 @@ +// Copyright (C) 2007 Stephen Leake // Copyright (C) 2005 Christof Petig // // This program is made available under the GNU GPL version 2.0 or @@ -16,6 +17,7 @@ #ifdef WIN32 #include +#include #include #include #else @@ -32,36 +34,32 @@ using std::strerror; using std::perror; using std::strerror; -Netxx::PipeStream::PipeStream(int _readfd, int _writefd) +Netxx::StdioStream::StdioStream(int _readfd, int _writefd) : -#ifdef WIN32 - child(INVALID_HANDLE_VALUE), - bytes_available(0), - read_in_progress(false) -#else readfd(_readfd), - writefd(_writefd), - child(0) -#endif + writefd(_writefd) { + // This allows netxx to call select() on these file descriptors. Unless + // they are actually a socket (ie, we are spawned from 'mtn sync + // file:...'), this will fail (at least on Win32). 
+ probe_info.add_socket (readfd); + probe_info.add_socket (writefd); + #ifdef WIN32 + { + WSADATA wsdata; + if (WSAStartup(MAKEWORD(2,2), &wsdata) != 0) + L(FL("failed to load WinSock")); + } + if (_setmode(_readfd, _O_BINARY) == -1) L(FL("failed to set input file descriptor to binary")); if (_setmode(_writefd, _O_BINARY) == -1) L(FL("failed to set output file descriptor to binary")); - named_pipe = (HANDLE)_get_osfhandle(_readfd); - - E(named_pipe != INVALID_HANDLE_VALUE, - F("pipe handle is invalid")); - - // Create infrastructure for overlapping I/O - memset(&overlap, 0, sizeof(overlap)); - overlap.hEvent = CreateEvent(0, TRUE, TRUE, 0); - bytes_available = 0; - I(overlap.hEvent != 0); #else + // FIXME: do we need to set these non-blocking? int flags = fcntl(readfd, F_GETFL, 0); I(fcntl(readfd, F_SETFL, flags | O_NONBLOCK) != -1); flags = fcntl(writefd, F_GETFL, 0); @@ -69,67 +67,35 @@ Netxx::PipeStream::PipeStream(int _readf #endif } - -#ifndef WIN32 - -// Create pipes for stdio and fork subprocess, returns -1 on error, 0 -// to child and PID to parent. 
- -static pid_t -pipe_and_fork(int fd1[2], int fd2[2]) +Netxx::signed_size_type +Netxx::StdioStream::read (void *buffer, size_type length) { - pid_t result = -1; - fd1[0] = -1; - fd1[1] = -1; - fd2[0] = -1; - fd2[1] = -1; + return ::read(readfd, buffer, length); +} - if (pipe(fd1)) - return -1; +Netxx::signed_size_type +Netxx::StdioStream::write (const void *buffer, size_type length) +{ + return ::write(writefd, buffer, length); +} - if (pipe(fd2)) - { - close(fd1[0]); - close(fd1[1]); - return -1; - } +void +Netxx::StdioStream::close (void) +{ + // nothing to do here +} - result = fork(); +Netxx::socket_type +Netxx::StdioStream::get_socketfd (void) const +{ + return writefd; // only used to register session +} - if (result < 0) - { - close(fd1[0]); - close(fd1[1]); - close(fd2[0]); - close(fd2[1]); - return -1; - } - - else if (!result) - { - // fd1[1] for writing, fd2[0] for reading - close(fd1[0]); - close(fd2[1]); - if (dup2(fd2[0], 0) != 0 || - dup2(fd1[1], 1) != 1) - { - perror("dup2"); - exit(-1); // kill the useless child - } - close(fd1[1]); - close(fd2[0]); - } - - else - { - // fd1[0] for reading, fd2[1] for writing - close(fd1[1]); - close(fd2[0]); - } - - return result; +const Netxx::ProbeInfo* +Netxx::StdioStream::get_probe_info (void) const +{ + return &probe_info; } -#endif #ifdef WIN32 static string @@ -143,21 +109,17 @@ err_msg() } #endif - -Netxx::PipeStream::PipeStream (const string & cmd, - const vector & args) +Netxx::SpawnedStream::SpawnedStream (const string & cmd, const vector & args) : #ifdef WIN32 - child(INVALID_HANDLE_VALUE), - bytes_available(0), - read_in_progress(false) + child(INVALID_HANDLE_VALUE) #else - readfd(-1), - writefd(-1), - child(0) + child(-1) #endif { - // Unfortunately neither munge_argv_into_cmdline nor execvp do take + socket_type socks[2]; // 0 is for child, 1 is for parent + + // Unfortunately neither munge_argv_into_cmdline nor execvp take // a vector as argument. 
const unsigned newsize = 64; @@ -171,61 +133,26 @@ Netxx::PipeStream::PipeStream (const str newargv[newargc++] = i->c_str(); newargv[newargc] = 0; -#ifdef WIN32 + E(0 == dumb_socketpair (socks, 0), F("socketpair failed")); - // In order to use nonblocking i/o on windows, you must use named - // pipes and overlapped i/o. There is no other way, alas. + Child_Socket.set_socketfd (socks[0]); + Parent_Socket.set_socketfd (socks[1]); - static unsigned long serial = 0; - string pipename = (F("\\\\.\\pipe\\netxx_pipe_%ld_%d") - % GetCurrentProcessId() - % (++serial)).str(); + probe_info.add_socket (socks[1]); - // Create the parent's handle to the named pipe. +#ifdef WIN32 - named_pipe = CreateNamedPipe(pipename.c_str(), - PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, - PIPE_TYPE_BYTE | PIPE_WAIT, - 1, - sizeof(readbuf), - sizeof(readbuf), - 1000, - 0); - - E(named_pipe != INVALID_HANDLE_VALUE, - F("CreateNamedPipe(%s,...) call failed: %s") - % pipename % err_msg()); - - // Open the child's handle to the named pipe. - - SECURITY_ATTRIBUTES inherit; - memset(&inherit,0,sizeof inherit); - inherit.nLength=sizeof inherit; - inherit.bInheritHandle = TRUE; - - HANDLE hpipe = CreateFile(pipename.c_str(), - GENERIC_READ|GENERIC_WRITE, 0, - &inherit, - OPEN_EXISTING, - FILE_ATTRIBUTE_NORMAL|FILE_FLAG_OVERLAPPED,0); - - E(hpipe != INVALID_HANDLE_VALUE, - F("CreateFile(%s,...) call failed: %s") - % pipename % err_msg()); - - // Set up the child with the pipes as stdin/stdout and inheriting stderr. 
- PROCESS_INFORMATION piProcInfo; STARTUPINFO siStartInfo; memset(&piProcInfo, 0, sizeof(piProcInfo)); memset(&siStartInfo, 0, sizeof(siStartInfo)); - siStartInfo.cb = sizeof(siStartInfo); - siStartInfo.hStdError = (HANDLE)(_get_osfhandle(2)); - siStartInfo.hStdOutput = hpipe; - siStartInfo.hStdInput = hpipe; - siStartInfo.dwFlags |= STARTF_USESTDHANDLES; + siStartInfo.cb = sizeof(siStartInfo); + siStartInfo.hStdError = (HANDLE)(_get_osfhandle(2)); + siStartInfo.hStdOutput = (HANDLE)socks[0]; + siStartInfo.hStdInput = (HANDLE)socks[0]; + siStartInfo.dwFlags |= STARTF_USESTDHANDLES; string cmdline = munge_argv_into_cmdline(newargv); L(FL("Subprocess command line: '%s'") % cmdline); @@ -240,292 +167,112 @@ Netxx::PipeStream::PipeStream (const str NULL, // Current directory &siStartInfo, &piProcInfo); + + if (!started) + { + closesocket(socks[0]); + closesocket(socks[1]); + } + E(started, F("CreateProcess(%s,...) call failed: %s") % cmdline % err_msg()); child = piProcInfo.hProcess; - // create infrastructure for overlapping I/O +#else // !WIN32 - memset(&overlap, 0, sizeof(overlap)); - overlap.hEvent = CreateEvent(0, TRUE, TRUE, 0); - bytes_available = 0; - I(overlap.hEvent != 0); + child = fork(); -#else // !WIN32 + if (child < 0) + { + // fork failed; release both ends of the socket pair + ::close(socks[0]); + ::close(socks[1]); - int fd1[2], fd2[2]; - child = pipe_and_fork(fd1, fd2); - E(child >= 0, F("pipe/fork failed %s") % strerror(errno)); + E(child >= 0, F("fork failed %s") % strerror(errno)); + } + if (!child) { + // We are in the child process; run the command, then exit. + + socket_type old_stdio[2]; + + // Set the child socket as stdin and stdout. dup2 clobbers its first + // arg, so copy it first. + + old_stdio[0] = socks[0]; + old_stdio[1] = socks[0]; + + if (dup2(old_stdio[0], 0) != 0 || + dup2(old_stdio[1], 1) != 1) + { + // We don't have the mtn error handling infrastructure here. 
+ perror("dup2 failed"); + exit(-1); + } + + // stdin/stdout now refer to the child socket; drop the spare descriptor and the parent's end + ::close (old_stdio[0]); + ::close (socks[1]); + execvp(newargv[0], const_cast(newargv)); perror(newargv[0]); exit(errno); } - readfd = fd1[0]; - writefd = fd2[1]; - fcntl(readfd, F_SETFL, fcntl(readfd, F_GETFL) | O_NONBLOCK); -#endif + // else we are in the parent process; continue. - // P(F("mtn %d: set up i/o channels") - // % GetCurrentProcessId()); +#endif } -// Non blocking read. - Netxx::signed_size_type -Netxx::PipeStream::read (void *buffer, size_type length) +Netxx::SpawnedStream::read (void *buffer, size_type length) { -#ifdef WIN32 - - if (length > bytes_available) - length = bytes_available; - - if (length) - { - memcpy(buffer, readbuf, length); - if (length < bytes_available) - memmove(readbuf, readbuf+length, bytes_available-length); - bytes_available -= length; - } - - return length; -#else - return ::read(readfd, buffer, length); -#endif + return Parent_Socket.read (buffer, length, get_timeout()); } Netxx::signed_size_type -Netxx::PipeStream::write(const void *buffer, size_type length) +Netxx::SpawnedStream::write (const void *buffer, size_type length) { -#ifdef WIN32 - DWORD written = 0; - BOOL ok = WriteFile(named_pipe, buffer, length, &written, NULL); - E(ok, F("WriteFile call failed: %s") % err_msg()); -#else - size_t written = ::write(writefd, buffer, length); -#endif - return written; + return Parent_Socket.write (buffer, length, get_timeout()); } void -Netxx::PipeStream::close (void) +Netxx::SpawnedStream::close (void) { - -#ifdef WIN32 - if (named_pipe != INVALID_HANDLE_VALUE) - CloseHandle(named_pipe); - named_pipe = INVALID_HANDLE_VALUE; - - if (overlap.hEvent != INVALID_HANDLE_VALUE) - CloseHandle(overlap.hEvent); - overlap.hEvent = INVALID_HANDLE_VALUE; - - if (child != INVALID_HANDLE_VALUE) - WaitForSingleObject(child, INFINITE); - child = INVALID_HANDLE_VALUE; -#else - if (readfd != -1) - ::close(readfd); 
- readfd = -1; - - if (writefd != -1) - ::close(writefd); - writefd = -1; - - if (child) - while (waitpid(child,0,0) == -1 && errno == EINTR); - child = 0; -#endif + // We assume the child process has exited + Child_Socket.close(); + Parent_Socket.close(); } Netxx::socket_type -Netxx::PipeStream::get_socketfd (void) const +Netxx::SpawnedStream::get_socketfd (void) const { -#ifdef WIN32 - return (Netxx::socket_type) named_pipe; -#else - return Netxx::socket_type(-1); -#endif + return Parent_Socket.get_socketfd (); } const Netxx::ProbeInfo* -Netxx::PipeStream::get_probe_info (void) const +Netxx::SpawnedStream::get_probe_info (void) const { - return 0; + return &probe_info; } -#ifdef WIN32 - -static string -status_name(DWORD wstatus) -{ - switch (wstatus) { - case WAIT_TIMEOUT: return "WAIT_TIMEOUT"; - case WAIT_OBJECT_0: return "WAIT_OBJECT_0"; - case WAIT_FAILED: return "WAIT_FAILED"; - case WAIT_OBJECT_0+1: return "WAIT_OBJECT_0+1"; - default: return "UNKNOWN"; - } -} - -Netxx::Probe::result_type -Netxx::PipeCompatibleProbe::ready(const Timeout &timeout, ready_type rt) -{ - if (!is_pipe) - return Probe::ready(timeout, rt); - - // L(F("mtn %d: checking for i/o ready state") % GetCurrentProcessId()); - - if (rt == ready_none) - rt = ready_t; // remembered from add - - if (rt & ready_write) - { - return make_pair(pipe->get_socketfd(), ready_write); - } - - if (rt & ready_read) - { - if (pipe->bytes_available == 0) - { - // Issue an async request to fill our buffer. - BOOL ok = ReadFile(pipe->named_pipe, pipe->readbuf, - sizeof(pipe->readbuf), NULL, &pipe->overlap); - E(ok || GetLastError() == ERROR_IO_PENDING, - F("ReadFile call failed: %s") % err_msg()); - pipe->read_in_progress = true; - } - - if (pipe->read_in_progress) - { - I(pipe->bytes_available == 0); - - // Attempt to wait for the completion of the read-in-progress. 
- - int milliseconds = ((timeout.get_sec() * 1000) - + (timeout.get_usec() / 1000)); - - L(FL("WaitForSingleObject(,%d)") % milliseconds); - - DWORD wstatus = WAIT_FAILED; - - if (pipe->child != INVALID_HANDLE_VALUE) - { - - // We're a server; we're going to wait for the client to - // exit as well as the pipe read status, because - // apparently you don't find out about closed pipes - // during an overlapped read request (?) - - HANDLE handles[2]; - handles[0] = pipe->overlap.hEvent; - handles[1] = pipe->child; - - wstatus = WaitForMultipleObjects(2, - handles, - FALSE, - milliseconds); - - E(wstatus != WAIT_FAILED, - F("WaitForMultipleObjects call failed: %s") % err_msg()); - - if (wstatus == WAIT_OBJECT_0 + 1) - return make_pair(pipe->get_socketfd(), ready_oobd); - } - else - { - wstatus = WaitForSingleObject(pipe->overlap.hEvent, - milliseconds); - E(wstatus != WAIT_FAILED, - F("WaitForSingleObject call failed: %s") % err_msg()); - } - - if (wstatus == WAIT_TIMEOUT) - return make_pair(-1, ready_none); - - BOOL ok = GetOverlappedResult(pipe->named_pipe, - &pipe->overlap, - &pipe->bytes_available, - FALSE); - - if (ok) - { - // We completed our read. - pipe->read_in_progress = false; - } - else - { - // We did not complete our read. - E(GetLastError() == ERROR_IO_INCOMPLETE, - F("GetOverlappedResult call failed: %s") - % err_msg()); - } - } - - if (pipe->bytes_available != 0) - { - return make_pair(pipe->get_socketfd(), ready_read); - } - } - - return make_pair(pipe->get_socketfd(), ready_none); -} - void -Netxx::PipeCompatibleProbe::add(PipeStream &ps, ready_type rt) -{ - assert(!is_pipe); - assert(!pipe); - is_pipe = true; - pipe = &ps; - ready_t = rt; -} - -void -Netxx::PipeCompatibleProbe::add(StreamBase const &sb, ready_type rt) -{ - // FIXME: This is *still* an unfortunate way of performing a - // downcast, though slightly less awful than the old way, which - // involved throwing an exception. - // - // Perhaps we should twiddle the caller-visible API. 
- - StreamBase const *sbp = &sb; - PipeStream const *psp = dynamic_cast(sbp); - if (psp) - add(const_cast(*psp),rt); - else - { - assert(!is_pipe); - Probe::add(sb,rt); - } -} - -void -Netxx::PipeCompatibleProbe::add(const StreamServer &ss, ready_type rt) -{ - assert(!is_pipe); - Probe::add(ss,rt); -} -#else // unix -void -Netxx::PipeCompatibleProbe::add(PipeStream &ps, ready_type rt) +Netxx::StdioProbe::add(const StdioStream &ps, ready_type rt) { if (rt == ready_none || rt & ready_read) - add_socket(ps.get_readfd(), ready_read); + add_socket(ps.readfd, ready_read); if (rt == ready_none || rt & ready_write) - add_socket(ps.get_writefd(), ready_write); + add_socket(ps.writefd, ready_write); } void -Netxx::PipeCompatibleProbe::add(const StreamBase &sb, ready_type rt) +Netxx::StdioProbe::add(const StreamBase &sb, ready_type rt) { try { - add(const_cast(dynamic_cast(sb)),rt); + add(const_cast(dynamic_cast(sb)),rt); } catch (...) { @@ -533,41 +280,30 @@ Netxx::PipeCompatibleProbe::add(const St } } -void -Netxx::PipeCompatibleProbe::add(const StreamServer &ss, ready_type rt) -{ - Probe::add(ss,rt); -} -#endif - #ifdef BUILD_UNIT_TESTS #include "unit_tests.hh" -UNIT_TEST(pipe, simple_pipe) +UNIT_TEST(pipe, spawned) { try { - Netxx::PipeStream pipe("cat",vector()); + Netxx::SpawnedStream spawned("cat",vector()); string result; - Netxx::PipeCompatibleProbe probe; + Netxx::Probe probe; Netxx::Timeout timeout(2L), short_time(0,1000); // time out because no data is available probe.clear(); - probe.add(pipe, Netxx::Probe::ready_read); + probe.add(spawned, Netxx::Probe::ready_read); Netxx::Probe::result_type res = probe.ready(short_time); I(res.second==Netxx::Probe::ready_none); // write should be possible probe.clear(); - probe.add(pipe, Netxx::Probe::ready_write); + probe.add(spawned, Netxx::Probe::ready_write); res = probe.ready(short_time); I(res.second & Netxx::Probe::ready_write); -#ifdef WIN32 - I(res.first==pipe.get_socketfd()); -#else - I(res.first==pipe.get_writefd()); 
-#endif + I(res.first==spawned.get_socketfd()); // try binary transparency for (int c = 0; c < 256; ++c) @@ -575,21 +311,18 @@ UNIT_TEST(pipe, simple_pipe) char buf[1024]; buf[0] = c; buf[1] = 255 - c; - pipe.write(buf, 2); + spawned.write(buf, 2); string result; while (result.size() < 2) { // wait for data to arrive probe.clear(); - probe.add(pipe, Netxx::Probe::ready_read); + probe.add(spawned, Netxx::Probe::ready_read); res = probe.ready(timeout); E(res.second & Netxx::Probe::ready_read, F("timeout reading data %d") % c); -#ifdef WIN32 - I(res.first == pipe.get_socketfd()); -#else - I(res.first == pipe.get_readfd()); -#endif - int bytes = pipe.read(buf, sizeof(buf)); + I(res.first == spawned.get_socketfd()); + + int bytes = spawned.read(buf, sizeof(buf)); result += string(buf, bytes); } I(result.size() == 2); @@ -597,7 +330,7 @@ UNIT_TEST(pipe, simple_pipe) I(static_cast(result[1]) == 255 - c); } - pipe.close(); + spawned.close(); } catch (informative_failure &e) @@ -608,6 +341,9 @@ catch (informative_failure &e) throw; } } + +// FIXME: test StdioStream + #endif // Local Variables: ============================================================ --- netxx_pipe.hh 873bc7d111d59db057cba5ea5f9ce31ad7608e1f +++ netxx_pipe.hh 41d912dba5cf1c70862cff97b8e20bb199de2281 @@ -1,6 +1,7 @@ #ifndef __NETXX_PIPE_HH__ #define __NETXX_PIPE_HH__ +// Copyright (C) 2007 Stephen Leake // Copyright (C) 2005 Christof Petig // // This program is made available under the GNU GPL version 2.0 or @@ -18,118 +19,98 @@ #endif /* - What is this all for? + Here we provide children of Netxx::StreamBase that work with stdin/stdout + in two different ways. - If you want to transparently handle a pipe and a socket on unix and - windows you have to abstract some difficulties: + The use cases are 'mtn sync file:...', 'mtn sync ssh:...', and 'mtn serve + --stdio'. 
- - sockets have a single filedescriptor for reading and writing - pipes usually come in pairs (one for reading and one for writing) + The netsync code uses StreamBase objects to perform all communications + between the local and server mtns. - - process creation is different on unix and windows + In 'mtn sync file:...', the local mtn spawns the server directly as 'mtn + serve --stdio'. Thus the local mtn needs a StreamBase object that can + serve as the stdin/stdout of a spawned process; SpawnedStream. The server + mtn needs to construct a StreamBase object from the existing stdin/stdout; + StdioStream. - - => so Netxx::PipeStream is a Netxx::StreamBase which abstracts two pipes to - and from an external command + In 'mtn sync ssh:...' the local mtn spawns ssh, connecting to it via + SpawnedStream. On the server, ssh spawns 'mtn serve --stdio', which uses + StdioStream. - - windows can select on a socket but not on a pipe + We also need StdioProbe objects that work with StdioStream objects, since + Netxx::Probe doesn't. Netxx does not provide for child classes of Probe. + We handle this by having the netsync code create the appropriate Probe + object whenever it creates a StreamBase object. - - => so Netxx::PipeCompatibleProbe is a Netxx::Probe like class which - _can_ handle pipes on windows (emulating select is difficult at best!) - (on unix Probe and PipeCompatibleProbe are nearly identical: with pipes - you should not select for both read and write on the same descriptor) + We use socket pairs to implement these objects. On Unix and Win32, a + socket can serve as stdin and stdout for a spawned process. + The sockets in the pair must be connected to each other; socketpair() does + that nicely. + + An earlier implementation (a single class named PipeStream) tried to use + Win32 overlapped IO via named pipes on Win32, and Unix select with Unix + pipes on unix, but we couldn't make it work, because the semantics of the + two implementations are too different. 
Now we always use socket select on + sockets. */ namespace Netxx { - class PipeCompatibleProbe; - class StreamServer; - class PipeStream : public StreamBase + class SpawnedStream : public StreamBase { + Socket Parent_Socket; + Socket Child_Socket; + ProbeInfo probe_info; #ifdef WIN32 - HANDLE named_pipe; - HANDLE child; - char readbuf[1024]; - DWORD bytes_available; - bool read_in_progress; - OVERLAPPED overlap; - friend class PipeCompatibleProbe; + HANDLE child; #else - int readfd, writefd; - int child; + pid_t child; #endif - public: - // do we need Timeout for symmetry with Stream? - explicit PipeStream (int readfd, int writefd); - explicit PipeStream (const std::string &cmd, const std::vector &args); - virtual ~PipeStream() { close(); } + explicit SpawnedStream (const std::string &cmd, const std::vector &args); + // Spawn a child process to run 'cmd args', connect its stdout and + // stdin to this object. + + virtual ~SpawnedStream() { close(); } virtual signed_size_type read (void *buffer, size_type length); virtual signed_size_type write (const void *buffer, size_type length); virtual void close (void); virtual socket_type get_socketfd (void) const; virtual const ProbeInfo* get_probe_info (void) const; - int get_readfd(void) const - { -#ifdef WIN32 - return -1; -#else - return readfd; -#endif - } - int get_writefd(void) const - { -#ifdef WIN32 - return -1; -#else - return writefd; -#endif - } }; -#ifdef WIN32 + class StdioProbe; - // This probe can either handle _one_ PipeStream or several network - // Streams so if !is_pipe this acts like a Probe. 
- class PipeCompatibleProbe : public Probe + class StdioStream : public StreamBase { - bool is_pipe; - // only meaningful if is_pipe is true - PipeStream *pipe; - ready_type ready_t; + friend class StdioProbe; + int readfd; + int writefd; + ProbeInfo probe_info; + public: - PipeCompatibleProbe() : is_pipe(), pipe(), ready_t() - {} - void clear() - { - if (is_pipe) - { - pipe=0; - is_pipe=false; - } - else - Probe::clear(); - } - // This function does all the hard work (emulating a select). - result_type ready(const Timeout &timeout=Timeout(), ready_type rt=ready_none); - void add(PipeStream &ps, ready_type rt=ready_none); - void add(const StreamBase &sb, ready_type rt=ready_none); - void add(const StreamServer &ss, ready_type rt=ready_none); + explicit StdioStream (int readfd, int writefd); + // Construct a Stream object from existing files; typically stdout + // and stdin of the current process. + + virtual ~StdioStream() { close(); } + virtual signed_size_type read (void *buffer, size_type length); + virtual signed_size_type write (const void *buffer, size_type length); + virtual void close (void); + virtual socket_type get_socketfd (void) const; + virtual const ProbeInfo* get_probe_info (void) const; }; -#else - // We only act specially if a PipeStream is added (directly or via - // the StreamBase parent reference). 
- struct PipeCompatibleProbe : Probe + struct StdioProbe : Probe { - void add(PipeStream &ps, ready_type rt=ready_none); + public: + void add(const StdioStream &ps, ready_type rt=ready_none); void add(const StreamBase &sb, ready_type rt=ready_none); - void add(const StreamServer &ss, ready_type rt=ready_none); }; -#endif - } // Local Variables: @@ -141,4 +122,3 @@ namespace Netxx // vim: et:sw=2:sts=2:ts=2:cino=>2s,{s,\:s,+s,t0,g0,^-2,e-2,n-2,p2s,(0,=s: #endif // __NETXX_PIPE_HH__ - ============================================================ --- platform.hh b5cb961e19daf9302d866c2f066d8338db7c79c3 +++ platform.hh aa3302a71c7da264137d9b154902763fc4c3f349 @@ -1,7 +1,7 @@ #ifndef __PLATFORM_HH__ #define __PLATFORM_HH__ -// Copyright (C) 2002 Graydon Hoare +// Copyright (C) 2002, 2007 Graydon Hoare // // This program is made available under the GNU GPL version 2.0 or // greater. See the accompanying file COPYING for details. @@ -36,6 +36,9 @@ void make_io_binary(); // stop "\n"->"\r\n" from breaking automate on Windows void make_io_binary(); +// returns 0 for success, non-zero for error +extern "C" int dumb_socketpair(int socks[2], int make_overlapped); + #ifdef WIN32 std::string munge_argv_into_cmdline(const char* const argv[]); #endif @@ -80,7 +83,7 @@ public: // this time in the future?" bit in the hashed information. This bit // will change when we pass the future point, and trigger a re-check of // the file's contents. - // + // // This is, of course, still not perfect. There is no way to make our stat // atomic with the actual read of the file, so there's always a race condition // there. Additionally, this handling means that checkout will never actually