#
#
# add_file "netxx_pipe.cc"
# content [257cde413eb1573139f1400d764f3c54651c6c24]
#
# add_file "netxx_pipe.hh"
# content [9b4e1e3543138f5e784814446299bce571ea5e18]
#
============================================================
--- netxx_pipe.cc 257cde413eb1573139f1400d764f3c54651c6c24
+++ netxx_pipe.cc 257cde413eb1573139f1400d764f3c54651c6c24
@@ -0,0 +1,609 @@
+// -*- mode: C++; c-file-style: "gnu"; indent-tabs-mode: nil -*-
+// copyright (C) 2005 Christof Petig
+// all rights reserved.
+// licensed to the public under the terms of the GNU GPL (>= 2)
+// see the file COPYING for details
+
+#include
+#include "sanity.hh"
+#include "platform.hh"
+#include
+
+#ifdef WIN32
+#include
+#include
+#include
+#else
+#include
+#include
+#include
+#include
+#endif
+
+Netxx::PipeStream::PipeStream(int _readfd, int _writefd)
+ : child(INVALID_HANDLE_VALUE),
+ bytes_available(0)
+{
+#ifdef WIN32
+ if (_setmode(_readfd, _O_BINARY) == -1)
+ L(FL("failed to set input file descriptor to binary"));
+
+ if (_setmode(_writefd, _O_BINARY) == -1)
+ L(FL("failed to set output file descriptor to binary"));
+
+ named_pipe = (HANDLE)_get_osfhandle(_readfd);
+
+ E(named_pipe != INVALID_HANDLE_VALUE,
+ F("pipe handle is invalid"));
+
+ // Create infrastructure for overlapping I/O
+ memset(&overlap, 0, sizeof(overlap));
+ overlap.hEvent = CreateEvent(0, TRUE, TRUE, 0);
+ bytes_available = 0;
+ I(overlap.hEvent != 0);
+#endif
+}
+
+
+#ifndef WIN32
+
+// Create pipes for stdio and fork subprocess, returns -1 on error, 0
+// to child and PID to parent.
+
+static pid_t
+pipe_and_fork(int fd1[2], int fd2[2])
+{
+ pid_t result = -1;
+ fd1[0] = -1;
+ fd1[1] = -1;
+ fd2[0] = -1;
+ fd2[1] = -1;
+
+ if (pipe(fd1))
+ return -1;
+
+ if (pipe(fd2))
+ {
+ close(fd1[0]);
+ close(fd1[1]);
+ return -1;
+ }
+
+ result = fork();
+
+ if (result < 0)
+ {
+ close(fd1[0]);
+ close(fd1[1]);
+ close(fd2[0]);
+ close(fd2[1]);
+ return -1;
+ }
+
+ else if (!result)
+ {
+ // fd1[1] for writing, fd2[0] for reading
+ close(fd1[0]);
+ close(fd2[1]);
+ if (dup2(fd2[0], 0) != 0 ||
+ dup2(fd1[1], 1) != 1)
+ {
+ perror("dup2");
+ exit(-1); // kill the useless child
+ }
+ close(fd1[1]);
+ close(fd2[0]);
+ }
+
+ else
+ {
+ // fd1[0] for reading, fd2[1] for writing
+ close(fd1[1]);
+ close(fd2[0]);
+ }
+
+ return result;
+}
+#endif
+
+static std::string
+err_msg()
+{
+ char buf[1024];
+ I(FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM,
+ NULL, GetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
+ (LPSTR) &buf, sizeof(buf) / sizeof(TCHAR), NULL) != 0);
+ return std::string(buf);
+}
+
+Netxx::PipeStream::PipeStream (const std::string & cmd,
+ const std::vector & args)
+ : child(INVALID_HANDLE_VALUE),
+ bytes_available(0)
+#ifdef WIN32
+ ,read_in_progress(false)
+#endif
+{
+ // Unfortunately neither munge_argv_into_cmdline nor execvp do take
+ // a vector as argument.
+
+ const unsigned newsize = 64;
+ const char *newargv[newsize];
+ I(args.size() < (sizeof(newargv) / sizeof(newargv[0])));
+
+ unsigned newargc = 0;
+ newargv[newargc++]=cmd.c_str();
+ for (std::vector::const_iterator i = args.begin();
+ i != args.end(); ++i)
+ newargv[newargc++] = i->c_str();
+ newargv[newargc] = 0;
+
+#ifdef WIN32
+
+ // In order to use nonblocking i/o on windows, you must use named
+ // pipes and overlapped i/o. There is no other way, alas.
+
+ static unsigned long serial = 0;
+ std::string pipename = (F("\\\\.\\pipe\\netxx_pipe_%ld_%d")
+ % GetCurrentProcessId()
+ % (++serial)).str();
+
+ // Create the parent's handle to the named pipe.
+
+ // P(F("mtn %d: creating pipe %s\n")
+ // % GetCurrentProcessId() % pipename);
+
+ named_pipe = CreateNamedPipe(pipename.c_str(),
+ PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
+ PIPE_TYPE_BYTE | PIPE_WAIT,
+ 1,
+ sizeof(readbuf),
+ sizeof(readbuf),
+ 1000,
+ 0);
+
+ // P(F("mtn %d: created pipe %s\n")
+ // % GetCurrentProcessId() % pipename);
+
+ E(named_pipe != INVALID_HANDLE_VALUE,
+ F("CreateNamedPipe(%s,...) call failed: %s")
+ % pipename % err_msg());
+
+ // Open the child's handle to the named pipe.
+
+ SECURITY_ATTRIBUTES inherit;
+ memset(&inherit,0,sizeof inherit);
+ inherit.nLength=sizeof inherit;
+ inherit.bInheritHandle = TRUE;
+
+ HANDLE hpipe = CreateFile(pipename.c_str(),
+ GENERIC_READ|GENERIC_WRITE, 0,
+ &inherit,
+ OPEN_EXISTING,
+ FILE_ATTRIBUTE_NORMAL|FILE_FLAG_OVERLAPPED,0);
+
+ E(hpipe != INVALID_HANDLE_VALUE,
+ F("CreateFile(%s,...) call failed: %s")
+ % pipename % err_msg());
+
+ // P(F("mtn %d: creating file handle on named pipe %s\n")
+ // % GetCurrentProcessId() % pipename);
+
+ // Set up the child with the pipes as stdin/stdout and inheriting stderr.
+
+ PROCESS_INFORMATION piProcInfo;
+ STARTUPINFO siStartInfo;
+
+ memset(&piProcInfo, 0, sizeof(piProcInfo));
+ memset(&siStartInfo, 0, sizeof(siStartInfo));
+
+ siStartInfo.cb = sizeof(siStartInfo);
+ siStartInfo.hStdError = (HANDLE)(_get_osfhandle(2));
+ siStartInfo.hStdOutput = hpipe;
+ siStartInfo.hStdInput = hpipe;
+ siStartInfo.dwFlags |= STARTF_USESTDHANDLES;
+
+ std::string cmdline = munge_argv_into_cmdline(newargv);
+ L(FL("Subprocess command line: '%s'\n") % cmdline);
+
+ // P(F("mtn %d: subprocess cmd line: '%s'\n")
+ // % GetCurrentProcessId() % cmdline);
+
+ BOOL started = CreateProcess(NULL, // Application name
+ const_cast(cmdline.c_str()),
+ NULL, // Process attributes
+ NULL, // Thread attributes
+ TRUE, // Inherit handles
+ 0, // Creation flags
+ NULL, // Environment
+ NULL, // Current directory
+ &siStartInfo,
+ &piProcInfo);
+ E(started,
+ F("CreateProcess(%s,...) call failed: %s")
+ % cmdline % err_msg());
+
+ // P(F("mtn %d: started subprocess: '%s'\n")
+ // % GetCurrentProcessId() % cmdline);
+
+ child = piProcInfo.hProcess;
+
+ // P(F("mtn %d: opened handles\n")
+ // % GetCurrentProcessId());
+
+ // create infrastructure for overlapping I/O
+ memset(&overlap, 0, sizeof(overlap));
+ overlap.hEvent = CreateEvent(0, TRUE, TRUE, 0);
+ bytes_available = 0;
+ I(overlap.hEvent != 0);
+
+ // P(F("mtn %d: created overlap event\n")
+ // % GetCurrentProcessId());
+
+#else // !WIN32
+
+ int fd1[2], fd2[2];
+ child = pipe_and_fork(fd1, fd2);
+ E(child >= 0, F("pipe/fork failed %s") % strerror(errno));
+ if (!child)
+ {
+ execvp(newargv[0], const_cast(newargv));
+ perror(newargv[0]);
+ exit(errno);
+ }
+ readfd = fd1[0];
+ writefd = fd2[1];
+ fcntl(readfd, F_SETFL, fcntl(readfd, F_GETFL) | O_NONBLOCK);
+#endif
+
+ // P(F("mtn %d: set up i/o channels\n")
+ // % GetCurrentProcessId());
+}
+
+// Non blocking read.
+
+Netxx::signed_size_type
+Netxx::PipeStream::read (void *buffer, size_type length)
+{
+#ifdef WIN32
+ // P(F("mtn %d: attempting read of length %d\n")
+ // % GetCurrentProcessId() % length);
+
+ if (length > bytes_available)
+ length = bytes_available;
+
+ // P(F("mtn %d: reading %d bytes of %d available\n")
+ // % GetCurrentProcessId() % length % bytes_available);
+
+ if (length)
+ {
+ memcpy(buffer, readbuf, length);
+ if (length < bytes_available)
+ memmove(readbuf, readbuf+length, bytes_available-length);
+ bytes_available -= length;
+ }
+
+ // P(F("mtn %d: read complete, %d bytes\n")
+ // % GetCurrentProcessId() % length);
+ return length;
+#else
+ return ::read(readfd, buffer, length);
+#endif
+}
+
+Netxx::signed_size_type
+Netxx::PipeStream::write(const void *buffer, size_type length)
+{
+ // P(F("mtn %d: writing, %d bytes\n")
+ // % GetCurrentProcessId() % length);
+#ifdef WIN32
+ DWORD written = 0;
+ BOOL ok = WriteFile(named_pipe, buffer, length, &written, NULL);
+ E(ok, F("WriteFile call failed: %s") % err_msg());
+#else
+ size_t written = ::write(writefd, buffer, length);
+#endif
+ // P(F("mtn %d: write complete, %d bytes\n")
+ // % GetCurrentProcessId() % length);
+ return written;
+}
+
+void
+Netxx::PipeStream::close (void)
+{
+
+#ifdef WIN32
+ // P(F("mtn %d: closing named pipe\n")
+ // % GetCurrentProcessId());
+
+ if (named_pipe != INVALID_HANDLE_VALUE)
+ CloseHandle(named_pipe);
+ named_pipe = INVALID_HANDLE_VALUE;
+
+ if (overlap.hEvent != INVALID_HANDLE_VALUE)
+ CloseHandle(overlap.hEvent);
+ overlap.hEvent = INVALID_HANDLE_VALUE;
+
+ if (child != INVALID_HANDLE_VALUE)
+ WaitForSingleObject(child, INFINITE);
+ child = INVALID_HANDLE_VALUE;
+#else
+ if (readfd != -1)
+ ::close(readfd);
+ readfd = -1;
+
+ if (writefd != -1)
+ ::close(writefd);
+ writefd = -1;
+
+ if (child)
+ waitpid(child,0,0);
+ child = 0;
+#endif
+}
+
+Netxx::socket_type
+Netxx::PipeStream::get_socketfd (void) const
+{
+#ifdef WIN32
+ return (Netxx::socket_type) named_pipe;
+#else
+ return Netxx::socket_type(-1);
+#endif
+}
+
+const Netxx::ProbeInfo*
+Netxx::PipeStream::get_probe_info (void) const
+{
+ return 0;
+}
+
+#ifdef WIN32
+
+// to emulate the semantics of the select call we wait up to timeout for the
+// first byte and ask for more bytes with no timeout
+// perhaps there is a more efficient/less complicated way (tell me if you know)
+
+static std::string
+status_name(DWORD wstatus)
+{
+ switch (wstatus) {
+ case WAIT_TIMEOUT: return "WAIT_TIMEOUT";
+ case WAIT_OBJECT_0: return "WAIT_OBJECT_0";
+ case WAIT_FAILED: return "WAIT_FAILED";
+ case WAIT_OBJECT_0+1: return "WAIT_OBJECT_0+1";
+ default: return "UNKNOWN";
+ }
+}
+
+Netxx::Probe::result_type
+Netxx::PipeCompatibleProbe::ready(const Timeout &timeout, ready_type rt)
+{
+ if (!is_pipe)
+ return Probe::ready(timeout, rt);
+
+ // L(F("mtn %d: checking for i/o ready state\n") % GetCurrentProcessId());
+
+ if (rt == ready_none)
+ rt = ready_t; // remembered from add
+
+ if (rt & ready_write)
+ {
+ // P(F("mtn %d: write is possible, returning\n")
+ // % GetCurrentProcessId());
+ return std::make_pair(pipe->get_socketfd(), ready_write);
+ }
+
+ if (rt & ready_read)
+ {
+ if (pipe->bytes_available == 0)
+ {
+ // Issue an async request to fill our buffer.
+ // P(F("mtn %d: issuing readfile\n") % GetCurrentProcessId());
+ BOOL ok = ReadFile(pipe->named_pipe, pipe->readbuf,
+ sizeof(pipe->readbuf), NULL, &pipe->overlap);
+ E(ok || GetLastError() == ERROR_IO_PENDING,
+ F("ReadFile call failed: %s") % err_msg());
+ pipe->read_in_progress = true;
+ }
+
+ if (pipe->read_in_progress)
+ {
+ I(pipe->bytes_available == 0);
+ // Attempt to wait for the completion of the read-in-progress.
+ int milliseconds = ((timeout.get_sec() * 1000)
+ + (timeout.get_usec() / 1000));
+ L(FL("WaitForSingleObject(,%d)\n") % milliseconds);
+
+ DWORD wstatus = WAIT_FAILED;
+
+ if (pipe->child != INVALID_HANDLE_VALUE)
+ {
+ // We're a server; we're going to wait for the client to exit as well
+ // as the pipe read status, because apparently you don't find out about
+ // closed pipes during an overlapped read request (?)
+ HANDLE handles[2];
+ handles[0] = pipe->overlap.hEvent;
+ handles[1] = pipe->child;
+ // P( F("mtn %d: waiting %d milliseconds for multiple objects\n")
+ // % GetCurrentProcessId() % milliseconds);
+ wstatus = WaitForMultipleObjects(2, handles, FALSE, milliseconds);
+ // P(F("mtn %d: wait finished with %s\n")
+ // % GetCurrentProcessId() % status_name(wstatus));
+ E(wstatus != WAIT_FAILED,
+ F("WaitForMultipleObjects call failed: %s") % err_msg());
+ if (wstatus == WAIT_OBJECT_0 + 1)
+ return std::make_pair(pipe->get_socketfd(), ready_oobd);
+ }
+ else
+ {
+ // P(F("mtn %d: waiting %d milliseconds for single object\n")
+ // % GetCurrentProcessId() % milliseconds);
+ wstatus = WaitForSingleObject(pipe->overlap.hEvent, milliseconds);
+ // P(F("mtn %d: wait finished with %s\n")
+ // % GetCurrentProcessId() % status_name(wstatus));
+ E(wstatus != WAIT_FAILED,
+ F("WaitForSingleObject call failed: %s") % err_msg());
+ }
+
+ if (wstatus == WAIT_TIMEOUT)
+ return std::make_pair(-1, ready_none);
+
+ // P(F("mtn %d: getting overlapped result\n") % GetCurrentProcessId());
+ BOOL ok = GetOverlappedResult(pipe->named_pipe, &pipe->overlap,
+ &pipe->bytes_available, FALSE);
+ // P(F("mtn %d: overlapped result: %s\n") % GetCurrentProcessId() % ok);
+ if (ok)
+ {
+ // We completed our read.
+ pipe->read_in_progress = false;
+ }
+ else
+ {
+ // We did not complete our read.
+ E(GetLastError() == ERROR_IO_INCOMPLETE,
+ F("GetOverlappedResult call failed: %s") % err_msg());
+ }
+ }
+
+ if (pipe->bytes_available != 0)
+ {
+ // P(F("mtn %d: %d bytes are available, returning\n")
+ // % GetCurrentProcessId() % pipe->bytes_available);
+ return std::make_pair(pipe->get_socketfd(), ready_read);
+ }
+ }
+
+ // P(F("mtn %d: no i/o ready\n") % GetCurrentProcessId());
+ return std::make_pair(pipe->get_socketfd(), ready_none);
+}
+
+void
+Netxx::PipeCompatibleProbe::add(PipeStream &ps, ready_type rt)
+ {
+ assert(!is_pipe);
+ assert(!pipe);
+ is_pipe = true;
+ pipe = &ps;
+ ready_t = rt;
+ }
+
+void
+Netxx::PipeCompatibleProbe::add(const StreamBase &sb, ready_type rt)
+ {
+ try
+ {
+ add(const_cast(dynamic_cast(sb)),rt);
+ }
+ catch (...)
+ {
+ assert(!is_pipe);
+ Probe::add(sb,rt);
+ }
+ }
+
+void
+Netxx::PipeCompatibleProbe::add(const StreamServer &ss, ready_type rt)
+ {
+ assert(!ip_pipe);
+ Probe::add(ss,rt);
+ }
+#else // unix
+void
+Netxx::PipeCompatibleProbe::add(PipeStream &ps, ready_type rt)
+ {
+ if (rt == ready_none || rt & ready_read)
+ add_socket(ps.get_readfd(), ready_read);
+ if (rt == ready_none || rt & ready_write)
+ add_socket(ps.get_writefd(), ready_write);
+ }
+
+void
+Netxx::PipeCompatibleProbe::add(const StreamBase &sb, ready_type rt)
+ {
+ try
+ {
+ add(const_cast(dynamic_cast(sb)),rt);
+ }
+ catch (...)
+ {
+ Probe::add(sb,rt);
+ }
+ }
+
+void
+Netxx::PipeCompatibleProbe::add(const StreamServer &ss, ready_type rt)
+ {
+ Probe::add(ss,rt);
+ }
+#endif
+
+#ifdef BUILD_UNIT_TESTS
+#include "unit_tests.hh"
+
+static void
+simple_pipe_test()
+{ try
+ {
+ Netxx::PipeStream pipe("cat",std::vector());
+
+ std::string result;
+ Netxx::PipeCompatibleProbe probe;
+ Netxx::Timeout timeout(2L), short_time(0,1000);
+
+ // time out because no data is available
+ probe.clear();
+ probe.add(pipe, Netxx::Probe::ready_read);
+ Netxx::Probe::result_type res = probe.ready(short_time);
+ I(res.second==Netxx::Probe::ready_none);
+
+ // write should be possible
+ probe.clear();
+ probe.add(pipe, Netxx::Probe::ready_write);
+ res = probe.ready(short_time);
+ I(res.second&Netxx::Probe::ready_write);
+ I(res.first==pipe.get_writefd());
+
+ // try binary transparency
+ for (int c = 0; c < 256; ++c)
+ {
+ char buf[1024];
+ buf[0] = c;
+ buf[1] = 255 - c;
+ pipe.write(buf, 2);
+
+ std::string result;
+ while (result.size() < 2)
+ { // wait for data to arrive
+ probe.clear();
+ probe.add(pipe, Netxx::Probe::ready_read);
+ res = probe.ready(timeout);
+ E(res.second & Netxx::Probe::ready_read, F("timeout reading data %d") % c);
+ I(res.first == pipe.get_readfd());
+ int bytes = pipe.read(buf, sizeof(buf));
+ result += std::string(buf, bytes);
+ }
+ I(result.size() == 2);
+ I(static_cast(result[0]) == c);
+ I(static_cast(result[1]) == 255 - c);
+ }
+
+ pipe.close();
+
+ }
+catch (informative_failure &e)
+ // for some reason boost does not provide
+ // enough information
+ {
+ W(F("Failure %s\n") % e.what);
+ throw;
+ }
+}
+
+void
+add_pipe_tests(test_suite * suite)
+{
+ I(suite);
+ suite->add(BOOST_TEST_CASE(&simple_pipe_test));
+}
+#endif
============================================================
--- netxx_pipe.hh 9b4e1e3543138f5e784814446299bce571ea5e18
+++ netxx_pipe.hh 9b4e1e3543138f5e784814446299bce571ea5e18
@@ -0,0 +1,129 @@
+// -*- mode: C++; c-file-style: "gnu"; indent-tabs-mode: nil -*-
+// copyright (C) 2005 Christof Petig
+// all rights reserved.
+// licensed to the public under the terms of the GNU GPL (>= 2)
+// see the file COPYING for details
+
+#include
+#include
+#include
+#include
+#ifdef WIN32
+# include
+#endif
+
+/*
+ What is this all for?
+
+ If you want to transparently handle a pipe and a socket on unix and
+ windows you have to abstract some difficulties:
+
+ - sockets have a single filedescriptor for reading and writing
+ pipes usually come in pairs (one for reading and one for writing)
+
+ - process creation is different on unix and windows
+
+ => so Netxx::PipeStream is a Netxx::StreamBase which abstracts two pipes to
+ and from an external command
+
+ - windows can select on a socket but not on a pipe
+
+ => so Netxx::PipeCompatibleProbe is a Netxx::Probe like class which
+ _can_ handle pipes on windows (emulating select is difficult at best!)
+ (on unix Probe and PipeCompatibleProbe are nearly identical: with pipes
+ you should not select for both read and write on the same descriptor)
+
+*/
+
+namespace Netxx
+ {
+ class PipeCompatibleProbe;
+ class StreamServer;
+
+ class PipeStream : public StreamBase
+ {
+#ifdef WIN32
+ HANDLE named_pipe;
+ HANDLE child;
+ char readbuf[1024];
+ DWORD bytes_available;
+ bool read_in_progress;
+ OVERLAPPED overlap;
+ friend class PipeCompatibleProbe;
+#else
+ int readfd, writefd;
+ int child;
+#endif
+
+
+ public:
+ // do we need Timeout for symmetry with Stream?
+ explicit PipeStream (int readfd, int writefd);
+ explicit PipeStream (const std::string &cmd, const std::vector &args);
+ virtual ~PipeStream() { close(); }
+ virtual signed_size_type read (void *buffer, size_type length);
+ virtual signed_size_type write (const void *buffer, size_type length);
+ virtual void close (void);
+ virtual socket_type get_socketfd (void) const;
+ virtual const ProbeInfo* get_probe_info (void) const;
+ int get_readfd(void) const
+ {
+#ifdef WIN32
+ return -1;
+#else
+ return readfd;
+#endif
+ }
+ int get_writefd(void) const
+ {
+#ifdef WIN32
+ return -1;
+#else
+ return writefd;
+#endif
+ }
+ };
+
+#ifdef WIN32
+
+ // This probe can either handle _one_ PipeStream or several network
+ // Streams so if !is_pipe this acts like a Probe.
+ class PipeCompatibleProbe : public Probe
+ {
+ bool is_pipe;
+ // only meaningful if is_pipe is true
+ PipeStream *pipe;
+ ready_type ready_t;
+ public:
+ PipeCompatibleProbe() : is_pipe(), pipe(), ready_t()
+ {}
+ void clear()
+ {
+ if (is_pipe)
+ {
+ pipe=0;
+ is_pipe=false;
+ }
+ else
+ Probe::clear();
+ }
+ // This function does all the hard work (emulating a select).
+ result_type ready(const Timeout &timeout=Timeout(), ready_type rt=ready_none);
+ void add(PipeStream &ps, ready_type rt=ready_none);
+ void add(const StreamBase &sb, ready_type rt=ready_none);
+ void add(const StreamServer &ss, ready_type rt=ready_none);
+ void remove(const PipeStream &ps);
+ };
+#else
+
+ // We only act specially if a PipeStream is added (directly or via
+ // the StreamBase parent reference).
+ struct PipeCompatibleProbe : Probe
+ {
+ void add(PipeStream &ps, ready_type rt=ready_none);
+ void add(const StreamBase &sb, ready_type rt=ready_none);
+ void add(const StreamServer &ss, ready_type rt=ready_none);
+ };
+#endif
+
+}