bug-coreutils

Re: [PATCH] Change 'sort' to handle fd exhaustion better.


From: Pádraig Brady
Subject: Re: [PATCH] Change 'sort' to handle fd exhaustion better.
Date: Mon, 16 Mar 2009 11:25:58 +0000
User-agent: Thunderbird 2.0.0.6 (X11/20071008)

This looks good. In the attached version I've tweaked the
commit message, updated THANKS and changed the test to a+x
(which is required for one of the checker scripts).

cheers,
Pádraig.
From f7d704a7b17bfab3df91f57cef0c715e642f54af Mon Sep 17 00:00:00 2001
From: Paul Eggert <address@hidden>
Date: Fri, 13 Mar 2009 15:48:30 -0700
Subject: [PATCH] sort: handle fd exhaustion better when merging

This is an alternative to my 9 March patch labeled "Silently lower
nmerge; don't (sometimes incorrectly) range-check"
<http://lists.gnu.org/archive/html/bug-coreutils/2009-03/msg00070.html>.
It differs by not using 'dup' to probe for extra file descriptors;
instead, it simply calls 'open' (and 'pipe') to open files and pipes,
until one of these calls fails due to file descriptor exhaustion; it
then backs off by 1, does a merge with the files that it has opened,
and then retries with the (now-smaller) number of files.

This patch requires quite a few more changes to the source code than
the earlier patch, but it is in some sense "better" because it doesn't
need to call "dup" ahead of time in order to decide whether "open" or
"pipe" will fail.  Also, it's more robust in the case where "open" or
"pipe" fails with errno==EMFILE because some system-wide limit is
exhausted.  It may be useful to apply all of this patch, along with
the other patch's test case (which is somewhat different).

* src/sort.c (create_temp_file): New arg SURVIVE_FD_EXHAUSTION.
(stream_open): New function, containing guts of xfopen.
(xfopen): Use it.
(pipe_fork): Set errno on failure.
(maybe_create_temp): New function, containing guts of create_temp.
(create_temp): Use it.
(open_temp): Distinguish failures due to file descriptor exhaustion
from other failures, and on fd exhaustion return a notice to caller
rather than dying.  Don't test execlp's return value; when it returns,
it *always* returns -1.
(open_input_files): New function.
(mergefps): New arg FPS. It's now the caller's responsibility to open
the input and output files.  All callers changed.
(mergefiles): New function.
(avoid_trashing_input, merge): Handle the case where a single merge
can't merge as much as we wanted due to file descriptor exhaustion, by
merging as much as we can and then retrying.
* tests/Makefile.am (TESTS): Add misc/sort-continue.
* tests/misc/sort-continue: New file.
* THANKS: Add Glen Lenker and Matt Pham who coauthored this patch.
---
 THANKS                   |    2 +
 src/sort.c               |  292 ++++++++++++++++++++++++++++++++++------------
 tests/Makefile.am        |    1 +
 tests/misc/sort-continue |   47 ++++++++
 4 files changed, 268 insertions(+), 74 deletions(-)
 create mode 100755 tests/misc/sort-continue

diff --git a/THANKS b/THANKS
index 46d077b..47263ac 100644
--- a/THANKS
+++ b/THANKS
@@ -203,6 +203,7 @@ Geoff Whale                         address@hidden
 Gerald Pfeifer                      address@hidden
 Gerhard Poul                        address@hidden
 Germano Leichsenring                address@hidden
+Glen Lenker                         address@hidden
 Göran Uddeborg                      address@hidden
 Guochun Shi                         address@hidden
 GOTO Masanori                       address@hidden
@@ -364,6 +365,7 @@ Mate Wierdl                         address@hidden
 Matej Vela                          address@hidden
 Matt Kraai                          address@hidden
 Matt Perry                          address@hidden
+Matt Pham                           address@hidden
 Matt Schalit                        address@hidden
 Matt Swift                          address@hidden
 Matthew Arnison                     address@hidden
diff --git a/src/sort.c b/src/sort.c
index 7b0b064..ced0f2d 100644
--- a/src/sort.c
+++ b/src/sort.c
@@ -732,10 +732,13 @@ exit_cleanup (void)
 }
 
 /* Create a new temporary file, returning its newly allocated tempnode.
-   Store into *PFD the file descriptor open for writing.  */
+   Store into *PFD the file descriptor open for writing.
+   If the creation fails, return NULL and store -1 into *PFD if the
+   failure is due to file descriptor exhaustion and
+   SURVIVE_FD_EXHAUSTION; otherwise, die.  */
 
 static struct tempnode *
-create_temp_file (int *pfd)
+create_temp_file (int *pfd, bool survive_fd_exhaustion)
 {
   static char const slashbase[] = "/sortXXXXXX";
   static size_t temp_dir_index;
@@ -768,8 +771,13 @@ create_temp_file (int *pfd)
   errno = saved_errno;
 
   if (fd < 0)
-    error (SORT_FAILURE, errno, _("cannot create temporary file in %s"),
-          quote (temp_dir));
+    {
+      if (! (survive_fd_exhaustion && errno == EMFILE))
+       error (SORT_FAILURE, errno, _("cannot create temporary file in %s"),
+              quote (temp_dir));
+      free (node);
+      node = NULL;
+    }
 
   *pfd = fd;
   return node;
@@ -779,27 +787,30 @@ create_temp_file (int *pfd)
    standard output; HOW should be "w".  When opening for input, "-"
    means standard input.  To avoid confusion, do not return file
    descriptors STDIN_FILENO, STDOUT_FILENO, or STDERR_FILENO when
-   opening an ordinary FILE.  */
+   opening an ordinary FILE.  Return NULL if unsuccessful.  */
 
 static FILE *
-xfopen (const char *file, const char *how)
+stream_open (const char *file, const char *how)
 {
-  FILE *fp;
-
   if (!file)
-    fp = stdout;
-  else if (STREQ (file, "-") && *how == 'r')
+    return stdout;
+  if (STREQ (file, "-") && *how == 'r')
     {
       have_read_stdin = true;
-      fp = stdin;
-    }
-  else
-    {
-      fp = fopen (file, how);
-      if (! fp)
-       die (_("open failed"), file);
+      return stdin;
     }
+  return fopen (file, how);
+}
+
+/* Same as stream_open, except always return a non-null value; die on
+   failure.  */
 
+static FILE *
+xfopen (const char *file, const char *how)
+{
+  FILE *fp = stream_open (file, how);
+  if (!fp)
+    die (_("open failed"), file);
   return fp;
 }
 
@@ -838,7 +849,8 @@ dup2_or_die (int oldfd, int newfd)
 
 /* Fork a child process for piping to and do common cleanup.  The
    TRIES parameter tells us how many times to try to fork before
-   giving up.  Return the PID of the child or -1 if fork failed.  */
+   giving up.  Return the PID of the child, or -1 (setting errno)
+   on failure. */
 
 static pid_t
 pipe_fork (int pipefds[2], size_t tries)
@@ -881,8 +893,10 @@ pipe_fork (int pipefds[2], size_t tries)
 
   if (pid < 0)
     {
+      saved_errno = errno;
       close (pipefds[0]);
       close (pipefds[1]);
+      errno = saved_errno;
     }
   else if (pid == 0)
     {
@@ -900,15 +914,22 @@ pipe_fork (int pipefds[2], size_t tries)
 }
 
 /* Create a temporary file and start a compression program to filter output
-   to that file.  Set *PFP to the file handle and if *PPID is non-NULL,
-   set it to the PID of the newly-created process.  */
+   to that file.  Set *PFP to the file handle and if PPID is non-NULL,
+   set *PPID to the PID of the newly-created process.  If the creation
+   fails, return NULL if the failure is due to file descriptor
+   exhaustion and SURVIVE_FD_EXHAUSTION; otherwise, die.  */
 
 static char *
-create_temp (FILE **pfp, pid_t *ppid)
+maybe_create_temp (FILE **pfp, pid_t *ppid, bool survive_fd_exhaustion)
 {
   int tempfd;
-  struct tempnode *node = create_temp_file (&tempfd);
-  char *name = node->name;
+  struct tempnode *node = create_temp_file (&tempfd, survive_fd_exhaustion);
+  char *name;
+
+  if (! node)
+    return NULL;
+
+  name = node->name;
 
   if (compress_program)
     {
@@ -949,48 +970,68 @@ create_temp (FILE **pfp, pid_t *ppid)
   return name;
 }
 
+/* Create a temporary file and start a compression program to filter output
+   to that file.  Set *PFP to the file handle and if PPID is non-NULL,
+   set *PPID to the PID of the newly-created process.  Die on failure.  */
+
+static char *
+create_temp (FILE **pfp, pid_t *ppid)
+{
+  return maybe_create_temp (pfp, ppid, false);
+}
+
 /* Open a compressed temp file and start a decompression process through
    which to filter the input.  PID must be the valid processes ID of the
-   process used to compress the file.  */
+   process used to compress the file.  Return NULL (setting errno to
+   EMFILE) if we ran out of file descriptors, and die on any other
+   kind of failure.  */
 
 static FILE *
 open_temp (const char *name, pid_t pid)
 {
   int tempfd, pipefds[2];
-  pid_t child_pid;
-  FILE *fp;
+  FILE *fp = NULL;
 
   wait_proc (pid);
 
   tempfd = open (name, O_RDONLY);
   if (tempfd < 0)
-    die (_("couldn't open temporary file"), name);
+    return NULL;
 
-  child_pid = pipe_fork (pipefds, MAX_FORK_TRIES_DECOMPRESS);
-  if (0 < child_pid)
+  switch (pipe_fork (pipefds, MAX_FORK_TRIES_DECOMPRESS))
     {
+    case -1:
+      if (errno != EMFILE)
+       error (SORT_FAILURE, errno, _("couldn't create process for %s -d"),
+              compress_program);
       close (tempfd);
-      close (pipefds[1]);
-    }
-  else if (child_pid == 0)
-    {
+      errno = EMFILE;
+      break;
+
+    case 0:
       close (pipefds[0]);
       dup2_or_die (tempfd, STDIN_FILENO);
       close (tempfd);
       dup2_or_die (pipefds[1], STDOUT_FILENO);
       close (pipefds[1]);
 
-      if (execlp (compress_program, compress_program, "-d", (char *) NULL) < 0)
-       error (SORT_FAILURE, errno, _("couldn't execute %s -d"),
-              compress_program);
-    }
-  else
-    error (SORT_FAILURE, errno, _("couldn't create process for %s -d"),
-          compress_program);
+      execlp (compress_program, compress_program, "-d", (char *) NULL);
+      error (SORT_FAILURE, errno, _("couldn't execute %s -d"),
+            compress_program);
 
-  fp = fdopen (pipefds[0], "r");
-  if (! fp)
-    die (_("couldn't create temporary file"), name);
+    default:
+      close (tempfd);
+      close (pipefds[1]);
+
+      fp = fdopen (pipefds[0], "r");
+      if (! fp)
+       {
+         int saved_errno = errno;
+         close (pipefds[0]);
+         errno = saved_errno;
+       }
+      break;
+    }
 
   return fp;
 }
@@ -2148,31 +2189,53 @@ check (char const *file_name, char checkonly)
   return ordered;
 }
 
+/* Open FILES (there are NFILES of them) and store the resulting array
+   of stream pointers into (*PFPS).  Allocate the array.  Return the
+   number of successfully opened files, setting errno if this value is
+   less than NFILES.  */
+
+static size_t
+open_input_files (struct sortfile *files, size_t nfiles, FILE ***pfps)
+{
+  FILE **fps = *pfps = xnmalloc (nfiles, sizeof *fps);
+  size_t i;
+
+  /* Open as many input files as we can.  */
+  for (i = 0; i < nfiles; i++)
+    {
+      fps[i] = (files[i].pid
+               ? open_temp (files[i].name, files[i].pid)
+               : stream_open (files[i].name, "r"));
+      if (!fps[i])
+       break;
+    }
+
+  return i;
+}
+
 /* Merge lines from FILES onto OFP.  NTEMPS is the number of temporary
    files (all of which are at the start of the FILES array), and
    NFILES is the number of files; 0 <= NTEMPS <= NFILES <= NMERGE.
-   Close input and output files before returning.
+   FPS is the vector of open streams corresponding to the files.
+   Close input and output streams before returning.
    OUTPUT_FILE gives the name of the output file.  If it is NULL,
-   the output file is standard output.  If OFP is NULL, the output
-   file has not been opened yet (or written to, if standard output).  */
+   the output file is standard output.  */
 
 static void
 mergefps (struct sortfile *files, size_t ntemps, size_t nfiles,
-         FILE *ofp, char const *output_file)
+         FILE *ofp, char const *output_file, FILE **fps)
 {
-  FILE **fps = xnmalloc (nmerge, sizeof *fps);
-                               /* Input streams for each file.  */
-  struct buffer *buffer = xnmalloc (nmerge, sizeof *buffer);
+  struct buffer *buffer = xnmalloc (nfiles, sizeof *buffer);
                                /* Input buffers for each file. */
   struct line saved;           /* Saved line storage for unique check. */
   struct line const *savedline = NULL;
                                /* &saved if there is a saved line. */
   size_t savealloc = 0;                /* Size allocated for the saved line. */
-  struct line const **cur = xnmalloc (nmerge, sizeof *cur);
+  struct line const **cur = xnmalloc (nfiles, sizeof *cur);
                                /* Current line in each line table. */
-  struct line const **base = xnmalloc (nmerge, sizeof *base);
+  struct line const **base = xnmalloc (nfiles, sizeof *base);
                                /* Base of each line table.  */
-  size_t *ord = xnmalloc (nmerge, sizeof *ord);
+  size_t *ord = xnmalloc (nfiles, sizeof *ord);
                                /* Table representing a permutation of fps,
                                   such that cur[ord[0]] is the smallest line
                                   and will be next output. */
@@ -2185,9 +2248,6 @@ mergefps (struct sortfile *files, size_t ntemps, size_t nfiles,
   /* Read initial lines from each input file. */
   for (i = 0; i < nfiles; )
     {
-      fps[i] = (files[i].pid
-               ? open_temp (files[i].name, files[i].pid)
-               : xfopen (files[i].name, "r"));
       initbuf (&buffer[i], sizeof (struct line),
               MAX (merge_buffer_size, sort_size / nfiles));
       if (fillbuf (&buffer[i], fps[i], files[i].name))
@@ -2209,13 +2269,13 @@ mergefps (struct sortfile *files, size_t ntemps, size_t nfiles,
          free (buffer[i].buf);
          --nfiles;
          for (j = i; j < nfiles; ++j)
-           files[j] = files[j + 1];
+           {
+             files[j] = files[j + 1];
+             fps[j] = fps[j + 1];
+           }
        }
     }
 
-  if (! ofp)
-    ofp = xfopen (output_file, "w");
-
   /* Set up the ord table according to comparisons among input lines.
      Since this only reorders two items if one is strictly greater than
      the other, it is stable. */
@@ -2353,6 +2413,28 @@ mergefps (struct sortfile *files, size_t ntemps, size_t nfiles,
   free(cur);
 }
 
+/* Merge lines from FILES onto OFP.  NTEMPS is the number of temporary
+   files (all of which are at the start of the FILES array), and
+   NFILES is the number of files; 0 <= NTEMPS <= NFILES <= NMERGE.
+   Close input and output files before returning.
+   OUTPUT_FILE gives the name of the output file.
+
+   Return the number of files successfully merged.  This number can be
+   less than NFILES if we ran low on file descriptors, but in this
+   case it is never less than 2.  */
+
+static size_t
+mergefiles (struct sortfile *files, size_t ntemps, size_t nfiles,
+           FILE *ofp, char const *output_file)
+{
+  FILE **fps;
+  size_t nopened = open_input_files (files, nfiles, &fps);
+  if (nopened < nfiles && nopened < 2)
+    die (_("open failed"), files[nopened].name);
+  mergefps (files, ntemps, nopened, ofp, output_file, fps);
+  return nopened;
+}
+
 /* Merge into T the two sorted arrays of lines LO (with NLO members)
    and HI (with NHI members).  T, LO, and HI point just past their
    respective arrays, and the arrays are in reverse order.  NLO and
@@ -2519,10 +2601,19 @@ avoid_trashing_input (struct sortfile *files, size_t ntemps,
          FILE *tftp;
          pid_t pid;
          char *temp = create_temp (&tftp, &pid);
-         mergefps (&files[i],0, nfiles - i, tftp, temp);
-         files[i].name = temp;
-         files[i].pid = pid;
-         return i + 1;
+         size_t num_merged = 0;
+         while (i + num_merged < nfiles)
+           {
+             num_merged += mergefiles (&files[i], 0, nfiles - i, tftp, temp);
+             files[i].name = temp;
+             files[i].pid = pid;
+
+             memmove(&files[i], &files[i + num_merged],
+                     num_merged * sizeof *files);
+             ntemps += 1;
+             nfiles -= num_merged - 1;
+             i += num_merged;
+           }
        }
     }
 
@@ -2553,17 +2644,20 @@ merge (struct sortfile *files, size_t ntemps, size_t nfiles,
       /* Number of easily-available slots at the next loop iteration.  */
       size_t cheap_slots;
 
-      /* Do as many NMERGE-size merges as possible.  */
-      for (out = in = 0; out < nfiles / nmerge; out++, in += nmerge)
+      /* Do as many NMERGE-size merges as possible.  If file descriptors
+         run low, a merge may consume fewer than NMERGE inputs, so
+         advance IN by the number actually merged.  */
+      for (out = in = 0; nmerge <= nfiles - in; out++)
        {
          FILE *tfp;
          pid_t pid;
          char *temp = create_temp (&tfp, &pid);
-         size_t nt = MIN (ntemps, nmerge);
-         ntemps -= nt;
-         mergefps (&files[in], nt, nmerge, tfp, temp);
+         size_t num_merged = mergefiles (&files[in], MIN (ntemps, nmerge),
+                                         nmerge, tfp, temp);
+         ntemps -= MIN (ntemps, num_merged);
          files[out].name = temp;
          files[out].pid = pid;
+         in += num_merged;
        }
 
       remainder = nfiles - in;
@@ -2578,12 +2672,12 @@ merge (struct sortfile *files, size_t ntemps, size_t nfiles,
          FILE *tfp;
          pid_t pid;
          char *temp = create_temp (&tfp, &pid);
-         size_t nt = MIN (ntemps, nshortmerge);
-         ntemps -= nt;
-         mergefps (&files[in], nt, nshortmerge, tfp, temp);
+         size_t num_merged = mergefiles (&files[in], MIN (ntemps, nshortmerge),
+                                         nshortmerge, tfp, temp);
+         ntemps -= MIN (ntemps, num_merged);
          files[out].name = temp;
          files[out++].pid = pid;
-         in += nshortmerge;
+         in += num_merged;
        }
 
       /* Put the remaining input files into the last NMERGE-sized output
@@ -2594,7 +2688,57 @@ merge (struct sortfile *files, size_t ntemps, size_t nfiles,
     }
 
   nfiles = avoid_trashing_input (files, ntemps, nfiles, output_file);
-  mergefps (files, ntemps, nfiles, NULL, output_file);
+
+  /* This final merge is not guaranteed to succeed.  Try to merge
+     directly into the output; if file descriptors run out, merge as
+     many inputs as we can into a temporary file and repeat.  */
+
+  for (;;)
+    {
+      /* Merge directly into the output file if possible.  */
+      FILE **fps;
+      size_t nopened = open_input_files (files, nfiles, &fps);
+
+      if (nopened == nfiles)
+       {
+         FILE *ofp = stream_open (output_file, "w");
+         if (ofp)
+           {
+             mergefps (files, ntemps, nfiles, ofp, output_file, fps);
+             break;
+           }
+         if (errno != EMFILE || nopened <= 2)
+           die (_("open failed"), output_file);
+       }
+      else if (nopened <= 2)
+       die (_("open failed"), files[nopened].name);
+
+      /* We ran out of file descriptors.  Close one of the input
+        files, to gain a file descriptor.  Then create a temporary
+        file with our spare file descriptor.  Retry if that failed
+        (e.g., some other process could open a file between the time
+        we closed and tried to create).  */
+      FILE *tfp;
+      pid_t pid;
+      char *temp;
+      do
+       {
+         nopened--;
+         xfclose (fps[nopened], files[nopened].name);
+         temp = maybe_create_temp (&tfp, &pid, ! (nopened <= 2));
+       }
+      while (!temp);
+
+      /* Merge into the newly allocated temporary.  */
+      mergefps (&files[0], MIN (ntemps, nopened), nopened, tfp, temp, fps);
+      ntemps -= MIN (ntemps, nopened);
+      files[0].name = temp;
+      files[0].pid = pid;
+
+      memmove (&files[1], &files[nopened], (nfiles - nopened) * sizeof *files);
+      ntemps++;
+      nfiles -= nopened - 1;
+    }
 }
 
 /* Sort NFILES FILES onto OUTPUT_FILE. */
diff --git a/tests/Makefile.am b/tests/Makefile.am
index 10be0c6..2fb01c4 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -201,6 +201,7 @@ TESTS =                                             \
   misc/shuf                                    \
   misc/sort                                    \
   misc/sort-compress                           \
+  misc/sort-continue                           \
   misc/sort-files0-from                                \
   misc/sort-merge                              \
   misc/sort-rand                               \
diff --git a/tests/misc/sort-continue b/tests/misc/sort-continue
new file mode 100755
index 0000000..b94cb2a
--- /dev/null
+++ b/tests/misc/sort-continue
@@ -0,0 +1,47 @@
+#!/bin/bash
+# Tests for file descriptor exhaustion.
+
+# Copyright (C) 2009 Free Software Foundation, Inc.
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+if test "$VERBOSE" = yes; then
+  set -x
+  sort --version
+fi
+
+. $srcdir/test-lib.sh
+
+for i in `seq 31`; do
+  echo $i | tee -a in > __test.$i || framework_failure
+done
+
+fail=0
+
+(
+ ulimit -n 6
+ exec 0</dev/null 3<&- 4<&- 5<&-
+ sort -n -m __test.* > out
+) &&
+compare in out || { fail=1; echo "file descriptor exhaustion not handled" 1>&2; }
+
+echo "32" | tee -a in > in1
+(
+ ulimit -n 6
+ exec 3<&- 4<&- 5<&-
+ cat in1 | sort -n -m __test.* - > out
+) &&
+compare in out || { fail=1; echo "stdin not handled properly" 1>&2; }
+
+Exit $fail
-- 
1.5.3.6
