bug-coreutils
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: feature request: gzip/bzip support for sort


From: Dan Hipschman
Subject: Re: feature request: gzip/bzip support for sort
Date: Fri, 19 Jan 2007 22:33:05 -0800
User-agent: Mutt/1.5.9i

Hi,

I think this patch addresses everything Paul mentioned in his critique
of my last attempt.  I did look at the gnulib pipe module, but there were
some problems with using it "out of the box".  First, it takes a
filename as the stdout of the child process, when for temp files it's
better to pass a file descriptor.  That's a rather small issue, though,
and it would be pretty easy to add an additional function that fixed that.
Another problem is that since it does the fork/exec in one call, the
race condition where the child and parent both receive signals between
the fork and child's exec can't be solved the way I solve it in this
patch.  Rather than change the way sort handles signals, I pleaded with
Paul (well, I asked him nicely :-) to be able to put off the portability
issues of fork until I had more time.  Hence, I've just added a check
for fork() in configure.ac and any system that doesn't have it will have
to wait a while for this feature.

I also tried to address Jim's concern about fork() failing.  I clean up
zombie children every so often in the main loops of sort() and
mergefps() so as not to bleed the system dry.  This seems to keep the
number of defunct and running child processes close to a minimum.  If we
can't fork, I retry some number of times, sleeping a bit in between and
killing off zombies (sounds like a good time :-).  I was able to test
the case where fork() fails with a macro like this:

    unsigned int foo;
    #define fork() (foo++ % 3 ? fork() : (errno = EAGAIN, -1))

I'm not sure how to test the case where we fork a compression process,
it exits, we reap it, and we create another process with the same PID
before we run the decompression process on the file it created.  I did
address this issue, and of course I *think* my solution is correct, but
like I said, I'm not sure how to test it.

Anyway, here's the patch.


Index: NEWS
===================================================================
RCS file: /sources/coreutils/coreutils/NEWS,v
retrieving revision 1.467
diff -p -u -r1.467 NEWS
--- NEWS        18 Jan 2007 09:18:21 -0000      1.467
+++ NEWS        20 Jan 2007 05:49:27 -0000
@@ -29,6 +29,11 @@ GNU coreutils NEWS                      
 
   "rm --interactive=never F" no longer prompts for an unwritable F
 
+** New features
+
+  sort can now compress temporary files to improve performance of
+  very large sorts.
+
 
 * Noteworthy changes in release 6.7 (2006-12-08) [stable]
 
Index: configure.ac
===================================================================
RCS file: /sources/coreutils/coreutils/configure.ac,v
retrieving revision 1.102
diff -p -u -r1.102 configure.ac
--- configure.ac        28 Dec 2006 08:18:17 -0000      1.102
+++ configure.ac        20 Jan 2007 05:49:27 -0000
@@ -39,6 +39,8 @@ gl_EARLY
 gl_INIT
 coreutils_MACROS
 
+AC_FUNC_FORK
+
 AC_CHECK_FUNCS(uname,
        OPTIONAL_BIN_PROGS="$OPTIONAL_BIN_PROGS uname\$(EXEEXT)"
        MAN="$MAN uname.1")
Index: doc/coreutils.texi
===================================================================
RCS file: /sources/coreutils/coreutils/doc/coreutils.texi,v
retrieving revision 1.363
diff -p -u -r1.363 coreutils.texi
--- doc/coreutils.texi  4 Jan 2007 11:04:46 -0000       1.363
+++ doc/coreutils.texi  20 Jan 2007 05:49:27 -0000
@@ -3411,6 +3411,19 @@ value as the directory for temporary fil
 @option{--temporary-directory} (@option{-T}) option in turn overrides
 the environment variable.
 
+@vindex GNUSORT_COMPRESSOR
+To improve performance when sorting very large files, GNU sort will,
+by default, try to compress temporary files with the program
+@file{/bin/gzip}.  The environment variable @env{GNUSORT_COMPRESSOR}
+can be set to the name of another program to be used.  The program
+specified must compress standard input to standard output when no
+arguments are given to it, and it must decompress standard input to
+standard output when the @option{-d} argument is given to it.  To
+disable compression of temporary files, set the variable to the empty
+string.  Whitespace characters and the backslash character are
+reserved for future use in the program name.  If the program exits
+with nonzero status, sort will terminate with an error.
+
 
 The following options affect the ordering of output lines.  They may be
 specified globally or as part of a specific key field.  If no key
Index: src/sort.c
===================================================================
RCS file: /sources/coreutils/coreutils/src/sort.c,v
retrieving revision 1.345
diff -p -u -r1.345 sort.c
--- src/sort.c  19 Jan 2007 22:28:31 -0000      1.345
+++ src/sort.c  20 Jan 2007 05:49:29 -0000
@@ -1,5 +1,5 @@
 /* sort - sort lines of text (with all kinds of options).
-   Copyright (C) 1988, 1991-2006 Free Software Foundation, Inc.
+   Copyright (C) 1988, 1991-2007 Free Software Foundation, Inc.
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
@@ -25,6 +25,7 @@
 
 #include <getopt.h>
 #include <sys/types.h>
+#include <sys/wait.h>
 #include <signal.h>
 #include "system.h"
 #include "error.h"
@@ -261,6 +262,9 @@ static bool have_read_stdin;
 /* List of key field comparisons to be tried.  */
 static struct keyfield *keylist;
 
+/* Program used to (de)compress temp files.  Must accept -d.  */
+static const char *compress_program;
+
 static void sortlines_temp (struct line *, size_t, struct line *);
 
 /* Report MESSAGE for FILE, then clean up and exit.
@@ -403,11 +407,146 @@ static sigset_t caught_signals;
 struct tempnode
 {
   struct tempnode *volatile next;
+  pid_t pid;     /* If compressed, the pid of compressor, else zero */
   char name[1];  /* Actual size is 1 + file name length.  */
 };
 static struct tempnode *volatile temphead;
 static struct tempnode *volatile *temptail = &temphead;
 
+struct sortfile
+{
+  char *name;
+  pid_t pid;     /* If compressed, the pid of compressor, else zero */
+};
+
+enum procstate { ALIVE, ZOMBIE };
+
+/* An entry in the compression process table.  The COUNT field is there
+   in case we fork a new compression process that has the same PID as an
+   old zombie process that is still in the table (because the process to
+   decompress the temp file it was associated with hasn't started yet).  */
+struct procnode
+{
+  struct procnode *next;
+  pid_t pid;
+  enum procstate state;
+  size_t count;
+};
+
+#define PROCTABSZ 29
+#define HASH_PID(pid) ((unsigned int) (pid) % PROCTABSZ)
+
+/* A table where we store compression process states.  We clean up all
+   processes in a timely manner so as not to exhaust system resources,
+   so we store the info on whether the process is still running, or has
+   been reaped here.  */
+struct procnode *proctab[PROCTABSZ];
+
+/* The total number of forked processes (compressors and decompressors)
+   that have not been reaped yet. */
+size_t nprocs;
+
+/* The number of child processes we'll allow before we try to reap them. */
+#define PROC_THRESH 2
+
+static pid_t
+reap (pid_t pid)
+{
+  int status;
+  pid_t cpid = waitpid (pid, &status, pid < 0 ? WNOHANG : 0);
+
+  if (cpid < 0)
+    error (SORT_FAILURE, errno, _("waiting for %s [-d]"),
+           compress_program);
+  else if (0 < cpid)
+    {
+      if (! WIFEXITED (status) || WEXITSTATUS (status))
+       error (SORT_FAILURE, 0, _("%s [-d] terminated abnormally"),
+              compress_program);
+      --nprocs;
+    }
+
+  return cpid;
+}
+
+static void
+register_proc (pid_t pid)
+{
+  unsigned int bucket = HASH_PID (pid);
+  struct procnode *node = proctab[bucket];
+
+  for ( ; node; node = node->next)
+    if (pid == node->pid)
+      {
+       node->state = ALIVE;
+       ++node->count;
+       return;
+      }
+
+  node = xmalloc (sizeof *node);
+  node->pid = pid;
+  node->state = ALIVE;
+  node->count = 1;
+  node->next = proctab[bucket];
+  proctab[bucket] = node;
+}
+
+/* This is called when we reap a random process.  We don't know
+   whether we have reaped a compression process or a decompression
+   process until we look in the table.  If there's an ALIVE entry for
+   it, then we have reaped a compression process, so change the state
+   to ZOMBIE.  Otherwise, it's a decompression process, so ignore it.  */
+
+static void
+update_proc (pid_t pid)
+{
+  unsigned int bucket = HASH_PID (pid);
+  struct procnode *node = proctab[bucket];
+
+  for ( ; node; node = node->next)
+    if (pid == node->pid)
+      {
+       node->state = ZOMBIE;
+       return;
+      }
+}
+
+/* This is for when we need to wait for a compression process to exit.
+   If it has a ZOMBIE entry in the table then it's already dead and has
+   been reaped.  Note that if there's an ALIVE entry for it, it still may
+   already have died and been reaped if a second process was created with
+   the same PID.  This is probably exceedingly rare, but to be on the safe
+   side we will have to wait for any compression process with this PID.  */
+
+static void
+wait_proc (pid_t pid)
+{
+  unsigned int bucket = HASH_PID (pid);
+  struct procnode *node, **pnode = &proctab[bucket];
+
+  for ( ; (node = *pnode)->pid != pid; pnode = &node->next)
+    continue;
+
+  if (node->state == ALIVE)
+    reap (pid);
+
+  node->state = ZOMBIE;
+  if (! --node->count)
+    {
+      *pnode = node->next;
+      free (node);
+    }
+}
+
+static void
+reap_some (void)
+{
+  pid_t pid;
+
+  while (0 < nprocs && (pid = reap (-1)))
+    update_proc (pid);
+}
+
 /* Clean up any remaining temporary files.  */
 
 static void
@@ -441,8 +580,8 @@ exit_cleanup (void)
 /* Create a new temporary file, returning its newly allocated name.
    Store into *PFP a stream open for writing.  */
 
-static char *
-create_temp_file (FILE **pfp)
+static struct tempnode *
+create_temp_file (int *pfd)
 {
   static char const slashbase[] = "/sortXXXXXX";
   static size_t temp_dir_index;
@@ -458,6 +597,7 @@ create_temp_file (FILE **pfp)
   memcpy (file, temp_dir, len);
   memcpy (file + len, slashbase, sizeof slashbase);
   node->next = NULL;
+  node->pid = 0;
   if (++temp_dir_index == temp_dir_count)
     temp_dir_index = 0;
 
@@ -473,10 +613,11 @@ create_temp_file (FILE **pfp)
   sigprocmask (SIG_SETMASK, &oldset, NULL);
   errno = saved_errno;
 
-  if (fd < 0 || (*pfp = fdopen (fd, "w")) == NULL)
+  if (fd < 0)
     die (_("cannot create temporary file"), file);
 
-  return file;
+  *pfd = fd;
+  return node;
 }
 
 /* Return a stream for FILE, opened with mode HOW.  A null FILE means
@@ -534,6 +675,174 @@ xfclose (FILE *fp, char const *file)
 }
 
 static void
+dup2_or_die (int oldfd, int newfd)
+{
+  if (dup2 (oldfd, newfd) < 0)
+    error (SORT_FAILURE, errno, _("dup2 failed"));
+}
+
+/* Fork a child process for piping to and do common cleanup.  The
+   TRIES parameter tells us how many times to try to fork before
+   giving up.  Return the PID of the child or -1 if fork failed.  */
+
+static pid_t
+pipe_fork (int pipefds[2], size_t tries)
+{
+#if HAVE_WORKING_FORK
+  struct tempnode *saved_temphead;
+  sigset_t oldset;
+  int saved_errno;
+  unsigned int wait_retry = 1;
+  pid_t pid;
+
+  if (pipe (pipefds) < 0)
+    return -1;
+
+  while (tries--)
+    {
+      /* This is so the child process won't delete our temp files
+        if it receives a signal before exec-ing.  */
+      sigprocmask (SIG_BLOCK, &caught_signals, &oldset);
+      saved_temphead = temphead;
+      temphead = NULL;
+
+      pid = fork ();
+      saved_errno = errno;
+      if (pid)
+       temphead = saved_temphead;
+
+      sigprocmask (SIG_SETMASK, &oldset, NULL);
+      errno = saved_errno;
+
+      if (0 <= pid || errno != EAGAIN)
+       break;
+      else
+       {
+         sleep (wait_retry);
+         wait_retry *= 2;
+         reap_some ();
+       }
+    }
+
+  if (pid < 0)
+    {
+      close (pipefds[0]);
+      close (pipefds[1]);
+    }
+  else if (pid == 0)
+    {
+      close (STDIN_FILENO);
+      close (STDOUT_FILENO);
+    }
+  else
+    ++nprocs;
+
+  return pid;
+
+#else  /* ! HAVE_WORKING_FORK */
+  return -1;
+#endif
+}
+
+/* Creates a temp file and compression program to filter output to it.  */
+
+static char *
+create_temp (FILE **pfp, pid_t *ppid)
+{
+  int tempfd;
+  struct tempnode *node = create_temp_file (&tempfd);
+  char *name = node->name;
+
+  if (compress_program)
+    {
+      int pipefds[2];
+
+      node->pid = pipe_fork (pipefds, 2);
+      if (0 < node->pid)
+       {
+         close (tempfd);
+         close (pipefds[0]);
+         tempfd = pipefds[1];
+
+         register_proc (node->pid);
+       }
+      else if (node->pid == 0)
+       {
+         close (pipefds[1]);
+         dup2_or_die (tempfd, STDOUT_FILENO);
+         close (tempfd);
+         dup2_or_die (pipefds[0], STDIN_FILENO);
+         close (pipefds[0]);
+
+         if (execlp (compress_program, compress_program,
+                     (char *) NULL) < 0)
+           error (SORT_FAILURE, errno, _("couldn't execute %s"),
+                  compress_program);
+       }
+      else
+       node->pid = 0;
+    }
+
+  if ((*pfp = fdopen (tempfd, "w")) == NULL)
+    die (_("couldn't create temporary file"), name);
+
+  if (ppid)
+    *ppid = node->pid;
+
+  return name;
+}
+
+/* Open a temp file for decompression and reading.  PID is the ID of the
+   process used to compress this file.  */
+
+static FILE *
+open_temp (const char *name, pid_t pid)
+{
+  int tempfd;
+  FILE *fp;
+
+  wait_proc (pid);
+
+  tempfd = open (name, O_RDONLY);
+  if (tempfd < 0)
+    die (_("couldn't open temporary file"), name);
+
+  if (compress_program)
+    {
+      int pipefds[2];
+      pid_t child_pid = pipe_fork (pipefds, 8);
+
+      if (0 < child_pid)
+       {
+         close (tempfd);
+         close (pipefds[1]);
+         tempfd = pipefds[0];
+       }
+      else if (child_pid == 0)
+       {
+         close (pipefds[0]);
+         dup2_or_die (tempfd, STDIN_FILENO);
+         close (tempfd);
+         dup2_or_die (pipefds[1], STDOUT_FILENO);
+         close (pipefds[1]);
+
+         if (execlp (compress_program, compress_program,
+                     "-d", (char *) NULL) < 0)
+           error (SORT_FAILURE, errno, _("couldn't execute %s -d"),
+                  compress_program);
+       }
+      else
+       error (SORT_FAILURE, errno, _("couldn't create process for %s -d"),
+              compress_program);
+    }
+
+  if ((fp = fdopen (tempfd, "r")) == NULL)
+    die (_("couldn't create temporary file"), name);
+
+  return fp;
+}
+
+static void
 write_bytes (const char *buf, size_t n_bytes, FILE *fp, const char *output_file)
 {
   if (fwrite (buf, 1, n_bytes, fp) != n_bytes)
@@ -1605,7 +1914,7 @@ check (char const *file_name)
    file has not been opened yet (or written to, if standard output).  */
 
 static void
-mergefps (char **files, size_t ntemps, size_t nfiles,
+mergefps (struct sortfile *files, size_t ntemps, size_t nfiles,
          FILE *ofp, char const *output_file)
 {
   FILE *fps[NMERGE];           /* Input streams for each file.  */
@@ -1628,10 +1937,12 @@ mergefps (char **files, size_t ntemps, s
   /* Read initial lines from each input file. */
   for (i = 0; i < nfiles; )
     {
-      fps[i] = xfopen (files[i], "r");
+      fps[i] = (files[i].pid
+               ? open_temp (files[i].name, files[i].pid)
+               : xfopen (files[i].name, "r"));
       initbuf (&buffer[i], sizeof (struct line),
               MAX (merge_buffer_size, sort_size / nfiles));
-      if (fillbuf (&buffer[i], fps[i], files[i]))
+      if (fillbuf (&buffer[i], fps[i], files[i].name))
        {
          struct line const *linelim = buffer_linelim (&buffer[i]);
          cur[i] = linelim - 1;
@@ -1641,11 +1952,11 @@ mergefps (char **files, size_t ntemps, s
       else
        {
          /* fps[i] is empty; eliminate it from future consideration.  */
-         xfclose (fps[i], files[i]);
+         xfclose (fps[i], files[i].name);
          if (i < ntemps)
            {
              ntemps--;
-             zaptemp (files[i]);
+             zaptemp (files[i].name);
            }
          free (buffer[i].buf);
          --nfiles;
@@ -1714,7 +2025,7 @@ mergefps (char **files, size_t ntemps, s
        cur[ord[0]] = smallest - 1;
       else
        {
-         if (fillbuf (&buffer[ord[0]], fps[ord[0]], files[ord[0]]))
+         if (fillbuf (&buffer[ord[0]], fps[ord[0]], files[ord[0]].name))
            {
              struct line const *linelim = buffer_linelim (&buffer[ord[0]]);
              cur[ord[0]] = linelim - 1;
@@ -1727,11 +2038,11 @@ mergefps (char **files, size_t ntemps, s
                if (ord[i] > ord[0])
                  --ord[i];
              --nfiles;
-             xfclose (fps[ord[0]], files[ord[0]]);
+             xfclose (fps[ord[0]], files[ord[0]].name);
              if (ord[0] < ntemps)
                {
                  ntemps--;
-                 zaptemp (files[ord[0]]);
+                 zaptemp (files[ord[0]].name);
                }
              free (buffer[ord[0]].buf);
              for (i = ord[0]; i < nfiles; ++i)
@@ -1774,6 +2085,10 @@ mergefps (char **files, size_t ntemps, s
          ord[j] = ord[j + 1];
        ord[count_of_smaller_lines] = ord0;
       }
+
+      /* Free up some resources every once in a while.  */
+      if (PROC_THRESH < nprocs)
+       reap_some ();
     }
 
   if (unique && savedline)
@@ -1912,8 +2227,8 @@ sortlines_temp (struct line *lines, size
    common cases.  */
 
 static size_t
-avoid_trashing_input (char **files, size_t ntemps, size_t nfiles,
-                     char const *outfile)
+avoid_trashing_input (struct sortfile *files, size_t ntemps,
+                     size_t nfiles, char const *outfile)
 {
   size_t i;
   bool got_outstat = false;
@@ -1921,11 +2236,11 @@ avoid_trashing_input (char **files, size
 
   for (i = ntemps; i < nfiles; i++)
     {
-      bool is_stdin = STREQ (files[i], "-");
+      bool is_stdin = STREQ (files[i].name, "-");
       bool same;
       struct stat instat;
 
-      if (outfile && STREQ (outfile, files[i]) && !is_stdin)
+      if (outfile && STREQ (outfile, files[i].name) && !is_stdin)
        same = true;
       else
        {
@@ -1941,7 +2256,7 @@ avoid_trashing_input (char **files, size
 
          same = (((is_stdin
                    ? fstat (STDIN_FILENO, &instat)
-                   : stat (files[i], &instat))
+                   : stat (files[i].name, &instat))
                   == 0)
                  && SAME_INODE (instat, outstat));
        }
@@ -1949,9 +2264,11 @@ avoid_trashing_input (char **files, size
       if (same)
        {
          FILE *tftp;
-         char *temp = create_temp_file (&tftp);
-         mergefps (&files[i], 0, nfiles - i, tftp, temp);
-         files[i] = temp;
+         pid_t pid;
+         char *temp = create_temp (&tftp, &pid);
+         mergefps (&files[i],0, nfiles - i, tftp, temp);
+         files[i].name = temp;
+         files[i].pid = pid;
          return i + 1;
        }
     }
@@ -1965,7 +2282,8 @@ avoid_trashing_input (char **files, size
    OUTPUT_FILE; a null OUTPUT_FILE stands for standard output.  */
 
 static void
-merge (char **files, size_t ntemps, size_t nfiles, char const *output_file)
+merge (struct sortfile *files, size_t ntemps, size_t nfiles,
+       char const *output_file)
 {
   while (NMERGE < nfiles)
     {
@@ -1986,11 +2304,13 @@ merge (char **files, size_t ntemps, size
       for (out = in = 0; out < nfiles / NMERGE; out++, in += NMERGE)
        {
          FILE *tfp;
-         char *temp = create_temp_file (&tfp);
+         pid_t pid;
+         char *temp = create_temp (&tfp, &pid);
          size_t nt = MIN (ntemps, NMERGE);
          ntemps -= nt;
          mergefps (&files[in], nt, NMERGE, tfp, temp);
-         files[out] = temp;
+         files[out].name = temp;
+         files[out].pid = pid;
        }
 
       remainder = nfiles - in;
@@ -2003,11 +2323,13 @@ merge (char **files, size_t ntemps, size
             files as possible, to avoid needless I/O.  */
          size_t nshortmerge = remainder - cheap_slots + 1;
          FILE *tfp;
-         char *temp = create_temp_file (&tfp);
+         pid_t pid;
+         char *temp = create_temp (&tfp, &pid);
          size_t nt = MIN (ntemps, nshortmerge);
          ntemps -= nt;
          mergefps (&files[in], nt, nshortmerge, tfp, temp);
-         files[out++] = temp;
+         files[out].name = temp;
+         files[out++].pid = pid;
          in += nshortmerge;
        }
 
@@ -2079,7 +2401,7 @@ sort (char * const *files, size_t nfiles
          else
            {
              ++ntemps;
-             temp_output = create_temp_file (&tfp);
+             temp_output = create_temp (&tfp, NULL);
            }
 
          do
@@ -2094,6 +2416,10 @@ sort (char * const *files, size_t nfiles
 
          xfclose (tfp, temp_output);
 
+         /* Free up some resources every once in a while.  */
+         if (PROC_THRESH < nprocs)
+           reap_some ();
+
          if (output_file_created)
            goto finish;
        }
@@ -2107,10 +2433,11 @@ sort (char * const *files, size_t nfiles
     {
       size_t i;
       struct tempnode *node = temphead;
-      char **tempfiles = xnmalloc (ntemps, sizeof *tempfiles);
+      struct sortfile *tempfiles = xnmalloc (ntemps, sizeof *tempfiles);
       for (i = 0; node; i++)
        {
-         tempfiles[i] = node->name;
+         tempfiles[i].name = node->name;
+         tempfiles[i].pid = node->pid;
          node = node->next;
        }
       merge (tempfiles, ntemps, ntemps, output_file);
@@ -2683,6 +3010,23 @@ main (int argc, char **argv)
       files = &minus;
     }
 
+  compress_program = getenv ("GNUSORT_COMPRESSOR");
+  if (! compress_program)
+    {
+      static const char *compressors[] = { "/bin/gzip" };
+      enum { ncompressors = sizeof compressors / sizeof compressors[0] };
+      size_t i;
+
+      for (i = 0; i < ncompressors; ++i)
+       if (access (compressors[i], X_OK) == 0)
+         {
+           compress_program = compressors[i];
+           break;
+         }
+    }
+  else if (compress_program[0] == '\0')
+    compress_program = NULL;
+
   if (checkonly)
     {
       if (nfiles > 1)
@@ -2698,7 +3042,15 @@ main (int argc, char **argv)
     }
 
   if (mergeonly)
-    merge (files, 0, nfiles, outfile);
+    {
+      struct sortfile *sortfiles = xcalloc (nfiles, sizeof *sortfiles);
+      size_t i;
+
+      for (i = 0; i < nfiles; ++i)
+       sortfiles[i].name = files[i];
+
+      merge (sortfiles, 0, nfiles, outfile);
+    }
   else
     sort (files, nfiles, outfile);
 




reply via email to

[Prev in Thread] Current Thread [Next in Thread]