bug-coreutils
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: feature request: gzip/bzip support for sort


From: Dan Hipschman
Subject: Re: feature request: gzip/bzip support for sort
Date: Mon, 15 Jan 2007 17:53:12 -0800
User-agent: Mutt/1.5.9i

On Sat, Jan 13, 2007 at 10:07:59PM -0800, Paul Eggert wrote:
> 3.  I can see where the user might be able to specify a better
> algorithm, for a particular data set.  For that, how about if we have
> a --compress-program=PROGRAM option, which lets the user plug in any
> program that works as a pipeline?  E.g., --compress-program=gzip would
> use gzip.  The default would be to use "PROGRAM -d" to decompress; we
> could have another option if that doesn't suffice.

This is pretty close.  There is just a few more problems that I see:

        * It doesn't respond well to SIGPIPE.  It terminates without
        success, so at least the user knows something went wrong, if
        they check the status, but it doesn't print an error message.

        * It checks the exit status of decompression processes at the
        very end, so the user is notified (with an error message and
        non-successful exit code) if a decompression process fails, but
        after the output has been generated.  I don't think there's
        anything we can do about this in general, though.

        * If we can't fork/exec a decompression process, we're hosed,
        but if we can't fork/exec a compression process, a possible
        improvement might be to just proceed without compression (and
        maybe print a warning).  On the other hand, the user might want
        us to terminate (e.g., if they just spelled the name of the
        program wrong).

        * Like Paul said, we might want a separate decompress program,
        and the user might want to pass in options (e.g.,
        --compress-program="gzip -1").

I'm still testing, but so far it's looking pretty good.  If you think
this patch has a good chance of making it into coreutils, then I'll keep
working on it.  It does a good job of compression, so I think it will
solve (most of) the OP's problem, but it's not a speed booster on my
machine.  If someone with a multi-processor computer wants to test it
maybe we can get some good news on that front.

Here's the patch for comments.  Thanks,
Dan


Index: sort.c
===================================================================
RCS file: /sources/coreutils/coreutils/src/sort.c,v
retrieving revision 1.344
diff -p -u -r1.344 sort.c
--- sort.c      13 Dec 2006 21:27:05 -0000      1.344
+++ sort.c      16 Jan 2007 01:34:50 -0000
@@ -1,5 +1,5 @@
 /* sort - sort lines of text (with all kinds of options).
-   Copyright (C) 1988, 1991-2006 Free Software Foundation, Inc.
+   Copyright (C) 1988, 1991-2007 Free Software Foundation, Inc.
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
@@ -25,6 +25,7 @@
 
 #include <getopt.h>
 #include <sys/types.h>
+#include <sys/wait.h>
 #include <signal.h>
 #include "system.h"
 #include "error.h"
@@ -261,6 +262,9 @@ static bool have_read_stdin;
 /* List of key field comparisons to be tried.  */
 static struct keyfield *keylist;
 
+/* Program used to (de)compress temp files.  Must accept -d.  */
+static const char *compress_program;
+
 static void sortlines_temp (struct line *, size_t, struct line *);
 
 /* Report MESSAGE for FILE, then clean up and exit.
@@ -328,6 +332,9 @@ Other options:\n\
                               multiple options specify multiple directories\n\
   -u, --unique              with -c, check for strict ordering;\n\
                               without -c, output only the first of an equal 
run\n\
+      --compress-progam=PROG  (de)compress temporaries with PROG, which must\n\
+                              (de)compress stdin to stdout, and accept the 
-d\n\
+                              command line option for decompressing\n\
 "), DEFAULT_TMPDIR);
       fputs (_("\
   -z, --zero-terminated     end lines with 0 byte, not newline\n\
@@ -364,7 +371,8 @@ native byte values.\n\
    non-character as a pseudo short option, starting with CHAR_MAX + 1.  */
 enum
 {
-  RANDOM_SOURCE_OPTION = CHAR_MAX + 1
+  RANDOM_SOURCE_OPTION = CHAR_MAX + 1,
+  COMPRESS_PROGRAM
 };
 
 static char const short_options[] = "-bcdfgik:mMno:rRsS:t:T:uy:z";
@@ -373,6 +381,7 @@ static struct option const long_options[
 {
   {"ignore-leading-blanks", no_argument, NULL, 'b'},
   {"check", no_argument, NULL, 'c'},
+  {"compress-program", required_argument, NULL, COMPRESS_PROGRAM},
   {"dictionary-order", no_argument, NULL, 'd'},
   {"ignore-case", no_argument, NULL, 'f'},
   {"general-numeric-sort", no_argument, NULL, 'g'},
@@ -403,6 +412,7 @@ static sigset_t caught_signals;
 struct tempnode
 {
   struct tempnode *volatile next;
+  pid_t pid;     /* If compressed, the pid of compressor, else zero */
   char name[1];  /* Actual size is 1 + file name length.  */
 };
 static struct tempnode *volatile temphead;
@@ -422,8 +432,8 @@ cleanup (void)
 /* Create a new temporary file, returning its newly allocated name.
    Store into *PFP a stream open for writing.  */
 
-static char *
-create_temp_file (FILE **pfp)
+static struct tempnode *
+create_temp_file (int *pfd)
 {
   static char const slashbase[] = "/sortXXXXXX";
   static size_t temp_dir_index;
@@ -439,6 +449,7 @@ create_temp_file (FILE **pfp)
   memcpy (file, temp_dir, len);
   memcpy (file + len, slashbase, sizeof slashbase);
   node->next = NULL;
+  node->pid = 0;
   if (++temp_dir_index == temp_dir_count)
     temp_dir_index = 0;
 
@@ -454,10 +465,11 @@ create_temp_file (FILE **pfp)
   sigprocmask (SIG_SETMASK, &oldset, NULL);
   errno = saved_errno;
 
-  if (fd < 0 || (*pfp = fdopen (fd, "w")) == NULL)
+  if (fd < 0)
     die (_("cannot create temporary file"), file);
 
-  return file;
+  *pfd = fd;
+  return node;
 }
 
 /* Return a stream for FILE, opened with mode HOW.  A null FILE means
@@ -515,6 +527,169 @@ xfclose (FILE *fp, char const *file)
 }
 
 static void
+close_or_die (int fd, const char *name)
+{
+  if (close (fd) < 0)
+    die (_("close failed"), name);
+}
+
+static void
+dup2_or_die (int oldfd, int newfd)
+{
+  if (dup2 (oldfd, newfd) < 0)
+    error (SORT_FAILURE, errno, _("dup2 failed"));
+}
+
+/* Fork a child process for piping to and do common cleanup */
+
+static pid_t
+pipe_fork (int pipefds[2])
+{
+  struct tempnode *saved_temphead;
+  sigset_t oldset;
+  pid_t pid;
+
+  if (pipe (pipefds) < 0)
+    error (SORT_FAILURE, errno, _("couldn't create pipe"));
+
+  /* This is so the child process won't delete our temp files
+     if it receives a signal before exec-ing.  */
+  sigprocmask (SIG_BLOCK, &caught_signals, &oldset);
+  saved_temphead = temphead;
+  temphead = NULL;
+
+  if ((pid = fork ()) < 0)
+    error (SORT_FAILURE, errno, _("couldn't fork"));
+  else if (0 < pid)
+    {
+      temphead = saved_temphead;
+      sigprocmask (SIG_SETMASK, &oldset, NULL);
+    }
+  else
+    {
+      sigprocmask (SIG_SETMASK, &oldset, NULL);
+
+      close_or_die (STDIN_FILENO, _("standard input"));
+      close_or_die (STDOUT_FILENO, _("standard output"));
+    }
+
+  return pid;
+}
+
+/* Creates a temp file and compression program to filter output to it.  */
+
+static char *
+create_temp (FILE **pfp, pid_t *ppid)
+{
+  int tempfd;
+  struct tempnode *node = create_temp_file (&tempfd);
+  char *name = node->name;
+
+  if (compress_program)
+    {
+      int pipefds[2];
+
+      if ((node->pid = pipe_fork (pipefds)))
+       {
+         close_or_die (tempfd, name);
+         close_or_die (pipefds[0], _("input end of pipe"));
+
+         tempfd = pipefds[1];
+       }
+      else
+       {
+         close_or_die (pipefds[1], _("output end of pipe"));
+         dup2_or_die (tempfd, STDOUT_FILENO);
+         close_or_die (tempfd, name);
+         dup2_or_die (pipefds[0], STDIN_FILENO);
+         close_or_die (pipefds[0], _("input end of pipe"));
+
+         if (execlp (compress_program, compress_program,
+                     (char *) NULL) < 0)
+           error (SORT_FAILURE, errno, _("couldn't execute %s"),
+                  compress_program);
+       }
+    }
+
+  if ((*pfp = fdopen (tempfd, "w")) == NULL)
+    die (_("couldn't create temp"), name);
+
+  if (ppid)
+    *ppid = node->pid;
+
+  return name;
+}
+
+static bool
+reap (pid_t pid)
+{
+  int status;
+  pid_t cpid;
+
+  if ((cpid = waitpid (pid, &status, 0)) < 0
+      && (0 <= pid || errno != ECHILD))
+    error (SORT_FAILURE, errno, _("waiting for %s"),
+           compress_program);
+
+  if (0 < cpid)
+    {
+      if (! WIFEXITED (status) || WEXITSTATUS (status))
+        error (SORT_FAILURE, 0, _("%s terminated abnormally"),
+               compress_program);
+      return true;
+    }
+
+  return false;
+}
+
+/* Open a temp file for decompression and reading.  */
+
+static FILE *
+open_temp (const char *name, pid_t pid)
+{
+  int tempfd;
+  FILE *fp;
+
+  if (pid)
+    reap (pid);
+
+  tempfd = open (name, O_RDONLY);
+  if (tempfd < 0)
+    die (_("couldn't open temp"), name);
+
+  if (compress_program)
+    {
+      int pipefds[2];
+
+      if (pipe_fork (pipefds))
+       {
+         close_or_die (tempfd, name);
+         close_or_die (pipefds[1], _("output end of pipe"));
+
+         tempfd = pipefds[0];
+       }
+      else
+       {
+         close_or_die (pipefds[0], _("input end of pipe"));
+         dup2_or_die (tempfd, STDIN_FILENO);
+         close_or_die (tempfd, name);
+         dup2_or_die (pipefds[1], STDOUT_FILENO);
+         close_or_die (pipefds[1], _("output end of pipe"));
+
+         if (execlp (compress_program, compress_program,
+                     "-d", (char *) NULL) < 0)
+           error (SORT_FAILURE, errno, _("couldn't execute %s -d"),
+                  compress_program);
+       }
+    }
+
+  if ((fp = fdopen (tempfd, "r")) == NULL)
+    die (_("couldn't create temp"), name);
+
+  return fp;
+}
+
+static void
 write_bytes (const char *buf, size_t n_bytes, FILE *fp, const char 
*output_file)
 {
   if (fwrite (buf, 1, n_bytes, fp) != n_bytes)
@@ -1586,7 +1761,7 @@ check (char const *file_name)
    file has not been opened yet (or written to, if standard output).  */
 
 static void
-mergefps (char **files, size_t ntemps, size_t nfiles,
+mergefps (char **files, pid_t *pids, size_t ntemps, size_t nfiles,
          FILE *ofp, char const *output_file)
 {
   FILE *fps[NMERGE];           /* Input streams for each file.  */
@@ -1609,7 +1784,8 @@ mergefps (char **files, size_t ntemps, s
   /* Read initial lines from each input file. */
   for (i = 0; i < nfiles; )
     {
-      fps[i] = xfopen (files[i], "r");
+      fps[i] =
+       pids[i] ? open_temp (files[i], pids[i]) : xfopen (files[i], "r");
       initbuf (&buffer[i], sizeof (struct line),
               MAX (merge_buffer_size, sort_size / nfiles));
       if (fillbuf (&buffer[i], fps[i], files[i]))
@@ -1631,7 +1807,10 @@ mergefps (char **files, size_t ntemps, s
          free (buffer[i].buf);
          --nfiles;
          for (j = i; j < nfiles; ++j)
-           files[j] = files[j + 1];
+           {
+             files[j] = files[j + 1];
+             pids[j] = pids[j + 1];
+           }
        }
     }
 
@@ -1719,6 +1898,7 @@ mergefps (char **files, size_t ntemps, s
                {
                  fps[i] = fps[i + 1];
                  files[i] = files[i + 1];
+                 pids[i] = pids[i + 1];
                  buffer[i] = buffer[i + 1];
                  cur[i] = cur[i + 1];
                  base[i] = base[i + 1];
@@ -1893,8 +2073,8 @@ sortlines_temp (struct line *lines, size
    common cases.  */
 
 static size_t
-avoid_trashing_input (char **files, size_t ntemps, size_t nfiles,
-                     char const *outfile)
+avoid_trashing_input (char **files, pid_t *pids, size_t ntemps,
+                     size_t nfiles, char const *outfile)
 {
   size_t i;
   bool got_outstat = false;
@@ -1930,9 +2110,11 @@ avoid_trashing_input (char **files, size
       if (same)
        {
          FILE *tftp;
-         char *temp = create_temp_file (&tftp);
-         mergefps (&files[i], 0, nfiles - i, tftp, temp);
+         pid_t pid;
+         char *temp = create_temp (&tftp, &pid);
+         mergefps (&files[i], &pids[i], 0, nfiles - i, tftp, temp);
          files[i] = temp;
+         pids[i] = pid;
          return i + 1;
        }
     }
@@ -1946,7 +2128,8 @@ avoid_trashing_input (char **files, size
    OUTPUT_FILE; a null OUTPUT_FILE stands for standard output.  */
 
 static void
-merge (char **files, size_t ntemps, size_t nfiles, char const *output_file)
+merge (char **files, pid_t *pids, size_t ntemps, size_t nfiles,
+       char const *output_file)
 {
   while (NMERGE < nfiles)
     {
@@ -1967,10 +2150,12 @@ merge (char **files, size_t ntemps, size
       for (out = in = 0; out < nfiles / NMERGE; out++, in += NMERGE)
        {
          FILE *tfp;
-         char *temp = create_temp_file (&tfp);
+         pid_t pid;
+         char *temp = create_temp (&tfp, &pid);
          size_t nt = MIN (ntemps, NMERGE);
          ntemps -= nt;
-         mergefps (&files[in], nt, NMERGE, tfp, temp);
+         mergefps (&files[in], &pids[in], nt, NMERGE, tfp, temp);
+         pids[out] = pid;
          files[out] = temp;
        }
 
@@ -1984,10 +2169,12 @@ merge (char **files, size_t ntemps, size
             files as possible, to avoid needless I/O.  */
          size_t nshortmerge = remainder - cheap_slots + 1;
          FILE *tfp;
-         char *temp = create_temp_file (&tfp);
+         pid_t pid;
+         char *temp = create_temp (&tfp, &pid);
          size_t nt = MIN (ntemps, nshortmerge);
          ntemps -= nt;
-         mergefps (&files[in], nt, nshortmerge, tfp, temp);
+         mergefps (&files[in], &pids[in], nt, nshortmerge, tfp, temp);
+         pids[out] = pid;
          files[out++] = temp;
          in += nshortmerge;
        }
@@ -1995,12 +2182,13 @@ merge (char **files, size_t ntemps, size
       /* Put the remaining input files into the last NMERGE-sized output
         window, so they will be merged in the next pass.  */
       memmove(&files[out], &files[in], (nfiles - in) * sizeof *files);
+      memmove(&pids[out], &pids[in], (nfiles - in) * sizeof *pids);
       ntemps += out;
       nfiles -= in - out;
     }
 
-  nfiles = avoid_trashing_input (files, ntemps, nfiles, output_file);
-  mergefps (files, ntemps, nfiles, NULL, output_file);
+  nfiles = avoid_trashing_input (files, pids, ntemps, nfiles, output_file);
+  mergefps (files, pids, ntemps, nfiles, NULL, output_file);
 }
 
 /* Sort NFILES FILES onto OUTPUT_FILE. */
@@ -2060,7 +2248,7 @@ sort (char * const *files, size_t nfiles
          else
            {
              ++ntemps;
-             temp_output = create_temp_file (&tfp);
+             temp_output = create_temp (&tfp, NULL);
            }
 
          do
@@ -2088,13 +2276,16 @@ sort (char * const *files, size_t nfiles
     {
       size_t i;
       struct tempnode *node = temphead;
-      char **tempfiles = xnmalloc (ntemps, sizeof *tempfiles);
+      pid_t *pids;
+      char **tempfiles = xnmalloc (ntemps, sizeof *tempfiles + sizeof *pids);
+      pids = (pid_t *) (tempfiles + ntemps);
       for (i = 0; node; i++)
        {
          tempfiles[i] = node->name;
+         pids[i] = node->pid;
          node = node->next;
        }
-      merge (tempfiles, ntemps, ntemps, output_file);
+      merge (tempfiles, pids, ntemps, ntemps, output_file);
       free (tempfiles);
     }
 }
@@ -2463,6 +2654,10 @@ main (int argc, char **argv)
          checkonly = true;
          break;
 
+       case COMPRESS_PROGRAM:
+         compress_program = optarg;
+         break;
+
        case 'k':
          key = key_init (&key_buf);
 
@@ -2679,10 +2874,17 @@ main (int argc, char **argv)
     }
 
   if (mergeonly)
-    merge (files, 0, nfiles, outfile);
+    {
+      pid_t *pids = xnmalloc (nfiles, sizeof *pids);
+      memset (pids, 0, nfiles * sizeof *pids);
+      merge (files, pids, 0, nfiles, outfile);
+    }
   else
     sort (files, nfiles, outfile);
 
+  while (reap (-1))
+    continue;
+
   if (have_read_stdin && fclose (stdin) == EOF)
     die (_("close failed"), "-");
 





reply via email to

[Prev in Thread] Current Thread [Next in Thread]