/* Synchronous writing, asynchronous reading of pipes connected to a
subprocess.
Copyright (C) 2009 Free Software Foundation, Inc.
Written by Paolo Bonzini, 2009.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>. */
#include
#include "pipe-filter.h"
#include
#include
#include
#include
#include
#include
#include
#include
#include "error.h"
#include "gettext.h"
#include "pipe.h"
#include "wait-process.h"
#include "xalloc.h"
#define _(str) gettext (str)
#ifndef SSIZE_MAX
# define SSIZE_MAX ((ssize_t) (SIZE_MAX / 2))
#endif
/* We use a child process, and communicate through a bidirectional pipe.
To avoid deadlocks, let the child process decide when it wants to write,
and let the parent read accordingly. The parent uses select() to
know whether it must write or read. On platforms without select(),
we use non-blocking I/O. (This means the parent is busy looping
while waiting for the child. Not good. But hardly any platform
lacks select() nowadays.) */
/* On BeOS select() works only on sockets, not on normal file descriptors. */
#ifdef __BEOS__
# undef HAVE_SELECT
#endif
#ifdef EINTR
/* EINTR handling for close(), read(), write(), select().
These functions can return -1/EINTR even though we don't have any
signal handlers set up, namely when we get interrupted via SIGSTOP. */
/* close() wrapper that transparently restarts the call whenever it is
   interrupted (-1/EINTR).  Any other outcome is passed through unchanged.  */
static inline int
nonintr_close (int fd)
{
  for (;;)
    {
      int result = close (fd);

      /* Leave as soon as the call either succeeded or failed for a reason
         other than an interruption.  */
      if (result >= 0 || errno != EINTR)
        return result;
    }
}
#undef close /* avoid warning related to gnulib module unistd */
#define close nonintr_close
/* read() wrapper that transparently restarts the call whenever it is
   interrupted (-1/EINTR).  Returns whatever the final read() returned.  */
static inline ssize_t
nonintr_read (int fd, void *buf, size_t count)
{
  for (;;)
    {
      ssize_t result = read (fd, buf, count);

      if (result >= 0 || errno != EINTR)
        return result;
    }
}
#define read nonintr_read
/* write() wrapper that transparently restarts the call whenever it is
   interrupted (-1/EINTR).  Returns whatever the final write() returned,
   which may be a short count.  */
static inline ssize_t
nonintr_write (int fd, const void *buf, size_t count)
{
  for (;;)
    {
      ssize_t result = write (fd, buf, count);

      if (result >= 0 || errno != EINTR)
        return result;
    }
}
#undef write /* avoid warning on VMS */
#define write nonintr_write
# if HAVE_SELECT
/* select() wrapper that transparently restarts the call whenever it is
   interrupted (-1/EINTR).  The same argument pointers are handed to every
   retry.  */
static inline int
nonintr_select (int n, fd_set *readfds, fd_set *writefds, fd_set *exceptfds,
                struct timeval *timeout)
{
  for (;;)
    {
      int result = select (n, readfds, writefds, exceptfds, timeout);

      if (!(result < 0 && errno == EINTR))
        return result;
    }
}
# undef select /* avoid warning on VMS */
# define select nonintr_select
# endif
#endif
/* Non-blocking I/O. */
#ifndef O_NONBLOCK
# define O_NONBLOCK O_NDELAY
#endif
#if HAVE_SELECT
# define IS_EAGAIN(errcode) 0
#else
# ifdef EWOULDBLOCK
# define IS_EAGAIN(errcode) ((errcode) == EAGAIN || (errcode) == EWOULDBLOCK)
# else
# define IS_EAGAIN(errcode) ((errcode) == EAGAIN)
# endif
#endif
/* State of one filter: a subprocess plus the pair of pipe descriptors used
   to exchange data with it. */
struct filter {
/* Process id of the subprocess, as returned by create_pipe_bidi; later
   handed to wait_subprocess. */
pid_t child;
/* Program name, used in error messages. */
const char *progname;
/* Forwarded to wait_subprocess when the subprocess is reaped. */
bool null_stderr;
/* If true, failures are reported through error (EXIT_FAILURE, ...) instead
   of being returned to the caller. */
bool exit_on_error;
/* Caller-supplied buffer that read() fills, and its size in bytes. */
char *read_buf;
size_t read_bufsize;
/* Callback invoked as done_read (read_buf, nbytes, private_data) after each
   successful non-empty read. */
done_read_fn done_read;
void *private_data;
/* fd[0] is the descriptor read from, fd[1] the descriptor written to. */
int fd[2];
/* -1 while the subprocess has not been reaped; afterwards the value
   returned by wait_subprocess. */
int exit_code;
/* Set once reading (resp. writing) has stopped for good: EOF, an I/O error,
   or cleanup. */
volatile bool reader_terminated;
volatile bool writer_terminated;
#if (defined _WIN32 || defined __WIN32__) && ! defined __CYGWIN__
/* When nonzero, filter_loop copies one of these to errno and fails.
   NOTE(review): presumably stored by the worker threads -- their bodies are
   not shown here. */
volatile int writer_final_errno;
volatile int reader_final_errno;
/* Reset to NULL/0 by filter_init.
   NOTE(review): presumably the data handed to the writer thread -- confirm. */
const char *write_buf;
size_t write_bufsize;
/* Handles of the threads started with _beginthreadex in filter_init. */
HANDLE hReader, hWriter;
/* thread synchronization yadda yadda */
#else
/* Descriptor sets passed to select() by filter_loop; cleared in
   filter_init. */
fd_set readfds, writefds;
#endif
};
#if (defined _WIN32 || defined __WIN32__) && ! defined __CYGWIN__
/* Entry point of the thread that filter_init starts as f->hWriter.
   thread_arg is the struct filter pointer passed to _beginthreadex.
   The body is only a placeholder here.
   NOTE(review): nothing is returned although the return type is
   unsigned int; the real implementation must return a value. */
static unsigned int WINAPI
writer_thread_func (void *thread_arg)
{
/* writing yadda yadda */
}
/* Entry point of the thread that filter_init starts as f->hReader.
   thread_arg is the struct filter pointer passed to _beginthreadex.
   The body is only a placeholder here.
   NOTE(review): nothing is returned although the return type is
   unsigned int; the real implementation must return a value. */
static unsigned int WINAPI
reader_thread_func (void *thread_arg)
{
/* reading yadda yadda */
}
/* Thread-based variant: reset the per-thread bookkeeping in F and start the
   writer and reader threads.
   Returns 0 when both threads were created.  Otherwise returns -1, or exits
   with a diagnostic when f->exit_on_error is set.  */
static int
filter_init (struct filter *f)
{
  f->writer_final_errno = 0;
  f->reader_final_errno = 0;
  f->write_buf = NULL;
  f->write_bufsize = 0;

  /* create synchronization objects yadda yadda */

  /* The writer is started before the reader; both receive F itself as
     their thread argument.  */
  f->hWriter =
    (HANDLE) _beginthreadex (NULL, 100000, writer_thread_func, f, 0, NULL);
  f->hReader =
    (HANDLE) _beginthreadex (NULL, 100000, reader_thread_func, f, 0, NULL);

  if (f->hWriter != NULL && f->hReader != NULL)
    return 0;

  if (f->exit_on_error)
    error (EXIT_FAILURE, 0, _("creation of threads failed"));
  return -1;
}
/* Thread-based variant: hand BUF/SIZE over to the worker threads, then
   report any error they recorded.
   Returns 0 when neither thread recorded an error.  Otherwise sets errno to
   the writer's error if there is one, else the reader's, and returns -1.  */
static int
filter_loop (struct filter *f, const char *buf, size_t size)
{
  /* wake up threads yadda yadda */

  if (!f->writer_final_errno && !f->reader_final_errno)
    return 0;

  /* A writer error takes precedence over a reader error.  */
  if (f->writer_final_errno)
    errno = f->writer_final_errno;
  else
    errno = f->reader_final_errno;
  return -1;
}
/* Thread-based variant: stop the writer thread and close both pipe
   descriptors, marking writer and reader as terminated.
   The try_io parameter is not referenced in this variant. */
static void
filter_cleanup (struct filter *f, bool try_io)
{
/* Forcibly end the writer thread before closing the descriptor it uses. */
TerminateThread (f->hWriter, 1);
close (f->fd[1]);
f->writer_terminated = true;
/* clean up synchronization objects yadda yadda */
f->reader_terminated = true;
close (f->fd[0]);
}
#else
/* select()/polling variant: switch both pipe descriptors to non-blocking
   mode and clear the descriptor sets kept in F.
   Returns 0 on success.  If fcntl() fails, returns -1 with errno set, or
   exits with a diagnostic when f->exit_on_error is set.  */
static int
filter_init (struct filter *f)
{
  int idx;

  /* Non-blocking mode serves two purposes.  Without HAVE_SELECT it lets
     read() and write() fail with EAGAIN instead of blocking, which the
     polling loop relies on.  With HAVE_SELECT it lets them return after a
     partial transfer: select() only reports that some data can be moved,
     not how much, and without O_NONBLOCK Linux 2.2.17 and BSD systems
     prefer to block rather than return partial results.  */

  /* The write end (fd[1]) is handled first, then the read end (fd[0]).  */
  for (idx = 1; idx >= 0; idx--)
    {
      int flags = fcntl (f->fd[idx], F_GETFL, 0);

      if (flags < 0 || fcntl (f->fd[idx], F_SETFL, flags | O_NONBLOCK) < 0)
        {
          if (f->exit_on_error)
            error (EXIT_FAILURE, errno,
                   _("cannot set up nonblocking I/O to %s subprocess"),
                   f->progname);
          return -1;
        }
    }

  FD_ZERO (&f->readfds);
  FD_ZERO (&f->writefds);
  return 0;
}
/* Pump data between the parent and the subprocess.

   Writes up to BUFSIZE bytes from BUF to f->fd[1] and, interleaved with
   that, reads from f->fd[0] into f->read_buf, passing every non-empty chunk
   to f->done_read.

   With HAVE_SELECT, select() decides whether to write or to read next.  It
   blocks while data remains to be written or once the writer has
   terminated.  When nothing is left to write and the writer is still alive
   it polls with a zero timeout, and the loop ends as soon as no output is
   pending.  Without HAVE_SELECT every iteration tries a non-blocking write
   followed by a non-blocking read.
   NOTE(review): without HAVE_SELECT the loop only ends on EOF or on an
   error, even after the whole buffer has been written -- confirm that this
   is intended.

   Returns 0 on success, including when the subprocess closed its output
   (f->reader_terminated is then set).  Returns -1 with errno set when
   select(), write() or read() fails; f->writer_terminated is set for
   select()/write() failures and f->reader_terminated for read() failures.
   When f->exit_on_error is set, such failures exit the program instead.  */
static int
filter_loop (struct filter *f, const char *buf, size_t bufsize)
{
  /* All-zero timeout, used to poll for pending output without blocking.  */
  static struct timeval tv0;

  for (;;)
    {
#if HAVE_SELECT
      int n = f->fd[0] > f->fd[1] ? f->fd[0] + 1 : f->fd[1] + 1;

      if (!f->reader_terminated)
        FD_SET (f->fd[0], &f->readfds);
      if (!f->writer_terminated)
        FD_SET (f->fd[1], &f->writefds);

      n = select (n, &f->readfds, (bufsize ? &f->writefds : NULL), NULL,
                  (!f->writer_terminated && !bufsize ? &tv0 : NULL));
      if (n == 0)
        /* The zero-timeout poll found nothing to read: done for now.  */
        break;
      if (n < 0)
        {
          if (f->exit_on_error)
            error (EXIT_FAILURE, errno,
                   _("communication with %s subprocess failed"),
                   f->progname);
          f->writer_terminated = true;
          return -1;
        }

      /* Writing takes precedence over reading when both are possible.  */
      if (bufsize && FD_ISSET (f->fd[1], &f->writefds))
        goto try_write;
      if (FD_ISSET (f->fd[0], &f->readfds))
        goto try_read;
      break;
#endif

      /* Attempt to write.  */
#if HAVE_SELECT
    try_write:
#endif
      if (bufsize)
        {
          ssize_t nwritten = write (f->fd[1], buf,
                                    bufsize > SSIZE_MAX ? SSIZE_MAX : bufsize);
          if (nwritten < 0)
            {
              if (IS_EAGAIN (errno))
                continue;
              if (f->exit_on_error)
                error (EXIT_FAILURE, errno,
                       _("write to %s subprocess failed"), f->progname);
              f->writer_terminated = true;
              return -1;
            }
          else
            {
              /* Partial writes are normal: advance past what was taken.  */
              bufsize -= nwritten;
              buf += nwritten;
            }
        }
#if HAVE_SELECT
      continue;
#endif

      /* Attempt to read.  */
#if HAVE_SELECT
    try_read:
#endif
      {
        ssize_t nread = read (f->fd[0], f->read_buf, f->read_bufsize);
        if (nread < 0)
          {
            if (IS_EAGAIN (errno))
              continue;
            if (f->exit_on_error)
              error (EXIT_FAILURE, errno,
                     _("read from %s subprocess failed"), f->progname);
            /* Report the failure to the caller instead of disguising it
               as a regular end of file; errno is still the one set by
               read().  */
            f->reader_terminated = true;
            return -1;
          }
        if (nread == 0)
          {
            /* End of file: the subprocess closed its output.  */
            f->reader_terminated = true;
            return 0;
          }
        f->done_read (f->read_buf, nread, f->private_data);
      }
#if HAVE_SELECT
      continue;
#endif
    }

  /* Reached through the 'break' statements above.  Without this, control
     would fall off the end of a function whose result the caller uses.  */
  return 0;
}
/* Close both ends of the pipe and mark writer and reader as terminated.
   The write end is closed first.  Then, if try_io is true and the reader has
   not terminated yet, filter_loop is run with nothing to write so that the
   remaining output of the subprocess is read; its return value is ignored.
   The read end is closed last. */
static void
filter_cleanup (struct filter *f, bool try_io)
{
/* Closing the write end first presumably lets the subprocess see EOF on its
   input, so that the drain below can finish. */
close (f->fd[1]);
f->writer_terminated = true;
if (try_io && !f->reader_terminated)
filter_loop (f, NULL, 0);
f->reader_terminated = true;
close (f->fd[0]);
}
#endif
/* Shut the filter down once: clean up the pipe, reap the subprocess and
   record its exit status in f->exit_code.  Does nothing when the exit code
   has already been recorded (i.e. it is no longer -1).  When
   f->exit_on_error is set, a nonzero exit status is fatal.  */
static void
filter_terminate (struct filter *f)
{
  bool drain_output;

  if (f->exit_code != -1)
    return;

  /* Remaining output is only read when neither direction has already
     stopped.  */
  drain_output = !f->reader_terminated && !f->writer_terminated;
  filter_cleanup (f, drain_output);

  f->exit_code = wait_subprocess (f->child, f->progname, true,
                                  f->null_stderr, true, f->exit_on_error,
                                  NULL);
  if (f->exit_on_error)
    {
      if (f->exit_code)
        error (EXIT_FAILURE, 0, _("subprocess %s failed (exit status %d)"),
               f->progname, f->exit_code);
    }
}
/* Create a subprocess and pipe some data through it.
progname is the program name used in error messages.
prog_path is the file name of the program to invoke.
prog_argv is a NULL terminated argument list, starting with
prog_path as first element.
If null_stderr is true, the subprocess' stderr will be redirected
to /dev/null, and the usual error message to stderr will be
omitted. This is suitable when the subprocess does not fulfill an
important task.
If exit_on_error is true, any error will cause the main process to
exit with an error status.
If the subprocess does not start correctly, exit if exit_on_error is
true, otherwise return NULL and set errno.
The caller will write to the subprocess through filter_write; during
calls to filter_write, the done_read function may be called to
process any data that the subprocess has written. done_read will
receive at most read_bufsize bytes stored into buf, as well as a
copy of private_data. */
struct filter *
filter_create (const char *progname,
               const char *prog_path, const char **prog_argv,
               bool null_stderr, bool exit_on_error,
               char *read_buf, size_t read_bufsize,
               done_read_fn done_read,
               void *private_data)
{
  struct filter *f = xmalloc (sizeof *f);

  /* Open a bidirectional pipe to a subprocess.  The descriptors are stored
     straight into the filter object.  */
  f->child = create_pipe_bidi (progname, prog_path, (char **) prog_argv,
                               null_stderr, true, exit_on_error,
                               f->fd);
  if (f->child == -1)
    {
      /* The subprocess could not be started.  There is nothing to clean up
         besides the object itself, and f->fd holds no usable descriptors.
         Return NULL as documented, keeping the errno value seen right after
         the failed call.
         NOTE(review): relies on create_pipe_bidi returning -1 on failure
         when exit_on_error is false -- confirm against pipe.h.  */
      int saved_errno = errno;
      free (f);
      errno = saved_errno;
      return NULL;
    }

  f->progname = progname;
  f->null_stderr = null_stderr;
  f->exit_on_error = exit_on_error;
  /* -1 means "subprocess not reaped yet"; see filter_terminate.  */
  f->exit_code = -1;
  f->read_buf = read_buf;
  f->read_bufsize = read_bufsize;
  f->done_read = done_read;
  f->private_data = private_data;
  f->reader_terminated = false;
  f->writer_terminated = false;

  /* If the platform-specific setup fails, shut the filter down right away.
     The object is still returned; f->exit_code then holds the subprocess
     status, which later filter_write/filter_close calls report.  */
  if (filter_init (f) < 0)
    filter_terminate (f);
  return f;
}
/* Write size bytes starting at buf into the pipe and in the meanwhile
possibly call the done_read function specified in filter_create.
The done_read function may be called in a different thread than
the current thread, depending on the platform. However, it will
always be called before filter_write has returned (or else will be
delayed to the next call to filter_write or filter_close). Return
only after the entire buffer has been written to the pipe.
If the subprocess exits early with zero status, subsequent writes
will become no-ops and zero is returned.
If there is a problem reading or writing, return -1 and set errno.
If the subprocess exits early with nonzero status, return the status.
(In either case, filter_write will instead exit if exit_on_error was
passed as true).
Otherwise return 0. */
int
filter_write (struct filter *f, const char *buf, size_t size)
{
  int status, saved_errno;

  assert (buf);

  /* The subprocess has already been reaped: just report its status.  */
  if (f->exit_code != -1)
    return f->exit_code;

  /* Nothing to write.  */
  if (size == 0)
    return 0;

  status = filter_loop (f, buf, size);

  if (f->reader_terminated || f->writer_terminated)
    {
      /* One direction has stopped, so the filter is finished.  Shut it
         down, but keep the errno left behind by filter_loop.  */
      saved_errno = errno;
      filter_terminate (f);
      errno = saved_errno;

      if (status < 0)
        return status;
      return f->exit_code;
    }

  return 0;
}
/* Finish reading the output via the done_read function specified in
filter_create. The done_read function may be called in a different
thread than the current one. However, it will always be called before filter_close
has returned. The write side of the pipe is closed as soon as
filter_close starts, while the read side will be closed just before
it finishes. If there is a problem reading or closing the pipe,
return -1 and set errno. If the subprocess exits early with nonzero
status, return the status. (In either case, filter_close will
instead exit if exit_on_error was passed as true).
Otherwise return 0. */
int
filter_close (struct filter *f)
{
  int status, saved_errno;

  /* Finish all I/O and reap the subprocess (a no-op if that has already
     happened).  */
  filter_terminate (f);

  /* Grab everything still needed before the object goes away, and shield
     errno from free().  */
  saved_errno = errno;
  status = f->exit_code;
  free (f);
  errno = saved_errno;

  return status;
}