# # patch "netxx_pipe.cc" # from [318b2bdcf99056633351a82117b72919aa3af7ac] # to [ad08aaaf554af2860edfba10250723ae7d6c47a3] # # patch "netxx_pipe.hh" # from [a7dc0c04e3c327a4afdd379b6eae7d709d105864] # to [1d8b8627413c3dedecd8d2aba09e0696a4846435] # ======================================================================== --- netxx_pipe.cc 318b2bdcf99056633351a82117b72919aa3af7ac +++ netxx_pipe.cc ad08aaaf554af2860edfba10250723ae7d6c47a3 @@ -10,9 +10,8 @@ #include Netxx::PipeStream::PipeStream(int _readfd, int _writefd) - : readfd(_readfd), writefd(_writefd), child() -{ -} + : readfd(_readfd), writefd(_writefd), child() +{} #ifndef __WIN32__ #include @@ -21,41 +20,48 @@ #include // create pipes for stdio and fork subprocess -static pid_t +static pid_t pipe_and_fork(int *fd1,int *fd2) -{ +{ pid_t result=-1; - fd1[0]=-1; fd1[1]=-1; - fd2[0]=-1; fd2[1]=-1; - if (pipe(fd1)) return -1; - if (pipe(fd2)) - { - close(fd1[0]); - close(fd1[1]); - return -1; - } + fd1[0]=-1; + fd1[1]=-1; + fd2[0]=-1; + fd2[1]=-1; + if (pipe(fd1)) + return -1; + if (pipe(fd2)) + { + close(fd1[0]); + close(fd1[1]); + return -1; + } result=fork(); if (result<0) - { close(fd1[0]); close(fd1[1]); - close(fd2[0]); close(fd2[1]); - return -1; - } + { + close(fd1[0]); + close(fd1[1]); + close(fd2[0]); + close(fd2[1]); + return -1; + } else if (!result) - { // fd1[1] for writing, fd2[0] for reading - close(fd1[0]); - close(fd2[1]); - if (dup2(fd2[0],0)!=0 || dup2(fd1[1],1)!=1) - { perror("dup2"); - exit(-1); // kill the useless child + { // fd1[1] for writing, fd2[0] for reading + close(fd1[0]); + close(fd2[1]); + if (dup2(fd2[0],0)!=0 || dup2(fd1[1],1)!=1) + { + perror("dup2"); + exit(-1); // kill the useless child + } + close(fd1[1]); + close(fd2[0]); } - close(fd1[1]); - close(fd2[0]); - } else - { // fd1[0] for reading, fd2[1] for writing - close(fd1[1]); - close(fd2[0]); - } + { // fd1[0] for reading, fd2[1] for writing + close(fd1[1]); + close(fd2[0]); + } return result; } #endif @@ -69,8 +75,9 @@ #endif Netxx::PipeStream::PipeStream (const std::string &cmd, const std::vector &args) - : readfd(), writefd(), child() -{ const unsigned newsize=64; + : readfd(), writefd(), child() +{ + const unsigned newsize=64; const char *newargv[newsize]; I(args.size()<(sizeof(newargv)/sizeof(newargv[0]))); unsigned newargc=0; @@ -79,12 +86,18 @@ newargv[newargc++]=i->c_str(); newargv[newargc]=0; #ifdef WIN32 + int fd1[2],fd2[2]; - fd1[0]=-1; fd1[1]=-1; - fd2[0]=-1; fd2[1]=-1; + fd1[0]=-1; + fd1[1]=-1; + fd2[0]=-1; + fd2[1]=-1; E(_pipe(fd1,0,_O_BINARY)==0, F("first pipe failed")); if (_pipe(fd2,0,_O_BINARY)) // | O_NOINHERIT - { ::close(fd1[0]); ::close(fd1[1]); E(false,F("second pipe failed")); } + { ::close(fd1[0]); + ::close(fd1[1]); + E(false,F("second pipe failed")); + } // abuse dup, use spawnvp? PROCESS_INFORMATION piProcInfo; STARTUPINFO siStartInfo; @@ -99,31 +112,33 @@ // as its argument I(args.size()<(sizeof(argv)/sizeof(argv[0]))); for (std::vector::const_iterator i=args.begin();i!=args.end();++i,++pos) - argv[pos]=i->c_str(); + argv[pos]=i->c_str(); argv[pos]=0; std::string cmdline=munge_argv_into_cmdline(argv); L(F("cmdline '%s'\n") % cmdline); FAIL_IF(CreateProcess,(0,const_cast(cmdline.c_str()), - 0,0,TRUE,0,0,0,&siStartInfo,&piProcInfo),==0); + 0,0,TRUE,0,0,0,&siStartInfo,&piProcInfo),==0); ::close(fd1[1]); ::close(fd2[0]); child=long(piProcInfo.hProcess); readfd=fd1[0]; writefd=fd2[1]; - + memset(&overlap,0,sizeof overlap); overlap.hEvent=CreateEvent(0,FALSE,FALSE,0); bytes_available=0; I(overlap.hEvent!=0); #else + int fd1[2],fd2[2]; child=pipe_and_fork(fd1,fd2); E(child>=0, F("pipe/fork failed %s") % strerror(errno)); if (!child) - { execvp(newargv[0],const_cast(newargv)); - perror(newargv[0]); - exit(errno); - } + { + execvp(newargv[0],const_cast(newargv)); + perror(newargv[0]); + exit(errno); + } readfd=fd1[0]; writefd=fd2[1]; fcntl(readfd,F_SETFL,fcntl(readfd,F_GETFL)|O_NONBLOCK); @@ -133,41 +148,51 @@ Netxx::signed_size_type Netxx::PipeStream::read (void *buffer, size_type length) { #ifdef WIN32 - if (length>bytes_available) length=bytes_available; + if (length>bytes_available) + length=bytes_available; if (length) - { memcpy(buffer,readbuf,length); - if (lengthget_writefd(),ready_write); +{ + if (!is_pipe) + return Probe::ready(timeout,rt); + if (rt==ready_none) + rt=ready_t; // remembered from add + if (rt&ready_write) + return std::make_pair(pipe->get_writefd(),ready_write); if (rt&ready_read) - { if (pipe->bytes_available) return std::make_pair(pipe->get_readfd(),ready_read); - - HANDLE h_read=(HANDLE)_get_osfhandle(pipe->get_readfd()); - DWORD bytes_read=0; - FAIL_IF( ReadFile,(h_read,pipe->readbuf,1,&bytes_read,&pipe->overlap),==0); - if (!bytes_read) - { FAIL_IF( WaitForSingleObject,(pipe->overlap.hEvent,timeout.get_sec()),==WAIT_FAILED); - FAIL_IF( GetOverlappedResult,(h_read,&pipe->overlap,&bytes_read,FALSE),==0); + { + if (pipe->bytes_available) + return std::make_pair(pipe->get_readfd(),ready_read); + + HANDLE h_read=(HANDLE)_get_osfhandle(pipe->get_readfd()); + DWORD bytes_read=0; + FAIL_IF( ReadFile,(h_read,pipe->readbuf,1,&bytes_read,&pipe->overlap),==0); if (!bytes_read) - { FAIL_IF( CancelIo,(h_read),==0); - std::make_pair(socket_type(-1),ready_none); - } + { + FAIL_IF( WaitForSingleObject,(pipe->overlap.hEvent,timeout.get_sec()),==WAIT_FAILED); + FAIL_IF( GetOverlappedResult,(h_read,&pipe->overlap,&bytes_read,FALSE),==0); + if (!bytes_read) + { + FAIL_IF( CancelIo,(h_read),==0); + std::make_pair(socket_type(-1),ready_none); + } + } + I(bytes_read==1); + pipe->bytes_available=bytes_read; + FAIL_IF( ReadFile,(h_read,pipe->readbuf+1,sizeof pipe->readbuf-1,&bytes_read,&pipe->overlap),==0); + FAIL_IF( CancelIo,(h_read),==0); + if (!bytes_read) + { + FAIL_IF( GetOverlappedResult,(h_read,&pipe->overlap,&bytes_read,FALSE),==0); + I(!bytes_read); + } + else + { + pipe->bytes_available+=bytes_read; + } + return std::make_pair(pipe->get_readfd(),ready_read); } - I(bytes_read==1); - pipe->bytes_available=bytes_read; - FAIL_IF( ReadFile,(h_read,pipe->readbuf+1,sizeof pipe->readbuf-1,&bytes_read,&pipe->overlap),==0); - FAIL_IF( CancelIo,(h_read),==0); - if (!bytes_read) - { FAIL_IF( GetOverlappedResult,(h_read,&pipe->overlap,&bytes_read,FALSE),==0); - I(!bytes_read); - } - else - { pipe->bytes_available+=bytes_read; - } - return std::make_pair(pipe->get_readfd(),ready_read); - } return std::make_pair(socket_type(-1),ready_none); } -void Netxx::PipeCompatibleProbe::add(PipeStream &ps, ready_type rt) -{ assert(!is_pipe); - assert(!pipe); - is_pipe=true; - pipe=&ps; - ready_t=rt; -} +void Netxx::PipeCompatibleProbe::add + (PipeStream &ps, ready_type rt) + { + assert(!is_pipe); + assert(!pipe); + is_pipe=true; + pipe=&ps; + ready_t=rt; + } -void Netxx::PipeCompatibleProbe::add(const StreamBase &sb, ready_type rt) -{ // L(F("PCP::add()\n")); - try - { add(const_cast(dynamic_cast(sb)),rt); - // L(F("... was a pipe\n")); +void Netxx::PipeCompatibleProbe::add + (const StreamBase &sb, ready_type rt) + { // L(F("PCP::add()\n")); + try + { + add + (const_cast(dynamic_cast(sb)),rt); + // L(F("... was a pipe\n")); + } + catch (...) + { + assert(!is_pipe); + Probe::add + (sb,rt); + // L(F("... was a socket\n")); + } } - catch (...) - { assert(!is_pipe); - Probe::add(sb,rt); - // L(F("... was a socket\n")); - } -} -void Netxx::PipeCompatibleProbe::add(const StreamServer &ss, ready_type rt) -{ assert(!ip_pipe); - Probe::add(ss,rt); -} +void Netxx::PipeCompatibleProbe::add + (const StreamServer &ss, ready_type rt) + { + assert(!ip_pipe); + Probe::add + (ss,rt); + } #else // unix void -Netxx::PipeCompatibleProbe::add(PipeStream &ps, ready_type rt) -{ - if (rt==ready_none || rt&ready_read) add_socket(ps.get_readfd(),ready_read); - if (rt==ready_none || rt&ready_write) add_socket(ps.get_writefd(),ready_write); -} +Netxx::PipeCompatibleProbe::add + (PipeStream &ps, ready_type rt) + { + if (rt==ready_none || rt&ready_read) + add_socket(ps.get_readfd(),ready_read); + if (rt==ready_none || rt&ready_write) + add_socket(ps.get_writefd(),ready_write); + } void -Netxx::PipeCompatibleProbe::add(const StreamBase &sb, ready_type rt) -{ - try - { add(const_cast(dynamic_cast(sb)),rt); +Netxx::PipeCompatibleProbe::add + (const StreamBase &sb, ready_type rt) + { + try + { + add + (const_cast(dynamic_cast(sb)),rt); + } + catch (...) + { + Probe::add + (sb,rt); + } } - catch (...) - { Probe::add(sb,rt); - } -} void -Netxx::PipeCompatibleProbe::add(const StreamServer &ss, ready_type rt) -{ - Probe::add(ss,rt); -} +Netxx::PipeCompatibleProbe::add + (const StreamServer &ss, ready_type rt) + { + Probe::add + (ss,rt); + } #endif #ifdef BUILD_UNIT_TESTS @@ -265,47 +320,49 @@ static void simple_pipe_test() -{ Netxx::PipeStream pipe("cat",std::vector()); +{ + Netxx::PipeStream pipe("cat",std::vector()); std::string result; Netxx::PipeCompatibleProbe probe; Netxx::Timeout timeout(2L), short_time(0,500); - + // time out because no data is available probe.clear(); probe.add(pipe, Netxx::Probe::ready_read); Netxx::Probe::result_type res = probe.ready(short_time); I(res.second==Netxx::Probe::ready_none); - + // write should be possible probe.clear(); probe.add(pipe, Netxx::Probe::ready_write); res = probe.ready(short_time); I(res.second&Netxx::Probe::ready_write); I(res.first==pipe.get_writefd()); - + // try binary transparency for (int c=0; c<256; ++c) - { char buf[1024]; - buf[0]=c; - buf[1]=255-c; - pipe.write(buf,2); - - std::string result; - while (result.size()<2) - { // wait for data to arrive - probe.clear(); - probe.add(pipe, Netxx::Probe::ready_read); - res = probe.ready(timeout); - E(res.second&Netxx::Probe::ready_read, F("timeout reading data %d") % c); - I(res.first==pipe.get_readfd()); - int bytes=pipe.read(buf,sizeof buf); - result+=std::string(buf,bytes); + { + char buf[1024]; + buf[0]=c; + buf[1]=255-c; + pipe.write(buf,2); + + std::string result; + while (result.size()<2) + { // wait for data to arrive + probe.clear(); + probe.add(pipe, Netxx::Probe::ready_read); + res = probe.ready(timeout); + E(res.second&Netxx::Probe::ready_read, F("timeout reading data %d") % c); + I(res.first==pipe.get_readfd()); + int bytes=pipe.read(buf,sizeof buf); + result+=std::string(buf,bytes); + } + I(result.size()==2); + I((unsigned char)(result[0])==c); + I((unsigned char)(result[1])==255-c); } - I(result.size()==2); - I((unsigned char)(result[0])==c); - I((unsigned char)(result[1])==255-c); - } pipe.close(); } @@ -313,6 +370,7 @@ add_pipe_tests(test_suite * suite) { I(suite); - suite->add(BOOST_TEST_CASE(&simple_pipe_test)); + suite->add + (BOOST_TEST_CASE(&simple_pipe_test)); } #endif ======================================================================== --- netxx_pipe.hh a7dc0c04e3c327a4afdd379b6eae7d709d105864 +++ netxx_pipe.hh 1d8b8627413c3dedecd8d2aba09e0696a4846435 @@ -13,10 +13,10 @@ #endif /* What is this all for? - + If you want to transparently handle a pipe and a socket on unix and windows you have to abstract some difficulties: - + - sockets have a single filedescriptor for reading and writing pipes usually come in pairs (one for reading and one for writing) @@ -33,77 +33,99 @@ */ -namespace Netxx { -class PipeCompatibleProbe; -class StreamServer; +namespace Netxx + { + class PipeCompatibleProbe; + class StreamServer; -class PipeStream : public StreamBase -{ int readfd, writefd; -// ProbeInfo pi_; - int child; + class PipeStream : public StreamBase + { + int readfd, writefd; + // ProbeInfo pi_; + int child; #ifdef WIN32 - char readbuf[1024]; - unsigned bytes_available; - OVERLAPPED overlap; - - friend class PipeCompatibleProbe; + + char readbuf[1024]; + unsigned bytes_available; + OVERLAPPED overlap; + + friend class PipeCompatibleProbe; #endif -public: - explicit PipeStream (int readfd, int writefd); // Timeout? - explicit PipeStream (const std::string &cmd, const std::vector &args); - virtual signed_size_type read (void *buffer, size_type length); - virtual signed_size_type write (const void *buffer, size_type length); - virtual void close (void); - virtual socket_type get_socketfd (void) const; - virtual const ProbeInfo* get_probe_info (void) const; - int get_readfd(void) const - { - return readfd; - } - int get_writefd(void) const - { - return writefd; - } -}; + public: + explicit PipeStream (int readfd, int writefd); // Timeout? + explicit PipeStream (const std::string &cmd, const std::vector &args); + virtual signed_size_type read (void *buffer, size_type length); + virtual signed_size_type write (const void *buffer, size_type length); + virtual void close (void); + virtual socket_type get_socketfd (void) const; + virtual const ProbeInfo* get_probe_info (void) const; + int get_readfd(void) const + { + return readfd; + } + int get_writefd(void) const + { + return writefd; + } + }; + #ifdef WIN32 + class PipeCompatibleProbe : public Probe - { // We need to make sure that only pipes are connected, if Streams are - // connected the old Probe functions still apply - // use WriteFileEx/ReadFileEx with Overlap? + { // We need to make sure that only pipes are connected, if Streams are + // connected the old Probe functions still apply + // use WriteFileEx/ReadFileEx with Overlap? bool is_pipe; PipeStream *pipe; ready_type ready_t; public: - PipeCompatibleProbe() : is_pipe(), pipe(), ready_t() {} + PipeCompatibleProbe() : is_pipe(), pipe(), ready_t() + {} void clear() - { - if (is_pipe) - { - pipe=0; - is_pipe=false; - } - else - Probe::clear(); + { + if (is_pipe) + { + pipe=0; + is_pipe=false; + } + else + Probe::clear(); } result_type ready(const Timeout &timeout=Timeout(), ready_type rt=ready_none); - void add(PipeStream &ps, ready_type rt=ready_none); - void add(const StreamBase &sb, ready_type rt=ready_none); - void add(const StreamServer &ss, ready_type rt=ready_none); - void remove(const PipeStream &ps); -#if 0 // should be covered by StreamBase - template void add (const T &t, ready_type rt=ready_none) - { if (is_pipe) throw std::runtime_error("stream added to a pipe probe"); - Probe::add(t,rt); - } + void add + (PipeStream &ps, ready_type rt=ready_none); + void add + (const StreamBase &sb, ready_type rt=ready_none); + void add + (const StreamServer &ss, ready_type rt=ready_none); + void remove + (const PipeStream &ps); +#if 0 // should be covered by StreamBase + + template + void add + (const T &t, ready_type rt=ready_none) + { + if (is_pipe) + throw std::runtime_error("stream added to a pipe probe"); + Probe::add + (t,rt); + } #endif - }; + + }; #else + struct PipeCompatibleProbe : Probe - { - void add(PipeStream &ps, ready_type rt=ready_none); - void add(const StreamBase &sb, ready_type rt=ready_none); - void add(const StreamServer &ss, ready_type rt=ready_none); - }; + { + void add + (PipeStream &ps, ready_type rt=ready_none); + void add + (const StreamBase &sb, ready_type rt=ready_none); + void add + (const StreamServer &ss, ready_type rt=ready_none); + }; #endif + }