# # # add_file "netxx_pipe.cc" # content [257cde413eb1573139f1400d764f3c54651c6c24] # # add_file "netxx_pipe.hh" # content [9b4e1e3543138f5e784814446299bce571ea5e18] # ============================================================ --- netxx_pipe.cc 257cde413eb1573139f1400d764f3c54651c6c24 +++ netxx_pipe.cc 257cde413eb1573139f1400d764f3c54651c6c24 @@ -0,0 +1,609 @@ +// -*- mode: C++; c-file-style: "gnu"; indent-tabs-mode: nil -*- +// copyright (C) 2005 Christof Petig +// all rights reserved. +// licensed to the public under the terms of the GNU GPL (>= 2) +// see the file COPYING for details + +#include +#include "sanity.hh" +#include "platform.hh" +#include + +#ifdef WIN32 +#include +#include +#include +#else +#include +#include +#include +#include +#endif + +Netxx::PipeStream::PipeStream(int _readfd, int _writefd) + : child(INVALID_HANDLE_VALUE), + bytes_available(0) +{ +#ifdef WIN32 + if (_setmode(_readfd, _O_BINARY) == -1) + L(FL("failed to set input file descriptor to binary")); + + if (_setmode(_writefd, _O_BINARY) == -1) + L(FL("failed to set output file descriptor to binary")); + + named_pipe = (HANDLE)_get_osfhandle(_readfd); + + E(named_pipe != INVALID_HANDLE_VALUE, + F("pipe handle is invalid")); + + // Create infrastructure for overlapping I/O + memset(&overlap, 0, sizeof(overlap)); + overlap.hEvent = CreateEvent(0, TRUE, TRUE, 0); + bytes_available = 0; + I(overlap.hEvent != 0); +#endif +} + + +#ifndef WIN32 + +// Create pipes for stdio and fork subprocess, returns -1 on error, 0 +// to child and PID to parent. + +static pid_t +pipe_and_fork(int fd1[2], int fd2[2]) +{ + pid_t result = -1; + fd1[0] = -1; + fd1[1] = -1; + fd2[0] = -1; + fd2[1] = -1; + + if (pipe(fd1)) + return -1; + + if (pipe(fd2)) + { + close(fd1[0]); + close(fd1[1]); + return -1; + } + + result = fork(); + + if (result < 0) + { + close(fd1[0]); + close(fd1[1]); + close(fd2[0]); + close(fd2[1]); + return -1; + } + + else if (!result) + { + // fd1[1] for writing, fd2[0] for reading + close(fd1[0]); + close(fd2[1]); + if (dup2(fd2[0], 0) != 0 || + dup2(fd1[1], 1) != 1) + { + perror("dup2"); + exit(-1); // kill the useless child + } + close(fd1[1]); + close(fd2[0]); + } + + else + { + // fd1[0] for reading, fd2[1] for writing + close(fd1[1]); + close(fd2[0]); + } + + return result; +} +#endif + +static std::string +err_msg() +{ + char buf[1024]; + I(FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, + NULL, GetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), + (LPSTR) &buf, sizeof(buf) / sizeof(TCHAR), NULL) != 0); + return std::string(buf); +} + +Netxx::PipeStream::PipeStream (const std::string & cmd, + const std::vector & args) + : child(INVALID_HANDLE_VALUE), + bytes_available(0) +#ifdef WIN32 + ,read_in_progress(false) +#endif +{ + // Unfortunately neither munge_argv_into_cmdline nor execvp do take + // a vector as argument. + + const unsigned newsize = 64; + const char *newargv[newsize]; + I(args.size() < (sizeof(newargv) / sizeof(newargv[0]))); + + unsigned newargc = 0; + newargv[newargc++]=cmd.c_str(); + for (std::vector::const_iterator i = args.begin(); + i != args.end(); ++i) + newargv[newargc++] = i->c_str(); + newargv[newargc] = 0; + +#ifdef WIN32 + + // In order to use nonblocking i/o on windows, you must use named + // pipes and overlapped i/o. There is no other way, alas. + + static unsigned long serial = 0; + std::string pipename = (F("\\\\.\\pipe\\netxx_pipe_%ld_%d") + % GetCurrentProcessId() + % (++serial)).str(); + + // Create the parent's handle to the named pipe. + + // P(F("mtn %d: creating pipe %s\n") + // % GetCurrentProcessId() % pipename); + + named_pipe = CreateNamedPipe(pipename.c_str(), + PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, + PIPE_TYPE_BYTE | PIPE_WAIT, + 1, + sizeof(readbuf), + sizeof(readbuf), + 1000, + 0); + + // P(F("mtn %d: created pipe %s\n") + // % GetCurrentProcessId() % pipename); + + E(named_pipe != INVALID_HANDLE_VALUE, + F("CreateNamedPipe(%s,...) call failed: %s") + % pipename % err_msg()); + + // Open the child's handle to the named pipe. + + SECURITY_ATTRIBUTES inherit; + memset(&inherit,0,sizeof inherit); + inherit.nLength=sizeof inherit; + inherit.bInheritHandle = TRUE; + + HANDLE hpipe = CreateFile(pipename.c_str(), + GENERIC_READ|GENERIC_WRITE, 0, + &inherit, + OPEN_EXISTING, + FILE_ATTRIBUTE_NORMAL|FILE_FLAG_OVERLAPPED,0); + + E(hpipe != INVALID_HANDLE_VALUE, + F("CreateFile(%s,...) call failed: %s") + % pipename % err_msg()); + + // P(F("mtn %d: creating file handle on named pipe %s\n") + // % GetCurrentProcessId() % pipename); + + // Set up the child with the pipes as stdin/stdout and inheriting stderr. + + PROCESS_INFORMATION piProcInfo; + STARTUPINFO siStartInfo; + + memset(&piProcInfo, 0, sizeof(piProcInfo)); + memset(&siStartInfo, 0, sizeof(siStartInfo)); + + siStartInfo.cb = sizeof(siStartInfo); + siStartInfo.hStdError = (HANDLE)(_get_osfhandle(2)); + siStartInfo.hStdOutput = hpipe; + siStartInfo.hStdInput = hpipe; + siStartInfo.dwFlags |= STARTF_USESTDHANDLES; + + std::string cmdline = munge_argv_into_cmdline(newargv); + L(FL("Subprocess command line: '%s'\n") % cmdline); + + // P(F("mtn %d: subprocess cmd line: '%s'\n") + // % GetCurrentProcessId() % cmdline); + + BOOL started = CreateProcess(NULL, // Application name + const_cast(cmdline.c_str()), + NULL, // Process attributes + NULL, // Thread attributes + TRUE, // Inherit handles + 0, // Creation flags + NULL, // Environment + NULL, // Current directory + &siStartInfo, + &piProcInfo); + E(started, + F("CreateProcess(%s,...) call failed: %s") + % cmdline % err_msg()); + + // P(F("mtn %d: started subprocess: '%s'\n") + // % GetCurrentProcessId() % cmdline); + + child = piProcInfo.hProcess; + + // P(F("mtn %d: opened handles\n") + // % GetCurrentProcessId()); + + // create infrastructure for overlapping I/O + memset(&overlap, 0, sizeof(overlap)); + overlap.hEvent = CreateEvent(0, TRUE, TRUE, 0); + bytes_available = 0; + I(overlap.hEvent != 0); + + // P(F("mtn %d: created overlap event\n") + // % GetCurrentProcessId()); + +#else // !WIN32 + + int fd1[2], fd2[2]; + child = pipe_and_fork(fd1, fd2); + E(child >= 0, F("pipe/fork failed %s") % strerror(errno)); + if (!child) + { + execvp(newargv[0], const_cast(newargv)); + perror(newargv[0]); + exit(errno); + } + readfd = fd1[0]; + writefd = fd2[1]; + fcntl(readfd, F_SETFL, fcntl(readfd, F_GETFL) | O_NONBLOCK); +#endif + + // P(F("mtn %d: set up i/o channels\n") + // % GetCurrentProcessId()); +} + +// Non blocking read. + +Netxx::signed_size_type +Netxx::PipeStream::read (void *buffer, size_type length) +{ +#ifdef WIN32 + // P(F("mtn %d: attempting read of length %d\n") + // % GetCurrentProcessId() % length); + + if (length > bytes_available) + length = bytes_available; + + // P(F("mtn %d: reading %d bytes of %d available\n") + // % GetCurrentProcessId() % length % bytes_available); + + if (length) + { + memcpy(buffer, readbuf, length); + if (length < bytes_available) + memmove(readbuf, readbuf+length, bytes_available-length); + bytes_available -= length; + } + + // P(F("mtn %d: read complete, %d bytes\n") + // % GetCurrentProcessId() % length); + return length; +#else + return ::read(readfd, buffer, length); +#endif +} + +Netxx::signed_size_type +Netxx::PipeStream::write(const void *buffer, size_type length) +{ + // P(F("mtn %d: writing, %d bytes\n") + // % GetCurrentProcessId() % length); +#ifdef WIN32 + DWORD written = 0; + BOOL ok = WriteFile(named_pipe, buffer, length, &written, NULL); + E(ok, F("WriteFile call failed: %s") % err_msg()); +#else + size_t written = ::write(writefd, buffer, length); +#endif + // P(F("mtn %d: write complete, %d bytes\n") + // % GetCurrentProcessId() % length); + return written; +} + +void +Netxx::PipeStream::close (void) +{ + +#ifdef WIN32 + // P(F("mtn %d: closing named pipe\n") + // % GetCurrentProcessId()); + + if (named_pipe != INVALID_HANDLE_VALUE) + CloseHandle(named_pipe); + named_pipe = INVALID_HANDLE_VALUE; + + if (overlap.hEvent != INVALID_HANDLE_VALUE) + CloseHandle(overlap.hEvent); + overlap.hEvent = INVALID_HANDLE_VALUE; + + if (child != INVALID_HANDLE_VALUE) + WaitForSingleObject(child, INFINITE); + child = INVALID_HANDLE_VALUE; +#else + if (readfd != -1) + ::close(readfd); + readfd = -1; + + if (writefd != -1) + ::close(writefd); + writefd = -1; + + if (child) + waitpid(child,0,0); + child = 0; +#endif +} + +Netxx::socket_type +Netxx::PipeStream::get_socketfd (void) const +{ +#ifdef WIN32 + return (Netxx::socket_type) named_pipe; +#else + return Netxx::socket_type(-1); +#endif +} + +const Netxx::ProbeInfo* +Netxx::PipeStream::get_probe_info (void) const +{ + return 0; +} + +#ifdef WIN32 + +// to emulate the semantics of the select call we wait up to timeout for the +// first byte and ask for more bytes with no timeout +// perhaps there is a more efficient/less complicated way (tell me if you know) + +static std::string +status_name(DWORD wstatus) +{ + switch (wstatus) { + case WAIT_TIMEOUT: return "WAIT_TIMEOUT"; + case WAIT_OBJECT_0: return "WAIT_OBJECT_0"; + case WAIT_FAILED: return "WAIT_FAILED"; + case WAIT_OBJECT_0+1: return "WAIT_OBJECT_0+1"; + default: return "UNKNOWN"; + } +} + +Netxx::Probe::result_type +Netxx::PipeCompatibleProbe::ready(const Timeout &timeout, ready_type rt) +{ + if (!is_pipe) + return Probe::ready(timeout, rt); + + // L(F("mtn %d: checking for i/o ready state\n") % GetCurrentProcessId()); + + if (rt == ready_none) + rt = ready_t; // remembered from add + + if (rt & ready_write) + { + // P(F("mtn %d: write is possible, returning\n") + // % GetCurrentProcessId()); + return std::make_pair(pipe->get_socketfd(), ready_write); + } + + if (rt & ready_read) + { + if (pipe->bytes_available == 0) + { + // Issue an async request to fill our buffer. + // P(F("mtn %d: issuing readfile\n") % GetCurrentProcessId()); + BOOL ok = ReadFile(pipe->named_pipe, pipe->readbuf, + sizeof(pipe->readbuf), NULL, &pipe->overlap); + E(ok || GetLastError() == ERROR_IO_PENDING, + F("ReadFile call failed: %s") % err_msg()); + pipe->read_in_progress = true; + } + + if (pipe->read_in_progress) + { + I(pipe->bytes_available == 0); + // Attempt to wait for the completion of the read-in-progress. + int milliseconds = ((timeout.get_sec() * 1000) + + (timeout.get_usec() / 1000)); + L(FL("WaitForSingleObject(,%d)\n") % milliseconds); + + DWORD wstatus = WAIT_FAILED; + + if (pipe->child != INVALID_HANDLE_VALUE) + { + // We're a server; we're going to wait for the client to exit as well + // as the pipe read status, because apparently you don't find out about + // closed pipes during an overlapped read request (?) + HANDLE handles[2]; + handles[0] = pipe->overlap.hEvent; + handles[1] = pipe->child; + // P( F("mtn %d: waiting %d milliseconds for multiple objects\n") + // % GetCurrentProcessId() % milliseconds); + wstatus = WaitForMultipleObjects(2, handles, FALSE, milliseconds); + // P(F("mtn %d: wait finished with %s\n") + // % GetCurrentProcessId() % status_name(wstatus)); + E(wstatus != WAIT_FAILED, + F("WaitForMultipleObjects call failed: %s") % err_msg()); + if (wstatus == WAIT_OBJECT_0 + 1) + return std::make_pair(pipe->get_socketfd(), ready_oobd); + } + else + { + // P(F("mtn %d: waiting %d milliseconds for single object\n") + // % GetCurrentProcessId() % milliseconds); + wstatus = WaitForSingleObject(pipe->overlap.hEvent, milliseconds); + // P(F("mtn %d: wait finished with %s\n") + // % GetCurrentProcessId() % status_name(wstatus)); + E(wstatus != WAIT_FAILED, + F("WaitForSingleObject call failed: %s") % err_msg()); + } + + if (wstatus == WAIT_TIMEOUT) + return std::make_pair(-1, ready_none); + + // P(F("mtn %d: getting overlapped result\n") % GetCurrentProcessId()); + BOOL ok = GetOverlappedResult(pipe->named_pipe, &pipe->overlap, + &pipe->bytes_available, FALSE); + // P(F("mtn %d: overlapped result: %s\n") % GetCurrentProcessId() % ok); + if (ok) + { + // We completed our read. + pipe->read_in_progress = false; + } + else + { + // We did not complete our read. + E(GetLastError() == ERROR_IO_INCOMPLETE, + F("GetOverlappedResult call failed: %s") % err_msg()); + } + } + + if (pipe->bytes_available != 0) + { + // P(F("mtn %d: %d bytes are available, returning\n") + // % GetCurrentProcessId() % pipe->bytes_available); + return std::make_pair(pipe->get_socketfd(), ready_read); + } + } + + // P(F("mtn %d: no i/o ready\n") % GetCurrentProcessId()); + return std::make_pair(pipe->get_socketfd(), ready_none); +} + +void +Netxx::PipeCompatibleProbe::add(PipeStream &ps, ready_type rt) + { + assert(!is_pipe); + assert(!pipe); + is_pipe = true; + pipe = &ps; + ready_t = rt; + } + +void +Netxx::PipeCompatibleProbe::add(const StreamBase &sb, ready_type rt) + { + try + { + add(const_cast(dynamic_cast(sb)),rt); + } + catch (...) + { + assert(!is_pipe); + Probe::add(sb,rt); + } + } + +void +Netxx::PipeCompatibleProbe::add(const StreamServer &ss, ready_type rt) + { + assert(!ip_pipe); + Probe::add(ss,rt); + } +#else // unix +void +Netxx::PipeCompatibleProbe::add(PipeStream &ps, ready_type rt) + { + if (rt == ready_none || rt & ready_read) + add_socket(ps.get_readfd(), ready_read); + if (rt == ready_none || rt & ready_write) + add_socket(ps.get_writefd(), ready_write); + } + +void +Netxx::PipeCompatibleProbe::add(const StreamBase &sb, ready_type rt) + { + try + { + add(const_cast(dynamic_cast(sb)),rt); + } + catch (...) + { + Probe::add(sb,rt); + } + } + +void +Netxx::PipeCompatibleProbe::add(const StreamServer &ss, ready_type rt) + { + Probe::add(ss,rt); + } +#endif + +#ifdef BUILD_UNIT_TESTS +#include "unit_tests.hh" + +static void +simple_pipe_test() +{ try + { + Netxx::PipeStream pipe("cat",std::vector()); + + std::string result; + Netxx::PipeCompatibleProbe probe; + Netxx::Timeout timeout(2L), short_time(0,1000); + + // time out because no data is available + probe.clear(); + probe.add(pipe, Netxx::Probe::ready_read); + Netxx::Probe::result_type res = probe.ready(short_time); + I(res.second==Netxx::Probe::ready_none); + + // write should be possible + probe.clear(); + probe.add(pipe, Netxx::Probe::ready_write); + res = probe.ready(short_time); + I(res.second&Netxx::Probe::ready_write); + I(res.first==pipe.get_writefd()); + + // try binary transparency + for (int c = 0; c < 256; ++c) + { + char buf[1024]; + buf[0] = c; + buf[1] = 255 - c; + pipe.write(buf, 2); + + std::string result; + while (result.size() < 2) + { // wait for data to arrive + probe.clear(); + probe.add(pipe, Netxx::Probe::ready_read); + res = probe.ready(timeout); + E(res.second & Netxx::Probe::ready_read, F("timeout reading data %d") % c); + I(res.first == pipe.get_readfd()); + int bytes = pipe.read(buf, sizeof(buf)); + result += std::string(buf, bytes); + } + I(result.size() == 2); + I(static_cast(result[0]) == c); + I(static_cast(result[1]) == 255 - c); + } + + pipe.close(); + + } +catch (informative_failure &e) + // for some reason boost does not provide + // enough information + { + W(F("Failure %s\n") % e.what); + throw; + } +} + +void +add_pipe_tests(test_suite * suite) +{ + I(suite); + suite->add(BOOST_TEST_CASE(&simple_pipe_test)); +} +#endif ============================================================ --- netxx_pipe.hh 9b4e1e3543138f5e784814446299bce571ea5e18 +++ netxx_pipe.hh 9b4e1e3543138f5e784814446299bce571ea5e18 @@ -0,0 +1,129 @@ +// -*- mode: C++; c-file-style: "gnu"; indent-tabs-mode: nil -*- +// copyright (C) 2005 Christof Petig +// all rights reserved. +// licensed to the public under the terms of the GNU GPL (>= 2) +// see the file COPYING for details + +#include +#include +#include +#include +#ifdef WIN32 +# include +#endif + +/* + What is this all for? + + If you want to transparently handle a pipe and a socket on unix and + windows you have to abstract some difficulties: + + - sockets have a single filedescriptor for reading and writing + pipes usually come in pairs (one for reading and one for writing) + + - process creation is different on unix and windows + + => so Netxx::PipeStream is a Netxx::StreamBase which abstracts two pipes to + and from an external command + + - windows can select on a socket but not on a pipe + + => so Netxx::PipeCompatibleProbe is a Netxx::Probe like class which + _can_ handle pipes on windows (emulating select is difficult at best!) + (on unix Probe and PipeCompatibleProbe are nearly identical: with pipes + you should not select for both read and write on the same descriptor) + +*/ + +namespace Netxx + { + class PipeCompatibleProbe; + class StreamServer; + + class PipeStream : public StreamBase + { +#ifdef WIN32 + HANDLE named_pipe; + HANDLE child; + char readbuf[1024]; + DWORD bytes_available; + bool read_in_progress; + OVERLAPPED overlap; + friend class PipeCompatibleProbe; +#else + int readfd, writefd; + int child; +#endif + + + public: + // do we need Timeout for symmetry with Stream? + explicit PipeStream (int readfd, int writefd); + explicit PipeStream (const std::string &cmd, const std::vector &args); + virtual ~PipeStream() { close(); } + virtual signed_size_type read (void *buffer, size_type length); + virtual signed_size_type write (const void *buffer, size_type length); + virtual void close (void); + virtual socket_type get_socketfd (void) const; + virtual const ProbeInfo* get_probe_info (void) const; + int get_readfd(void) const + { +#ifdef WIN32 + return -1; +#else + return readfd; +#endif + } + int get_writefd(void) const + { +#ifdef WIN32 + return -1; +#else + return writefd; +#endif + } + }; + +#ifdef WIN32 + + // This probe can either handle _one_ PipeStream or several network + // Streams so if !is_pipe this acts like a Probe. + class PipeCompatibleProbe : public Probe + { + bool is_pipe; + // only meaningful if is_pipe is true + PipeStream *pipe; + ready_type ready_t; + public: + PipeCompatibleProbe() : is_pipe(), pipe(), ready_t() + {} + void clear() + { + if (is_pipe) + { + pipe=0; + is_pipe=false; + } + else + Probe::clear(); + } + // This function does all the hard work (emulating a select). + result_type ready(const Timeout &timeout=Timeout(), ready_type rt=ready_none); + void add(PipeStream &ps, ready_type rt=ready_none); + void add(const StreamBase &sb, ready_type rt=ready_none); + void add(const StreamServer &ss, ready_type rt=ready_none); + void remove(const PipeStream &ps); + }; +#else + + // We only act specially if a PipeStream is added (directly or via + // the StreamBase parent reference). + struct PipeCompatibleProbe : Probe + { + void add(PipeStream &ps, ready_type rt=ready_none); + void add(const StreamBase &sb, ready_type rt=ready_none); + void add(const StreamServer &ss, ready_type rt=ready_none); + }; +#endif + +}