[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
Re: feature request: gzip/bzip support for sort
From: |
Dan Hipschman |
Subject: |
Re: feature request: gzip/bzip support for sort |
Date: |
Mon, 15 Jan 2007 17:53:12 -0800 |
User-agent: |
Mutt/1.5.9i |
On Sat, Jan 13, 2007 at 10:07:59PM -0800, Paul Eggert wrote:
> 3. I can see where the user might be able to specify a better
> algorithm, for a particular data set. For that, how about if we have
> a --compress-program=PROGRAM option, which lets the user plug in any
> program that works as a pipeline? E.g., --compress-program=gzip would
> use gzip. The default would be to use "PROGRAM -d" to decompress; we
> could have another option if that doesn't suffice.
This is pretty close. There are just a few more problems that I see:
* It doesn't respond well to SIGPIPE. It terminates without
success, so at least the user knows something went wrong, if
they check the status, but it doesn't print an error message.
* It checks the exit status of decompression processes at the
very end, so the user is notified (with an error message and
non-successful exit code) if a decompression process fails, but
after the output has been generated. I don't think there's
anything we can do about this in general, though.
* If we can't fork/exec a decompression process, we're hosed,
but if we can't fork/exec a compression process, a possible
improvement might be to just proceed without compression (and
maybe print a warning). On the other hand, the user might want
us to terminate (e.g., if they just spelled the name of the
program wrong).
* Like Paul said, we might want a separate decompress program,
and the user might want to pass in options (e.g.,
--compress-program="gzip -1").
I'm still testing, but so far it's looking pretty good. If you think
this patch has a good chance of making it into coreutils, then I'll keep
working on it. It does a good job of compression, so I think it will
solve (most of) the OP's problem, but it's not a speed booster on my
machine. If someone with a multi-processor computer wants to test it,
maybe we can get some good news on that front.
Here's the patch for comments. Thanks,
Dan
Index: sort.c
===================================================================
RCS file: /sources/coreutils/coreutils/src/sort.c,v
retrieving revision 1.344
diff -p -u -r1.344 sort.c
--- sort.c 13 Dec 2006 21:27:05 -0000 1.344
+++ sort.c 16 Jan 2007 01:34:50 -0000
@@ -1,5 +1,5 @@
/* sort - sort lines of text (with all kinds of options).
- Copyright (C) 1988, 1991-2006 Free Software Foundation, Inc.
+ Copyright (C) 1988, 1991-2007 Free Software Foundation, Inc.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -25,6 +25,7 @@
#include <getopt.h>
#include <sys/types.h>
+#include <sys/wait.h>
#include <signal.h>
#include "system.h"
#include "error.h"
@@ -261,6 +262,9 @@ static bool have_read_stdin;
/* List of key field comparisons to be tried. */
static struct keyfield *keylist;
+/* Program used to (de)compress temp files.  It is run as a filter from
+   standard input to standard output, and must decompress when given the
+   -d option.  A null pointer means temp files are not compressed.  */
+static const char *compress_program;
+
static void sortlines_temp (struct line *, size_t, struct line *);
/* Report MESSAGE for FILE, then clean up and exit.
@@ -328,6 +332,9 @@ Other options:\n\
multiple options specify multiple directories\n\
-u, --unique with -c, check for strict ordering;\n\
without -c, output only the first of an equal run\n\
+      --compress-program=PROG  (de)compress temporaries with PROG, which must\n\
+                              (de)compress stdin to stdout, and accept the\n\
+                              -d command line option for decompressing\n\
"), DEFAULT_TMPDIR);
fputs (_("\
-z, --zero-terminated end lines with 0 byte, not newline\n\
@@ -364,7 +371,8 @@ native byte values.\n\
non-character as a pseudo short option, starting with CHAR_MAX + 1. */
enum
{
- RANDOM_SOURCE_OPTION = CHAR_MAX + 1
+ RANDOM_SOURCE_OPTION = CHAR_MAX + 1,
+ COMPRESS_PROGRAM
};
static char const short_options[] = "-bcdfgik:mMno:rRsS:t:T:uy:z";
@@ -373,6 +381,7 @@ static struct option const long_options[
{
{"ignore-leading-blanks", no_argument, NULL, 'b'},
{"check", no_argument, NULL, 'c'},
+ {"compress-program", required_argument, NULL, COMPRESS_PROGRAM},
{"dictionary-order", no_argument, NULL, 'd'},
{"ignore-case", no_argument, NULL, 'f'},
{"general-numeric-sort", no_argument, NULL, 'g'},
@@ -403,6 +412,7 @@ static sigset_t caught_signals;
struct tempnode
{
struct tempnode *volatile next;
+ pid_t pid; /* Pid of the compressor that writes this file, or 0 if the file is not compressed.  */
char name[1]; /* Actual size is 1 + file name length. */
};
static struct tempnode *volatile temphead;
@@ -422,8 +432,8 @@ cleanup (void)
-/* Create a new temporary file, returning its newly allocated name.
-Store into *PFP a stream open for writing. */
+/* Create a new temporary file, returning its newly allocated node
+   (whose NAME member holds the file name).  Store into *PFD a file
+   descriptor open for writing; die if the file cannot be created.  */
-static char *
-create_temp_file (FILE **pfp)
+static struct tempnode *
+create_temp_file (int *pfd)
{
static char const slashbase[] = "/sortXXXXXX";
static size_t temp_dir_index;
@@ -439,6 +449,7 @@ create_temp_file (FILE **pfp)
memcpy (file, temp_dir, len);
memcpy (file + len, slashbase, sizeof slashbase);
node->next = NULL;
+  /* Not compressed until create_temp attaches a compressor.  */
+  node->pid = 0;
if (++temp_dir_index == temp_dir_count)
temp_dir_index = 0;
@@ -454,10 +465,11 @@ create_temp_file (FILE **pfp)
sigprocmask (SIG_SETMASK, &oldset, NULL);
errno = saved_errno;
- if (fd < 0 || (*pfp = fdopen (fd, "w")) == NULL)
+  if (fd < 0)
die (_("cannot create temporary file"), file);
- return file;
+  *pfd = fd;
+  return node;
}
/* Return a stream for FILE, opened with mode HOW. A null FILE means
@@ -515,6 +527,169 @@ xfclose (FILE *fp, char const *file)
}
+/* Close FD.  If that fails, report the failure against NAME via die,
+   which exits.  */
static void
+close_or_die (int fd, const char *name)
+{
+  if (close (fd) != 0)
+    die (_("close failed"), name);
+}
+
+/* Make descriptor NEWFD refer to the same open file as OLDFD, exiting
+   with a diagnostic if dup2 fails.  */
+static void
+dup2_or_die (int oldfd, int newfd)
+{
+  if (dup2 (oldfd, newfd) == -1)
+    error (SORT_FAILURE, errno, _("dup2 failed"));
+}
+
+/* Create a pipe in PIPEFDS and fork a child process to sit on one end
+   of it.  Like fork, return the child's pid in the parent and 0 in the
+   child; exit with a diagnostic if the pipe or the child cannot be
+   created.  In the child, standard input and standard output are
+   closed so the caller can dup2 the pipe and temp file onto them.  */
+
+static pid_t
+pipe_fork (int pipefds[2])
+{
+  struct tempnode *saved_temphead;
+  sigset_t oldset;
+  int saved_errno;
+  pid_t pid;
+
+  if (pipe (pipefds) < 0)
+    error (SORT_FAILURE, errno, _("couldn't create pipe"));
+
+  /* This is so the child process won't delete our temp files
+     if it receives a signal before exec-ing.  */
+  sigprocmask (SIG_BLOCK, &caught_signals, &oldset);
+  saved_temphead = temphead;
+  temphead = NULL;
+
+  pid = fork ();
+  saved_errno = errno;
+
+  /* Only the child may keep the emptied list.  In particular, put it
+     back BEFORE reporting a fork failure: otherwise any temp-file
+     cleanup done on the way out would see an empty list and leave our
+     temp files behind.  */
+  if (pid != 0)
+    temphead = saved_temphead;
+  sigprocmask (SIG_SETMASK, &oldset, NULL);
+
+  if (pid < 0)
+    error (SORT_FAILURE, saved_errno, _("couldn't fork"));
+
+  if (pid == 0)
+    {
+      close_or_die (STDIN_FILENO, _("standard input"));
+      close_or_die (STDOUT_FILENO, _("standard output"));
+    }
+
+  return pid;
+}
+
+/* Create a temporary file and, if a compression program was given,
+   start it as a filter whose output goes to that file.  Store into *PFP
+   a stream open for writing: either the temp file itself, or the pipe
+   feeding the compressor.  If PPID is non-null, store into *PPID the
+   compressor's pid, or 0 if there is none.  Return the temp file's
+   name.  */
+
+static char *
+create_temp (FILE **pfp, pid_t *ppid)
+{
+  int tempfd;
+  struct tempnode *node = create_temp_file (&tempfd);
+  char *name = node->name;
+
+  if (compress_program)
+    {
+      int pipefds[2];
+
+      node->pid = pipe_fork (pipefds);
+      if (node->pid)
+        {
+          /* Parent: write into the pipe; the compressor owns the file.  */
+          close_or_die (tempfd, name);
+          close_or_die (pipefds[0], _("input end of pipe"));
+          tempfd = pipefds[1];
+        }
+      else
+        {
+          /* Child: read the pipe on stdin, write the temp file on stdout.  */
+          close_or_die (pipefds[1], _("output end of pipe"));
+          dup2_or_die (tempfd, STDOUT_FILENO);
+          close_or_die (tempfd, name);
+          dup2_or_die (pipefds[0], STDIN_FILENO);
+          close_or_die (pipefds[0], _("input end of pipe"));
+
+          execlp (compress_program, compress_program, (char *) NULL);
+
+          /* execlp returns only on failure.  This process is a copy of
+             sort, so leave with _exit rather than exit: it must not run
+             any exit-time handlers sort registered, nor flush stdio
+             buffers it inherited from sort.  */
+          error (0, errno, _("couldn't execute %s"), compress_program);
+          _exit (SORT_FAILURE);
+        }
+    }
+
+  if ((*pfp = fdopen (tempfd, "w")) == NULL)
+    die (_("couldn't create temp"), name);
+
+  if (ppid)
+    *ppid = node->pid;
+
+  return name;
+}
+
+/* Wait for child process PID to terminate; a PID of -1 means any child.
+   Exit with a diagnostic if waitpid fails (other than finding no
+   children at all when PID is -1), or if the child did not exit
+   normally with status 0.  Return true if a child was reaped, false
+   otherwise.  */
+
+static bool
+reap (pid_t pid)
+{
+  int status;
+  pid_t reaped = waitpid (pid, &status, 0);
+
+  if (reaped < 0 && ! (pid < 0 && errno == ECHILD))
+    error (SORT_FAILURE, errno, _("waiting for %s"),
+           compress_program);
+
+  if (reaped <= 0)
+    return false;
+
+  if (! (WIFEXITED (status) && WEXITSTATUS (status) == 0))
+    error (SORT_FAILURE, 0, _("%s terminated abnormally"),
+           compress_program);
+
+  return true;
+}
+
+/* Open the temp file NAME for reading and return a stream for it.
+   If PID is nonzero, it is the compressor that wrote NAME: wait for it
+   to finish first, exiting if it failed.  If a compression program is
+   in use, the returned stream reads from a "PROG -d" child that
+   decompresses NAME.  */
+
+static FILE *
+open_temp (const char *name, pid_t pid)
+{
+  int tempfd;
+  FILE *fp;
+
+  /* NAME is complete only once its compressor has exited.  */
+  if (pid)
+    reap (pid);
+
+  tempfd = open (name, O_RDONLY);
+  if (tempfd < 0)
+    die (_("couldn't open temp"), name);
+
+  if (compress_program)
+    {
+      int pipefds[2];
+
+      if (pipe_fork (pipefds))
+        {
+          /* Parent: read the decompressed data from the pipe.  */
+          close_or_die (tempfd, name);
+          close_or_die (pipefds[1], _("output end of pipe"));
+          tempfd = pipefds[0];
+        }
+      else
+        {
+          /* Child: read the temp file on stdin, write the pipe on stdout.  */
+          close_or_die (pipefds[0], _("input end of pipe"));
+          dup2_or_die (tempfd, STDIN_FILENO);
+          close_or_die (tempfd, name);
+          dup2_or_die (pipefds[1], STDOUT_FILENO);
+          close_or_die (pipefds[1], _("output end of pipe"));
+
+          execlp (compress_program, compress_program, "-d", (char *) NULL);
+
+          /* execlp returns only on failure.  Use _exit, not exit, so this
+             copy of sort runs no exit-time handlers and does not flush
+             stdio buffers inherited from sort.  */
+          error (0, errno, _("couldn't execute %s -d"), compress_program);
+          _exit (SORT_FAILURE);
+        }
+    }
+
+  if ((fp = fdopen (tempfd, "r")) == NULL)
+    die (_("couldn't create temp"), name);
+
+  return fp;
+}
+
+static void
write_bytes (const char *buf, size_t n_bytes, FILE *fp, const char
*output_file)
{
if (fwrite (buf, 1, n_bytes, fp) != n_bytes)
@@ -1586,7 +1761,7 @@ check (char const *file_name)
file has not been opened yet (or written to, if standard output). */
static void
-mergefps (char **files, size_t ntemps, size_t nfiles,
+mergefps (char **files, pid_t *pids, size_t ntemps, size_t nfiles,
FILE *ofp, char const *output_file)
{
FILE *fps[NMERGE]; /* Input streams for each file. */
@@ -1609,7 +1784,8 @@ mergefps (char **files, size_t ntemps, s
/* Read initial lines from each input file. */
for (i = 0; i < nfiles; )
{
- fps[i] = xfopen (files[i], "r");
+ fps[i] =
+ pids[i] ? open_temp (files[i], pids[i]) : xfopen (files[i], "r");
initbuf (&buffer[i], sizeof (struct line),
MAX (merge_buffer_size, sort_size / nfiles));
if (fillbuf (&buffer[i], fps[i], files[i]))
@@ -1631,7 +1807,10 @@ mergefps (char **files, size_t ntemps, s
free (buffer[i].buf);
--nfiles;
for (j = i; j < nfiles; ++j)
- files[j] = files[j + 1];
+ {
+ files[j] = files[j + 1];
+ pids[j] = pids[j + 1];
+ }
}
}
@@ -1719,6 +1898,7 @@ mergefps (char **files, size_t ntemps, s
{
fps[i] = fps[i + 1];
files[i] = files[i + 1];
+ pids[i] = pids[i + 1];
buffer[i] = buffer[i + 1];
cur[i] = cur[i + 1];
base[i] = base[i + 1];
@@ -1893,8 +2073,8 @@ sortlines_temp (struct line *lines, size
common cases. */
static size_t
-avoid_trashing_input (char **files, size_t ntemps, size_t nfiles,
- char const *outfile)
+avoid_trashing_input (char **files, pid_t *pids, size_t ntemps,
+ size_t nfiles, char const *outfile)
{
size_t i;
bool got_outstat = false;
@@ -1930,9 +2110,11 @@ avoid_trashing_input (char **files, size
if (same)
{
FILE *tftp;
- char *temp = create_temp_file (&tftp);
- mergefps (&files[i], 0, nfiles - i, tftp, temp);
+ pid_t pid;
+ char *temp = create_temp (&tftp, &pid);
+ mergefps (&files[i], &pids[i], 0, nfiles - i, tftp, temp);
files[i] = temp;
+ pids[i] = pid;
return i + 1;
}
}
@@ -1946,7 +2128,8 @@ avoid_trashing_input (char **files, size
OUTPUT_FILE; a null OUTPUT_FILE stands for standard output. */
static void
-merge (char **files, size_t ntemps, size_t nfiles, char const *output_file)
+merge (char **files, pid_t *pids, size_t ntemps, size_t nfiles,
+ char const *output_file)
{
while (NMERGE < nfiles)
{
@@ -1967,10 +2150,12 @@ merge (char **files, size_t ntemps, size
for (out = in = 0; out < nfiles / NMERGE; out++, in += NMERGE)
{
FILE *tfp;
- char *temp = create_temp_file (&tfp);
+ pid_t pid;
+ char *temp = create_temp (&tfp, &pid);
size_t nt = MIN (ntemps, NMERGE);
ntemps -= nt;
- mergefps (&files[in], nt, NMERGE, tfp, temp);
+ mergefps (&files[in], &pids[in], nt, NMERGE, tfp, temp);
+ pids[out] = pid;
files[out] = temp;
}
@@ -1984,10 +2169,12 @@ merge (char **files, size_t ntemps, size
files as possible, to avoid needless I/O. */
size_t nshortmerge = remainder - cheap_slots + 1;
FILE *tfp;
- char *temp = create_temp_file (&tfp);
+ pid_t pid;
+ char *temp = create_temp (&tfp, &pid);
size_t nt = MIN (ntemps, nshortmerge);
ntemps -= nt;
- mergefps (&files[in], nt, nshortmerge, tfp, temp);
+ mergefps (&files[in], &pids[in], nt, nshortmerge, tfp, temp);
+ pids[out] = pid;
files[out++] = temp;
in += nshortmerge;
}
@@ -1995,12 +2182,13 @@ merge (char **files, size_t ntemps, size
/* Put the remaining input files into the last NMERGE-sized output
window, so they will be merged in the next pass. */
memmove(&files[out], &files[in], (nfiles - in) * sizeof *files);
+ memmove(&pids[out], &pids[in], (nfiles - in) * sizeof *pids);
ntemps += out;
nfiles -= in - out;
}
- nfiles = avoid_trashing_input (files, ntemps, nfiles, output_file);
- mergefps (files, ntemps, nfiles, NULL, output_file);
+ nfiles = avoid_trashing_input (files, pids, ntemps, nfiles, output_file);
+ mergefps (files, pids, ntemps, nfiles, NULL, output_file);
}
/* Sort NFILES FILES onto OUTPUT_FILE. */
@@ -2060,7 +2248,7 @@ sort (char * const *files, size_t nfiles
else
{
++ntemps;
- temp_output = create_temp_file (&tfp);
+ temp_output = create_temp (&tfp, NULL);
}
do
@@ -2088,13 +2276,16 @@ sort (char * const *files, size_t nfiles
{
size_t i;
struct tempnode *node = temphead;
- char **tempfiles = xnmalloc (ntemps, sizeof *tempfiles);
+ pid_t *pids;
+ char **tempfiles = xnmalloc (ntemps, sizeof *tempfiles + sizeof *pids);
+ pids = (pid_t *) (tempfiles + ntemps);
for (i = 0; node; i++)
{
tempfiles[i] = node->name;
+ pids[i] = node->pid;
node = node->next;
}
- merge (tempfiles, ntemps, ntemps, output_file);
+ merge (tempfiles, pids, ntemps, ntemps, output_file);
free (tempfiles);
}
}
@@ -2463,6 +2654,10 @@ main (int argc, char **argv)
checkonly = true;
break;
+ case COMPRESS_PROGRAM:
+ compress_program = optarg;
+ break;
+
case 'k':
key = key_init (&key_buf);
@@ -2679,10 +2874,17 @@ main (int argc, char **argv)
}
if (mergeonly)
- merge (files, 0, nfiles, outfile);
+    {
+      /* The user's input files are not compressed temporaries, so there
+         is no compressor pid to wait for: every entry starts at zero.  */
+      pid_t *pids = xcalloc (nfiles, sizeof *pids);
+      merge (files, pids, 0, nfiles, outfile);
+      free (pids);
+    }
else
sort (files, nfiles, outfile);
+  /* Wait for any (de)compression children still outstanding; reap exits
+     with a diagnostic if one of them failed.  */
+  while (reap (-1))
+    continue;
+
if (have_read_stdin && fclose (stdin) == EOF)
die (_("close failed"), "-");
- feature request: gzip/bzip support for sort, Craig Macdonald, 2007/01/13
- Re: feature request: gzip/bzip support for sort, Jim Meyering, 2007/01/13
- Re: feature request: gzip/bzip support for sort, Dan Hipschman, 2007/01/13
- Re: feature request: gzip/bzip support for sort, Dan Hipschman, 2007/01/13
- Re: feature request: gzip/bzip support for sort, Dan Hipschman, 2007/01/14
- Re: feature request: gzip/bzip support for sort, Dan Hipschman, 2007/01/15
- Re: feature request: gzip/bzip support for sort,
Dan Hipschman <=
- Re: feature request: gzip/bzip support for sort, Paul Eggert, 2007/01/16
- Re: feature request: gzip/bzip support for sort, Dan Hipschman, 2007/01/20
- Re: feature request: gzip/bzip support for sort, James Youngman, 2007/01/21
- Re: feature request: gzip/bzip support for sort, Jim Meyering, 2007/01/21
- Re: feature request: gzip/bzip support for sort, Jim Meyering, 2007/01/21
- Re: feature request: gzip/bzip support for sort, Dan Hipschman, 2007/01/21
- Re: feature request: gzip/bzip support for sort, Jim Meyering, 2007/01/21
- Re: feature request: gzip/bzip support for sort, Dan Hipschman, 2007/01/21
- Re: feature request: gzip/bzip support for sort, Dan Hipschman, 2007/01/21
- Re: feature request: gzip/bzip support for sort, Bauke Jan Douma, 2007/01/22