#
#
# add_file "unix/socketpair.c"
# content [20cce52deb8de9cdc77a6aea47332319a7364778]
#
# add_file "win32/socketpair.c"
# content [515facee7bc221dada5653b23751e3e133706b99]
#
# patch ".mtn-ignore"
# from [8d76d2540f0c737a188b670a78eb969e0baa57a4]
# to [05a89eb8ac4d4c87ee661d69c3a54e6d65da407e]
#
# patch "netsync.cc"
# from [50b0ee5fa1808bdcf31ff87cf3c5716c9d2ad009]
# to [6b00f6271a8eb7d72d3234ef738691fcbf6574c4]
#
# patch "netxx/probe.h"
# from [7dbd28f284875bb3af6e59db3be0fb9378f8e7bb]
# to [c90c28bd503c99be473993312dc8e595a8fb13fa]
#
# patch "netxx_pipe.cc"
# from [3fa376f09fb6fe19cb49562d8cbe71b79b95eeb3]
# to [d4195cd958d6f394b611aed34bd01754133cf482]
#
# patch "netxx_pipe.hh"
# from [873bc7d111d59db057cba5ea5f9ce31ad7608e1f]
# to [41d912dba5cf1c70862cff97b8e20bb199de2281]
#
# patch "platform.hh"
# from [b5cb961e19daf9302d866c2f066d8338db7c79c3]
# to [aa3302a71c7da264137d9b154902763fc4c3f349]
#
============================================================
--- unix/socketpair.c 20cce52deb8de9cdc77a6aea47332319a7364778
+++ unix/socketpair.c 20cce52deb8de9cdc77a6aea47332319a7364778
@@ -0,0 +1,34 @@
+/* socketpair.c
+ * Copyright 2007 by Nathan C. Myers ; all rights reserved.
+ * This code is Free Software. It may be copied freely, in original or
+ * modified form, subject only to the restrictions that (1) the author is
+ * relieved from all responsibilities for any use for any purpose, and (2)
+ * this copyright notice must be retained, unchanged, in its entirety. If
+ * for any reason the author might be held responsible for any consequences
+ * of copying or use, license is withheld.
+ */
+
+/* Changes:
+ * 2007-12-08: split into two files for monotone
+
+ * 2007-04-25:
+ * preserve value of WSAGetLastError() on all error returns.
+ * 2007-04-22: (Thanks to Matthew Gregan )
+ * s/EINVAL/WSAEINVAL/ fix trivial compile failure
+ * s/socket/WSASocket/ enable creation of sockets suitable as stdin/stdout
+ * of a child process.
+ * add argument make_overlapped
+ */
+
+#include
+
+include
+include
+
+int dumb_socketpair(int socks[2], int dummy) /* thin wrapper: returns whatever socketpair() returns */
+{
+ (void) dummy; /* unused here; the win32 variant takes make_overlapped in this position */
+ return socketpair(AF_LOCAL, SOCK_STREAM, 0, socks); /* stream sockets in the local address family */
+}
+
+/* end of file */
============================================================
--- win32/socketpair.c 515facee7bc221dada5653b23751e3e133706b99
+++ win32/socketpair.c 515facee7bc221dada5653b23751e3e133706b99
@@ -0,0 +1,92 @@
+/* socketpair.c
+ * Copyright 2007 by Nathan C. Myers ; all rights reserved.
+ * This code is Free Software. It may be copied freely, in original or
+ * modified form, subject only to the restrictions that (1) the author is
+ * relieved from all responsibilities for any use for any purpose, and (2)
+ * this copyright notice must be retained, unchanged, in its entirety. If
+ * for any reason the author might be held responsible for any consequences
+ * of copying or use, license is withheld.
+ */
+
+/* Changes:
+ * 2007-12-08: split into two files for monotone
+
+ * 2007-04-25:
+ * preserve value of WSAGetLastError() on all error returns.
+ * 2007-04-22: (Thanks to Matthew Gregan )
+ * s/EINVAL/WSAEINVAL/ fix trivial compile failure
+ * s/socket/WSASocket/ enable creation of sockets suitable as stdin/stdout
+ * of a child process.
+ * add argument make_overlapped
+ */
+
+#include
+
+#include
+#include
+#include
+
+/* dumb_socketpair:
+ * If make_overlapped is nonzero, both sockets created will be usable for
+ * "overlapped" operations via WSASend etc. If make_overlapped is zero,
+ * socks[0] (only) will be usable with regular ReadFile etc., and thus
+ * suitable for use as stdin or stdout of a child process. Note that the
+ * sockets must be closed with closesocket() regardless.
+ */
+
+int dumb_socketpair(signed int socks[2], int make_overlapped) /* returns 0 on success, SOCKET_ERROR on failure */
+{
+ struct sockaddr_in addr;
+ SOCKET listener;
+ int e; /* holds call results, then the saved WSA error code on failure paths */
+ int addrlen = sizeof(addr);
+ DWORD flags = (make_overlapped ? WSA_FLAG_OVERLAPPED : 0); /* passed to WSASocket when creating socks[0] */
+
+ if (socks == 0) {
+ WSASetLastError(WSAEINVAL);
+ return SOCKET_ERROR;
+ }
+
+ socks[0] = socks[1] = INVALID_SOCKET; /* lets the shared failure path close both unconditionally */
+ if ((listener = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET)
+ return SOCKET_ERROR;
+
+ memset(&addr, 0, sizeof(addr));
+ addr.sin_family = AF_INET;
+ addr.sin_addr.s_addr = htonl(0x7f000001); /* 127.0.0.1 */
+ addr.sin_port = 0; /* port is read back with getsockname() after bind() */
+
+ e = bind(listener, (const struct sockaddr*) &addr, sizeof(addr));
+ if (e == SOCKET_ERROR) {
+ e = WSAGetLastError(); /* save the error before cleanup; restored below */
+ closesocket(listener);
+ WSASetLastError(e);
+ return SOCKET_ERROR;
+ }
+ e = getsockname(listener, (struct sockaddr*) &addr, &addrlen); /* refresh addr with the bound address/port */
+ if (e == SOCKET_ERROR) {
+ e = WSAGetLastError(); /* save the error before cleanup; restored below */
+ closesocket(listener);
+ WSASetLastError(e);
+ return SOCKET_ERROR;
+ }
+
+ do { /* runs once; each 'break' drops to the shared failure cleanup after the loop */
+ if (listen(listener, 1) == SOCKET_ERROR) break;
+ if ((socks[0] = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, flags))
+ == INVALID_SOCKET) break;
+ if (connect(socks[0], (const struct sockaddr*) &addr,
+ sizeof(addr)) == SOCKET_ERROR) break;
+ if ((socks[1] = accept(listener, NULL, NULL))
+ == INVALID_SOCKET) break;
+ closesocket(listener); /* success: the listening socket is no longer needed */
+ return 0;
+ } while (0);
+ e = WSAGetLastError(); /* failure path: remember the error of the call that failed */
+ closesocket(listener);
+ closesocket(socks[0]); /* may still be INVALID_SOCKET if the failure came earlier */
+ closesocket(socks[1]);
+ WSASetLastError(e); /* report the original failure, not one from the closesocket() calls */
+ return SOCKET_ERROR;
+}
+/* end of file */
============================================================
--- .mtn-ignore 8d76d2540f0c737a188b670a78eb969e0baa57a4
+++ .mtn-ignore 05a89eb8ac4d4c87ee661d69c3a54e6d65da407e
@@ -1,11 +1,20 @@ tester_dir
testsuite\.dir
tester_dir
+^testlib\.c$
+^tester_tests\.status$
+^test_hooks\.c$
+^std_hooks\.c$
+^schema\.c$
+^.*\.exe$
^build$
^build/
\.dirstamp
^txt2c$
-^unit_tests$
-^unit_tests\.log$
+^lua_tests\.status$
+^unit-tests$
+^unit-tests/
+^unit-tests\.log$
+^unit_tests\.status$
^mtn$
^testsuite$
^testsuite\.log$
@@ -13,6 +22,7 @@ tester_dir
^tester\.h$
^run_lua_tests$
^run_tester_tests$
+^run_unit_tests$
^package_full_revision(\.c|\.txt|_dist\.txt|_raw\.txt)$
^package_revision_raw.txt$
^config\.
@@ -66,3 +76,5 @@ tester_dir
^m4/stdint_h\.m4$
^m4/uintmax_t\.m4$
^m4/ulonglong\.m4$
+^STATUS$
+^tester\.log$
============================================================
--- netsync.cc 50b0ee5fa1808bdcf31ff87cf3c5716c9d2ad009
+++ netsync.cc 6b00f6271a8eb7d72d3234ef738691fcbf6574c4
@@ -1,4 +1,4 @@
-// Copyright (C) 2004 Graydon Hoare
+// Copyright (C) 2004, 2007 Graydon Hoare
//
// This program is made available under the GNU GPL version 2.0 or
// greater. See the accompanying file COPYING for details.
@@ -2354,7 +2354,7 @@ build_stream_to_server(app_state & app,
argv.erase(argv.begin());
app.opts.use_transport_auth = app.lua.hook_use_transport_auth(u);
return shared_ptr
- (new Netxx::PipeStream(cmd, argv));
+ (new Netxx::SpawnedStream(cmd, argv));
}
else
@@ -2380,7 +2380,7 @@ call_server(protocol_role role,
Netxx::port_type default_port,
unsigned long timeout_seconds)
{
- Netxx::PipeCompatibleProbe probe;
+ Netxx::Probe probe;
transaction_guard guard(app.db);
I(addresses.size() == 1);
utf8 address(*addresses.begin());
@@ -2493,41 +2493,15 @@ drop_session_associated_with_fd(map > & sessions,
Netxx::socket_type fd)
{
- // This is a bit of a hack. Initially all "file descriptors" in
- // netsync were full duplex, so we could get away with indexing
- // sessions by their file descriptor.
- //
- // When using pipes in unix, it's no longer true: a session gets
- // entered in the session map under its read pipe fd *and* its write
- // pipe fd. When we're in such a situation the socket fd is "-1" and
- // we downcast to a PipeStream and use its read+write fds.
- //
- // When using pipes in windows, we use a full duplex pipe (named
- // pipe) so the socket-like abstraction holds.
-
- I(fd != -1);
map >::const_iterator i = sessions.find(fd);
I(i != sessions.end());
shared_ptr sess = i->second;
fd = sess->str->get_socketfd();
- if (fd != -1)
- {
- sessions.erase(fd);
- }
- else
- {
- shared_ptr pipe =
- boost::dynamic_pointer_cast(sess->str);
- I(static_cast(pipe));
- I(pipe->get_writefd() != -1);
- I(pipe->get_readfd() != -1);
- sessions.erase(pipe->get_readfd());
- sessions.erase(pipe->get_writefd());
- }
+ sessions.erase(fd);
}
static void
-arm_sessions_and_calculate_probe(Netxx::PipeCompatibleProbe & probe,
+arm_sessions_and_calculate_probe(Netxx::Probe & probe,
map > & sessions,
set & armed_sessions,
transaction_guard & guard)
@@ -2744,7 +2718,7 @@ serve_connections(protocol_role role,
unsigned long timeout_seconds,
unsigned long session_limit)
{
- Netxx::PipeCompatibleProbe probe;
+ Netxx::Probe probe;
Netxx::Timeout
forever,
@@ -2779,7 +2753,7 @@ serve_connections(protocol_role role,
{
size_t l_colon = address().find(':');
size_t r_colon = address().rfind(':');
-
+
if (l_colon == r_colon && l_colon == 0)
{
// can't be an IPv6 address as there is only one colon
@@ -2846,14 +2820,15 @@ serve_connections(protocol_role role,
try
{
P(F("connecting to %s") % addr());
- shared_ptr server
+ shared_ptr other_server
= build_stream_to_server(app, inc, exc,
addr, default_port,
timeout);
+
// 'false' here means not to revert changes when
// the SockOpt goes out of scope.
- Netxx::SockOpt socket_options(server->get_socketfd(), false);
+ Netxx::SockOpt socket_options(other_server->get_socketfd(), false);
socket_options.set_non_blocking();
protocol_role role = source_and_sink_role;
@@ -2866,9 +2841,9 @@ serve_connections(protocol_role role,
shared_ptr sess(new session(role, client_voice,
inc, exc,
- app, addr(), server, true));
+ app, addr(), other_server, true));
- sessions.insert(make_pair(server->get_socketfd(), sess));
+ sessions.insert(make_pair(other_server->get_socketfd(), sess));
}
catch (Netxx::NetworkException & e)
{
@@ -2989,10 +2964,17 @@ static void
}
static void
-serve_single_connection(shared_ptr sess,
+serve_single_connection(protocol_role role,
+ globish const & include_pattern,
+ globish const & exclude_pattern,
+ app_state & app,
unsigned long timeout_seconds)
{
- Netxx::PipeCompatibleProbe probe;
+ shared_ptr str(new Netxx::StdioStream(0,1));
+ shared_ptr sess (new session (role, server_voice,
+ include_pattern, exclude_pattern,
+ app, "stdio", str));
+ Netxx::StdioProbe probe;
Netxx::Timeout
forever,
@@ -3008,17 +2990,7 @@ serve_single_connection(shared_ptr > sessions;
set armed_sessions;
- if (sess->str->get_socketfd() == -1)
- {
- // Unix pipes are non-duplex, have two filedescriptors
- shared_ptr pipe =
- boost::dynamic_pointer_cast(sess->str);
- I(pipe);
- sessions[pipe->get_writefd()]=sess;
- sessions[pipe->get_readfd()]=sess;
- }
- else
- sessions[sess->str->get_socketfd()]=sess;
+ sessions[sess->str->get_socketfd()]=sess;
while (!sessions.empty())
{
@@ -3287,11 +3259,7 @@ run_netsync_protocol(protocol_voice voic
{
if (app.opts.bind_stdio)
{
- shared_ptr str(new Netxx::PipeStream(0,1));
- shared_ptr sess(new session(role, server_voice,
- include_pattern, exclude_pattern,
- app, "stdio", str));
- serve_single_connection(sess,constants::netsync_timeout_seconds);
+ serve_single_connection(role, include_pattern, exclude_pattern, app, constants::netsync_timeout_seconds);
}
else
serve_connections(role, include_pattern, exclude_pattern, app,
============================================================
--- netxx/probe.h 7dbd28f284875bb3af6e59db3be0fb9378f8e7bb
+++ netxx/probe.h c90c28bd503c99be473993312dc8e595a8fb13fa
@@ -1,11 +1,11 @@
/*
* Copyright (C) 2001-2004 Peter J Jones (address@hidden)
* All Rights Reserved
- *
+ *
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
- *
+ *
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
@@ -15,7 +15,7 @@
* 3. Neither the name of the Author nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
- *
+ *
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS''
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
@@ -57,11 +57,11 @@ class Probe {
* kqueue(2) or /dev/poll.
**/
class Probe {
- /*
- * Probe has no public way to select read only and write only sockets
- * needed for probing pipes, so grant PipeCompatibleProbe to use add_socket
+ /*
+ * Probe has no public way to probe stdin/stdout, so grant
+ * StdioProbe access to add_socket
*/
- friend class PipeCompatibleProbe;
+ friend class StdioProbe;
public:
/*
* Bitmask for telling Probe exactly what you want and for testing the
@@ -81,7 +81,7 @@ public:
typedef std::pair result_type;
//####################################################################
- /**
+ /**
* Construct a new Netxx::Probe object.
*
* @author Peter Jones
@@ -90,7 +90,7 @@ public:
Probe (void);
//####################################################################
- /**
+ /**
* Netxx::Probe copy constructor.
*
* @param other The other Probe object to copy from.
@@ -100,7 +100,7 @@ public:
Probe (const Probe &other);
//####################################################################
- /**
+ /**
* Netxx::Probe assignment operator.
*
* @param other The other Probe object to copy from.
@@ -111,7 +111,7 @@ public:
Probe& operator= (const Probe &other);
//####################################################################
- /**
+ /**
* Swap this Probe and another one. Similar to std::swap().
*
* @param other The other Probe to swap with.
@@ -121,7 +121,7 @@ public:
void swap(Probe &other);
//####################################################################
- /**
+ /**
* Netxx::Probe destructor.
*
* @author Peter Jones
@@ -130,7 +130,7 @@ public:
~Probe (void);
//####################################################################
- /**
+ /**
* Clear the Probe. All objects will be removed from the Probe and it
* will be in a brand-new like state.
*
@@ -140,7 +140,7 @@ public:
void clear (void);
//####################################################################
- /**
+ /**
* Preform the probe. This function will block until either some data is
* ready or the given timeout expires. You may also supply a bitmask for
* the type of data you want in this probe.
@@ -155,7 +155,7 @@ public:
result_type ready (const Timeout &timeout=Timeout(), ready_type rt=ready_none);
//####################################################################
- /**
+ /**
* Add an object to the Probe. The object must support the
* Netxx::ProbeInfo class. All Netxx classes such as Stream and Datagram
* support the ProbeInfo class.
@@ -169,7 +169,7 @@ public:
* @author Peter Jones
**/
//####################################################################
- template void add (const T &t, ready_type rt=ready_none)
+ template void add (const T &t, ready_type rt=ready_none)
{
// implemented inline to work around bug in MSVC
const ProbeInfo *pi = t.get_probe_info();
@@ -178,14 +178,14 @@ public:
}
//####################################################################
- /**
+ /**
* Remove the given object from the Probe.
*
* @param t The object to remove from the Probe.
* @author Peter Jones
**/
//####################################################################
- template void remove (const T &t)
+ template void remove (const T &t)
{
// implemented inline to work around bug in MSVC
const ProbeInfo *pi = t.get_probe_info();
============================================================
--- netxx_pipe.cc 3fa376f09fb6fe19cb49562d8cbe71b79b95eeb3
+++ netxx_pipe.cc d4195cd958d6f394b611aed34bd01754133cf482
@@ -1,3 +1,4 @@
+// Copyright (C) 2007 Stephen Leake
// Copyright (C) 2005 Christof Petig
//
// This program is made available under the GNU GPL version 2.0 or
@@ -16,6 +17,7 @@
#ifdef WIN32
#include
+#include
#include
#include
#else
@@ -32,36 +34,32 @@ using std::strerror;
using std::perror;
using std::strerror;
-Netxx::PipeStream::PipeStream(int _readfd, int _writefd)
+Netxx::StdioStream::StdioStream(int _readfd, int _writefd)
:
-#ifdef WIN32
- child(INVALID_HANDLE_VALUE),
- bytes_available(0),
- read_in_progress(false)
-#else
readfd(_readfd),
- writefd(_writefd),
- child(0)
-#endif
+ writefd(_writefd)
{
+ // This allows netxx to call select() on these file descriptors. Unless
+ // they are actually sockets (i.e., we were spawned by 'mtn sync
+ // file:...'), this will fail (at least on Win32).
+ probe_info.add_socket (readfd);
+ probe_info.add_socket (writefd);
+
#ifdef WIN32
+ {
+ WSADATA wsdata;
+ if (WSAStartup(MAKEWORD(2,2), &wsdata) != 0)
+ L(FL("failed to load WinSock"));
+ }
+
if (_setmode(_readfd, _O_BINARY) == -1)
L(FL("failed to set input file descriptor to binary"));
if (_setmode(_writefd, _O_BINARY) == -1)
L(FL("failed to set output file descriptor to binary"));
- named_pipe = (HANDLE)_get_osfhandle(_readfd);
-
- E(named_pipe != INVALID_HANDLE_VALUE,
- F("pipe handle is invalid"));
-
- // Create infrastructure for overlapping I/O
- memset(&overlap, 0, sizeof(overlap));
- overlap.hEvent = CreateEvent(0, TRUE, TRUE, 0);
- bytes_available = 0;
- I(overlap.hEvent != 0);
#else
+ // FIXME: do we need to set these non-blocking?
int flags = fcntl(readfd, F_GETFL, 0);
I(fcntl(readfd, F_SETFL, flags | O_NONBLOCK) != -1);
flags = fcntl(writefd, F_GETFL, 0);
@@ -69,67 +67,35 @@ Netxx::PipeStream::PipeStream(int _readf
#endif
}
-
-#ifndef WIN32
-
-// Create pipes for stdio and fork subprocess, returns -1 on error, 0
-// to child and PID to parent.
-
-static pid_t
-pipe_and_fork(int fd1[2], int fd2[2])
+Netxx::signed_size_type
+Netxx::StdioStream::read (void *buffer, size_type length)
{
- pid_t result = -1;
- fd1[0] = -1;
- fd1[1] = -1;
- fd2[0] = -1;
- fd2[1] = -1;
+ return ::read(readfd, buffer, length);
+}
- if (pipe(fd1))
- return -1;
+Netxx::signed_size_type
+Netxx::StdioStream::write (const void *buffer, size_type length)
+{
+ return ::write(writefd, buffer, length);
+}
- if (pipe(fd2))
- {
- close(fd1[0]);
- close(fd1[1]);
- return -1;
- }
+void
+Netxx::StdioStream::close (void)
+{
+ // nothing to do here
+}
- result = fork();
+Netxx::socket_type
+Netxx::StdioStream::get_socketfd (void) const
+{
+ return writefd; // only used to register session
+}
- if (result < 0)
- {
- close(fd1[0]);
- close(fd1[1]);
- close(fd2[0]);
- close(fd2[1]);
- return -1;
- }
-
- else if (!result)
- {
- // fd1[1] for writing, fd2[0] for reading
- close(fd1[0]);
- close(fd2[1]);
- if (dup2(fd2[0], 0) != 0 ||
- dup2(fd1[1], 1) != 1)
- {
- perror("dup2");
- exit(-1); // kill the useless child
- }
- close(fd1[1]);
- close(fd2[0]);
- }
-
- else
- {
- // fd1[0] for reading, fd2[1] for writing
- close(fd1[1]);
- close(fd2[0]);
- }
-
- return result;
+const Netxx::ProbeInfo*
+Netxx::StdioStream::get_probe_info (void) const
+{
+ return &probe_info;
}
-#endif
#ifdef WIN32
static string
@@ -143,21 +109,17 @@ err_msg()
}
#endif
-
-Netxx::PipeStream::PipeStream (const string & cmd,
- const vector & args)
+Netxx::SpawnedStream::SpawnedStream (const string & cmd, const vector & args)
:
#ifdef WIN32
- child(INVALID_HANDLE_VALUE),
- bytes_available(0),
- read_in_progress(false)
+ child(INVALID_HANDLE_VALUE)
#else
- readfd(-1),
- writefd(-1),
- child(0)
+ child(-1)
#endif
{
- // Unfortunately neither munge_argv_into_cmdline nor execvp do take
+ socket_type socks[2]; // 0 is for child, 1 is for parent
+
+ // Unfortunately neither munge_argv_into_cmdline nor execvp take
// a vector as argument.
const unsigned newsize = 64;
@@ -171,61 +133,26 @@ Netxx::PipeStream::PipeStream (const str
newargv[newargc++] = i->c_str();
newargv[newargc] = 0;
-#ifdef WIN32
+ E(0 == dumb_socketpair (socks, 0), F("socketpair failed"));
- // In order to use nonblocking i/o on windows, you must use named
- // pipes and overlapped i/o. There is no other way, alas.
+ Child_Socket.set_socketfd (socks[0]);
+ Parent_Socket.set_socketfd (socks[1]);
- static unsigned long serial = 0;
- string pipename = (F("\\\\.\\pipe\\netxx_pipe_%ld_%d")
- % GetCurrentProcessId()
- % (++serial)).str();
+ probe_info.add_socket (socks[1]);
- // Create the parent's handle to the named pipe.
+#ifdef WIN32
- named_pipe = CreateNamedPipe(pipename.c_str(),
- PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
- PIPE_TYPE_BYTE | PIPE_WAIT,
- 1,
- sizeof(readbuf),
- sizeof(readbuf),
- 1000,
- 0);
-
- E(named_pipe != INVALID_HANDLE_VALUE,
- F("CreateNamedPipe(%s,...) call failed: %s")
- % pipename % err_msg());
-
- // Open the child's handle to the named pipe.
-
- SECURITY_ATTRIBUTES inherit;
- memset(&inherit,0,sizeof inherit);
- inherit.nLength=sizeof inherit;
- inherit.bInheritHandle = TRUE;
-
- HANDLE hpipe = CreateFile(pipename.c_str(),
- GENERIC_READ|GENERIC_WRITE, 0,
- &inherit,
- OPEN_EXISTING,
- FILE_ATTRIBUTE_NORMAL|FILE_FLAG_OVERLAPPED,0);
-
- E(hpipe != INVALID_HANDLE_VALUE,
- F("CreateFile(%s,...) call failed: %s")
- % pipename % err_msg());
-
- // Set up the child with the pipes as stdin/stdout and inheriting stderr.
-
PROCESS_INFORMATION piProcInfo;
STARTUPINFO siStartInfo;
memset(&piProcInfo, 0, sizeof(piProcInfo));
memset(&siStartInfo, 0, sizeof(siStartInfo));
- siStartInfo.cb = sizeof(siStartInfo);
- siStartInfo.hStdError = (HANDLE)(_get_osfhandle(2));
- siStartInfo.hStdOutput = hpipe;
- siStartInfo.hStdInput = hpipe;
- siStartInfo.dwFlags |= STARTF_USESTDHANDLES;
+ siStartInfo.cb = sizeof(siStartInfo);
+ siStartInfo.hStdError = (HANDLE)(_get_osfhandle(2));
+ siStartInfo.hStdOutput = (HANDLE)socks[0];
+ siStartInfo.hStdInput = (HANDLE)socks[0];
+ siStartInfo.dwFlags |= STARTF_USESTDHANDLES;
string cmdline = munge_argv_into_cmdline(newargv);
L(FL("Subprocess command line: '%s'") % cmdline);
@@ -240,292 +167,112 @@ Netxx::PipeStream::PipeStream (const str
NULL, // Current directory
&siStartInfo,
&piProcInfo);
+
+ if (!started)
+ {
+ closesocket(socks[0]);
+ closesocket(socks[1]);
+ }
+
E(started,
F("CreateProcess(%s,...) call failed: %s")
% cmdline % err_msg());
child = piProcInfo.hProcess;
- // create infrastructure for overlapping I/O
+#else // !WIN32
- memset(&overlap, 0, sizeof(overlap));
- overlap.hEvent = CreateEvent(0, TRUE, TRUE, 0);
- bytes_available = 0;
- I(overlap.hEvent != 0);
+ child = fork();
-#else // !WIN32
+ if (child < 0)
+ {
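+ // fork failed. NOTE(review): the socket() calls below look like they should be close(), and 'started' appears to belong to the WIN32 branch only -- confirm.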
+ socket(socks[0]);
+ socket(socks[1]);
- int fd1[2], fd2[2];
- child = pipe_and_fork(fd1, fd2);
- E(child >= 0, F("pipe/fork failed %s") % strerror(errno));
+ E(started, F("fork failed %s") % strerror(errno));
+ }
+
if (!child)
{
+ // We are in the child process; run the command, then exit.
+
+ socket_type old_stdio[2];
+
+ // Set the child socket as stdin and stdout. NOTE(review): dup2 takes its
+ // arguments by value and leaves oldfd open, so these copies look redundant.
+
+ old_stdio[0] = socks[0];
+ old_stdio[1] = socks[0];
+
+ if (dup2(old_stdio[0], 0) != 0 ||
+ dup2(old_stdio[1], 1) != 1)
+ {
+ // We don't have the mtn error handling infrastructure here.
+ perror("dup2 failed");
+ exit(-1);
+ }
+
+ // old_stdio[0] and old_stdio[1] both hold the original child socket fd (socks[0]), now duplicated onto 0 and 1, so close it
+ close (old_stdio[0]);
+ close (old_stdio[1]);
+
execvp(newargv[0], const_cast(newargv));
perror(newargv[0]);
exit(errno);
}
- readfd = fd1[0];
- writefd = fd2[1];
- fcntl(readfd, F_SETFL, fcntl(readfd, F_GETFL) | O_NONBLOCK);
-#endif
+ // else we are in the parent process; continue.
- // P(F("mtn %d: set up i/o channels")
- // % GetCurrentProcessId());
+#endif
}
-// Non blocking read.
-
Netxx::signed_size_type
-Netxx::PipeStream::read (void *buffer, size_type length)
+Netxx::SpawnedStream::read (void *buffer, size_type length)
{
-#ifdef WIN32
-
- if (length > bytes_available)
- length = bytes_available;
-
- if (length)
- {
- memcpy(buffer, readbuf, length);
- if (length < bytes_available)
- memmove(readbuf, readbuf+length, bytes_available-length);
- bytes_available -= length;
- }
-
- return length;
-#else
- return ::read(readfd, buffer, length);
-#endif
+ return Parent_Socket.read (buffer, length, get_timeout());
}
Netxx::signed_size_type
-Netxx::PipeStream::write(const void *buffer, size_type length)
+Netxx::SpawnedStream::write (const void *buffer, size_type length)
{
-#ifdef WIN32
- DWORD written = 0;
- BOOL ok = WriteFile(named_pipe, buffer, length, &written, NULL);
- E(ok, F("WriteFile call failed: %s") % err_msg());
-#else
- size_t written = ::write(writefd, buffer, length);
-#endif
- return written;
+ return Parent_Socket.write (buffer, length, get_timeout());
}
void
-Netxx::PipeStream::close (void)
+Netxx::SpawnedStream::close (void)
{
-
-#ifdef WIN32
- if (named_pipe != INVALID_HANDLE_VALUE)
- CloseHandle(named_pipe);
- named_pipe = INVALID_HANDLE_VALUE;
-
- if (overlap.hEvent != INVALID_HANDLE_VALUE)
- CloseHandle(overlap.hEvent);
- overlap.hEvent = INVALID_HANDLE_VALUE;
-
- if (child != INVALID_HANDLE_VALUE)
- WaitForSingleObject(child, INFINITE);
- child = INVALID_HANDLE_VALUE;
-#else
- if (readfd != -1)
- ::close(readfd);
- readfd = -1;
-
- if (writefd != -1)
- ::close(writefd);
- writefd = -1;
-
- if (child)
- while (waitpid(child,0,0) == -1 && errno == EINTR);
- child = 0;
-#endif
+ // We assume the child process has exited
+ Child_Socket.close();
+ Parent_Socket.close();
}
Netxx::socket_type
-Netxx::PipeStream::get_socketfd (void) const
+Netxx::SpawnedStream::get_socketfd (void) const
{
-#ifdef WIN32
- return (Netxx::socket_type) named_pipe;
-#else
- return Netxx::socket_type(-1);
-#endif
+ return Parent_Socket.get_socketfd ();
}
const Netxx::ProbeInfo*
-Netxx::PipeStream::get_probe_info (void) const
+Netxx::SpawnedStream::get_probe_info (void) const
{
- return 0;
+ return &probe_info;
}
-#ifdef WIN32
-
-static string
-status_name(DWORD wstatus)
-{
- switch (wstatus) {
- case WAIT_TIMEOUT: return "WAIT_TIMEOUT";
- case WAIT_OBJECT_0: return "WAIT_OBJECT_0";
- case WAIT_FAILED: return "WAIT_FAILED";
- case WAIT_OBJECT_0+1: return "WAIT_OBJECT_0+1";
- default: return "UNKNOWN";
- }
-}
-
-Netxx::Probe::result_type
-Netxx::PipeCompatibleProbe::ready(const Timeout &timeout, ready_type rt)
-{
- if (!is_pipe)
- return Probe::ready(timeout, rt);
-
- // L(F("mtn %d: checking for i/o ready state") % GetCurrentProcessId());
-
- if (rt == ready_none)
- rt = ready_t; // remembered from add
-
- if (rt & ready_write)
- {
- return make_pair(pipe->get_socketfd(), ready_write);
- }
-
- if (rt & ready_read)
- {
- if (pipe->bytes_available == 0)
- {
- // Issue an async request to fill our buffer.
- BOOL ok = ReadFile(pipe->named_pipe, pipe->readbuf,
- sizeof(pipe->readbuf), NULL, &pipe->overlap);
- E(ok || GetLastError() == ERROR_IO_PENDING,
- F("ReadFile call failed: %s") % err_msg());
- pipe->read_in_progress = true;
- }
-
- if (pipe->read_in_progress)
- {
- I(pipe->bytes_available == 0);
-
- // Attempt to wait for the completion of the read-in-progress.
-
- int milliseconds = ((timeout.get_sec() * 1000)
- + (timeout.get_usec() / 1000));
-
- L(FL("WaitForSingleObject(,%d)") % milliseconds);
-
- DWORD wstatus = WAIT_FAILED;
-
- if (pipe->child != INVALID_HANDLE_VALUE)
- {
-
- // We're a server; we're going to wait for the client to
- // exit as well as the pipe read status, because
- // apparently you don't find out about closed pipes
- // during an overlapped read request (?)
-
- HANDLE handles[2];
- handles[0] = pipe->overlap.hEvent;
- handles[1] = pipe->child;
-
- wstatus = WaitForMultipleObjects(2,
- handles,
- FALSE,
- milliseconds);
-
- E(wstatus != WAIT_FAILED,
- F("WaitForMultipleObjects call failed: %s") % err_msg());
-
- if (wstatus == WAIT_OBJECT_0 + 1)
- return make_pair(pipe->get_socketfd(), ready_oobd);
- }
- else
- {
- wstatus = WaitForSingleObject(pipe->overlap.hEvent,
- milliseconds);
- E(wstatus != WAIT_FAILED,
- F("WaitForSingleObject call failed: %s") % err_msg());
- }
-
- if (wstatus == WAIT_TIMEOUT)
- return make_pair(-1, ready_none);
-
- BOOL ok = GetOverlappedResult(pipe->named_pipe,
- &pipe->overlap,
- &pipe->bytes_available,
- FALSE);
-
- if (ok)
- {
- // We completed our read.
- pipe->read_in_progress = false;
- }
- else
- {
- // We did not complete our read.
- E(GetLastError() == ERROR_IO_INCOMPLETE,
- F("GetOverlappedResult call failed: %s")
- % err_msg());
- }
- }
-
- if (pipe->bytes_available != 0)
- {
- return make_pair(pipe->get_socketfd(), ready_read);
- }
- }
-
- return make_pair(pipe->get_socketfd(), ready_none);
-}
-
void
-Netxx::PipeCompatibleProbe::add(PipeStream &ps, ready_type rt)
-{
- assert(!is_pipe);
- assert(!pipe);
- is_pipe = true;
- pipe = &ps;
- ready_t = rt;
-}
-
-void
-Netxx::PipeCompatibleProbe::add(StreamBase const &sb, ready_type rt)
-{
- // FIXME: This is *still* an unfortunate way of performing a
- // downcast, though slightly less awful than the old way, which
- // involved throwing an exception.
- //
- // Perhaps we should twiddle the caller-visible API.
-
- StreamBase const *sbp = &sb;
- PipeStream const *psp = dynamic_cast(sbp);
- if (psp)
- add(const_cast(*psp),rt);
- else
- {
- assert(!is_pipe);
- Probe::add(sb,rt);
- }
-}
-
-void
-Netxx::PipeCompatibleProbe::add(const StreamServer &ss, ready_type rt)
-{
- assert(!is_pipe);
- Probe::add(ss,rt);
-}
-#else // unix
-void
-Netxx::PipeCompatibleProbe::add(PipeStream &ps, ready_type rt)
+Netxx::StdioProbe::add(const StdioStream &ps, ready_type rt)
{
if (rt == ready_none || rt & ready_read)
- add_socket(ps.get_readfd(), ready_read);
+ add_socket(ps.readfd, ready_read);
if (rt == ready_none || rt & ready_write)
- add_socket(ps.get_writefd(), ready_write);
+ add_socket(ps.writefd, ready_write);
}
void
-Netxx::PipeCompatibleProbe::add(const StreamBase &sb, ready_type rt)
+Netxx::StdioProbe::add(const StreamBase &sb, ready_type rt)
{
try
{
- add(const_cast(dynamic_cast(sb)),rt);
+ add(const_cast(dynamic_cast(sb)),rt);
}
catch (...)
{
@@ -533,41 +280,30 @@ Netxx::PipeCompatibleProbe::add(const St
}
}
-void
-Netxx::PipeCompatibleProbe::add(const StreamServer &ss, ready_type rt)
-{
- Probe::add(ss,rt);
-}
-#endif
-
#ifdef BUILD_UNIT_TESTS
#include "unit_tests.hh"
-UNIT_TEST(pipe, simple_pipe)
+UNIT_TEST(pipe, spawned)
{ try
{
- Netxx::PipeStream pipe("cat",vector());
+ Netxx::SpawnedStream spawned("cat",vector());
string result;
- Netxx::PipeCompatibleProbe probe;
+ Netxx::Probe probe;
Netxx::Timeout timeout(2L), short_time(0,1000);
// time out because no data is available
probe.clear();
- probe.add(pipe, Netxx::Probe::ready_read);
+ probe.add(spawned, Netxx::Probe::ready_read);
Netxx::Probe::result_type res = probe.ready(short_time);
I(res.second==Netxx::Probe::ready_none);
// write should be possible
probe.clear();
- probe.add(pipe, Netxx::Probe::ready_write);
+ probe.add(spawned, Netxx::Probe::ready_write);
res = probe.ready(short_time);
I(res.second & Netxx::Probe::ready_write);
-#ifdef WIN32
- I(res.first==pipe.get_socketfd());
-#else
- I(res.first==pipe.get_writefd());
-#endif
+ I(res.first==spawned.get_socketfd());
// try binary transparency
for (int c = 0; c < 256; ++c)
@@ -575,21 +311,18 @@ UNIT_TEST(pipe, simple_pipe)
char buf[1024];
buf[0] = c;
buf[1] = 255 - c;
- pipe.write(buf, 2);
+ spawned.write(buf, 2);
string result;
while (result.size() < 2)
{ // wait for data to arrive
probe.clear();
- probe.add(pipe, Netxx::Probe::ready_read);
+ probe.add(spawned, Netxx::Probe::ready_read);
res = probe.ready(timeout);
E(res.second & Netxx::Probe::ready_read, F("timeout reading data %d") % c);
-#ifdef WIN32
- I(res.first == pipe.get_socketfd());
-#else
- I(res.first == pipe.get_readfd());
-#endif
- int bytes = pipe.read(buf, sizeof(buf));
+ I(res.first == spawned.get_socketfd());
+
+ int bytes = spawned.read(buf, sizeof(buf));
result += string(buf, bytes);
}
I(result.size() == 2);
@@ -597,7 +330,7 @@ UNIT_TEST(pipe, simple_pipe)
I(static_cast(result[1]) == 255 - c);
}
- pipe.close();
+ spawned.close();
}
catch (informative_failure &e)
@@ -608,6 +341,9 @@ catch (informative_failure &e)
throw;
}
}
+
+// FIXME: test StdioStream
+
#endif
// Local Variables:
============================================================
--- netxx_pipe.hh 873bc7d111d59db057cba5ea5f9ce31ad7608e1f
+++ netxx_pipe.hh 41d912dba5cf1c70862cff97b8e20bb199de2281
@@ -1,6 +1,7 @@
#ifndef __NETXX_PIPE_HH__
#define __NETXX_PIPE_HH__
+// Copyright (C) 2007 Stephen Leake
// Copyright (C) 2005 Christof Petig
//
// This program is made available under the GNU GPL version 2.0 or
@@ -18,118 +19,98 @@
#endif
/*
- What is this all for?
+ Here we provide children of Netxx::StreamBase that work with stdin/stdout
+ in two different ways.
- If you want to transparently handle a pipe and a socket on unix and
- windows you have to abstract some difficulties:
+ The use cases are 'mtn sync file:...', 'mtn sync ssh:...', and 'mtn serve
+ --stdio'.
- - sockets have a single filedescriptor for reading and writing
- pipes usually come in pairs (one for reading and one for writing)
+ The netsync code uses StreamBase objects to perform all communications
+ between the local and server mtns.
- - process creation is different on unix and windows
+ In 'mtn sync file:...', the local mtn spawns the server directly as 'mtn
+ serve --stdio'. Thus the local mtn needs a StreamBase object that can
+ serve as the stdin/stdout of a spawned process; SpawnedStream. The server
+ mtn needs to construct a StreamBase object from the existing stdin/stdout;
+ StdioStream.
- => so Netxx::PipeStream is a Netxx::StreamBase which abstracts two pipes to
- and from an external command
+ In 'mtn sync ssh:...' the local mtn spawns ssh, connecting to it via
+ SpawnedStream. On the server, ssh spawns 'mtn serve --stdio', which uses
+ StdioStream.
- - windows can select on a socket but not on a pipe
+ We also need StdioProbe objects that work with StdioStream objects, since
+ Netxx::Probe doesn't. Netxx does not provide for child classes of Probe.
+ We handle this by having the netsync code create the appropriate Probe
+ object whenever it creates a StreamBase object.
- => so Netxx::PipeCompatibleProbe is a Netxx::Probe like class which
- _can_ handle pipes on windows (emulating select is difficult at best!)
- (on unix Probe and PipeCompatibleProbe are nearly identical: with pipes
- you should not select for both read and write on the same descriptor)
+ We use socket pairs to implement these objects. On Unix and Win32, a
+ socket can serve as stdin and stdout for a spawned process.
+ The sockets in the pair must be connected to each other; socketpair() does
+ that nicely.
+
+ An earlier implementation (a single class named PipeStream) tried to use
+ overlapped IO on named pipes on Win32, and select() on pipes on Unix,
+ but we couldn't make it work, because the semantics of the two
+ implementations are too different. Now we always use select() on
+ sockets, on both platforms.
*/
namespace Netxx
{
- class PipeCompatibleProbe;
- class StreamServer;
- class PipeStream : public StreamBase
+ class SpawnedStream : public StreamBase
{
+ Socket Parent_Socket;
+ Socket Child_Socket;
+ ProbeInfo probe_info;
#ifdef WIN32
- HANDLE named_pipe;
- HANDLE child;
- char readbuf[1024];
- DWORD bytes_available;
- bool read_in_progress;
- OVERLAPPED overlap;
- friend class PipeCompatibleProbe;
+ HANDLE child;
#else
- int readfd, writefd;
- int child;
+ pid_t child;
#endif
-
public:
- // do we need Timeout for symmetry with Stream?
- explicit PipeStream (int readfd, int writefd);
- explicit PipeStream (const std::string &cmd, const std::vector &args);
- virtual ~PipeStream() { close(); }
+ explicit SpawnedStream (const std::string &cmd, const std::vector &args);
+ // Spawn a child process to run 'cmd args', connect its stdout and
+ // stdin to this object.
+
+ virtual ~SpawnedStream() { close(); }
virtual signed_size_type read (void *buffer, size_type length);
virtual signed_size_type write (const void *buffer, size_type length);
virtual void close (void);
virtual socket_type get_socketfd (void) const;
virtual const ProbeInfo* get_probe_info (void) const;
- int get_readfd(void) const
- {
-#ifdef WIN32
- return -1;
-#else
- return readfd;
-#endif
- }
- int get_writefd(void) const
- {
-#ifdef WIN32
- return -1;
-#else
- return writefd;
-#endif
- }
};
-#ifdef WIN32
+ class StdioProbe;
- // This probe can either handle _one_ PipeStream or several network
- // Streams so if !is_pipe this acts like a Probe.
- class PipeCompatibleProbe : public Probe
+ class StdioStream : public StreamBase
{
- bool is_pipe;
- // only meaningful if is_pipe is true
- PipeStream *pipe;
- ready_type ready_t;
+ friend class StdioProbe;
+ int readfd;
+ int writefd;
+ ProbeInfo probe_info;
+
public:
- PipeCompatibleProbe() : is_pipe(), pipe(), ready_t()
- {}
- void clear()
- {
- if (is_pipe)
- {
- pipe=0;
- is_pipe=false;
- }
- else
- Probe::clear();
- }
- // This function does all the hard work (emulating a select).
- result_type ready(const Timeout &timeout=Timeout(), ready_type rt=ready_none);
- void add(PipeStream &ps, ready_type rt=ready_none);
- void add(const StreamBase &sb, ready_type rt=ready_none);
- void add(const StreamServer &ss, ready_type rt=ready_none);
+ explicit StdioStream (int readfd, int writefd);
+ // Construct a Stream object from existing file descriptors; typically
+ // stdin (readfd) and stdout (writefd) of the current process.
+
+ virtual ~StdioStream() { close(); }
+ virtual signed_size_type read (void *buffer, size_type length);
+ virtual signed_size_type write (const void *buffer, size_type length);
+ virtual void close (void);
+ virtual socket_type get_socketfd (void) const;
+ virtual const ProbeInfo* get_probe_info (void) const;
};
-#else
- // We only act specially if a PipeStream is added (directly or via
- // the StreamBase parent reference).
- struct PipeCompatibleProbe : Probe
+ struct StdioProbe : Probe
{
- void add(PipeStream &ps, ready_type rt=ready_none);
+ public:
+ void add(const StdioStream &ps, ready_type rt=ready_none);
void add(const StreamBase &sb, ready_type rt=ready_none);
- void add(const StreamServer &ss, ready_type rt=ready_none);
};
-#endif
-
}
// Local Variables:
@@ -141,4 +122,3 @@ namespace Netxx
// vim: et:sw=2:sts=2:ts=2:cino=>2s,{s,\:s,+s,t0,g0,^-2,e-2,n-2,p2s,(0,=s:
#endif // __NETXX_PIPE_HH__
-
============================================================
--- platform.hh b5cb961e19daf9302d866c2f066d8338db7c79c3
+++ platform.hh aa3302a71c7da264137d9b154902763fc4c3f349
@@ -1,7 +1,7 @@
#ifndef __PLATFORM_HH__
#define __PLATFORM_HH__
-// Copyright (C) 2002 Graydon Hoare
+// Copyright (C) 2002, 2007 Graydon Hoare
//
// This program is made available under the GNU GPL version 2.0 or
// greater. See the accompanying file COPYING for details.
@@ -36,6 +36,9 @@ void make_io_binary();
// stop "\n"->"\r\n" from breaking automate on Windows
void make_io_binary();
+// returns 0 for success, non-zero for error
+extern "C" int dumb_socketpair(int socks[2], int make_overlapped);
+
#ifdef WIN32
std::string munge_argv_into_cmdline(const char* const argv[]);
#endif
@@ -80,7 +83,7 @@ public:
// this time in the future?" bit in the hashed information. This bit
// will change when we pass the future point, and trigger a re-check of
// the file's contents.
- //
+ //
// This is, of course, still not perfect. There is no way to make our stat
// atomic with the actual read of the file, so there's always a race condition
// there. Additionally, this handling means that checkout will never actually