bug-coreutils
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: feature request: gzip/bzip support for sort


From: Dan Hipschman
Subject: Re: feature request: gzip/bzip support for sort
Date: Sun, 14 Jan 2007 17:02:11 -0800
User-agent: Mutt/1.5.9i

OK, here's my current patch and my theory on why it's still not fast
enough.  Here's profiling info for the CVS version of sort, truncated
after write_bytes:

  %   cumulative   self              self     total
 time   seconds   seconds    calls   s/call   s/call  name
 38.09      7.85     7.85                             memcoll
 22.42     12.47     4.62  3827913     0.00     0.00  compare
 13.73     15.30     2.83   133249     0.00     0.00  mergelines
  6.94     16.73     1.43                             xmemcoll
  5.92     17.95     1.22        1     1.22     2.84  mergefps
  4.44     18.87     0.92       88     0.01     0.01  fillbuf
  1.89     19.26     0.39        1     0.39    10.50  sort
  1.70     19.61     0.35                             xalloc_die
  1.31     19.88     0.27                             md5_process_block
  1.02     20.09     0.21                             keycompare
  0.85     20.26     0.18    58368     0.00     0.00  sortlines_temp
  0.66     20.40     0.14    58368     0.00     0.00  sortlines
  0.63     20.53     0.13   500000     0.00     0.00  write_bytes

Now here's the profile of the version that compresses temp files, with a
few irrelevant entries removed:

  %   cumulative   self              self     total
 time   seconds   seconds    calls   s/call   s/call  name
 42.09      8.70     8.70                             memcoll
 20.20     12.88     4.18  3827913     0.00     0.00  compare
 12.19     15.40     2.52   133249     0.00     0.00  mergelines
  6.39     16.72     1.32                             xmemcoll
  4.57     17.66     0.95        1     0.95     2.63  mergefps
  4.40     18.57     0.91       88     0.01     0.01  fillbuf
  1.79     18.94     0.37   500000     0.00     0.00  compress_and_write_bytes
  1.74     19.30     0.36                             md5_process_block
  1.60     19.63     0.33        1     0.33     9.87  sort
  1.40     19.92     0.29    58368     0.00     0.00  sortlines_temp
  1.40     20.21     0.29                             xalloc_die
  1.04     20.43     0.22    58368     0.00     0.00  sortlines
  0.65     20.56     0.14                             keycompare
  0.15     20.59     0.03   251662     0.00     0.00  write_bytes
  0.15     20.62     0.03      569     0.00     0.00  load_compression_buffer
  0.05     20.66     0.01     6688     0.00     0.00  read_and_decompress_bytes
  0.05     20.67     0.01       88     0.00     0.00  get_compression_buffer
  0.00     20.67     0.00     3954     0.00     0.00  read_bytes
  0.00     20.67     0.00     1123     0.00     0.00  compress_out
  0.00     20.67     0.00      554     0.00     0.00  compress_wrkmem
  0.00     20.67     0.00      554     0.00     0.00  flush_compression
  0.00     20.67     0.00       16     0.00     0.00  flush_compression_and_xfclose

compress_and_write_bytes takes the place of write_bytes, and as you can
see, it's slower: 0.37 seconds of self time, versus 0.13 seconds for
write_bytes in the first profile.  In the second version we spend less
time in write_bytes (0.03 seconds), so the time spent handing data to
the kernel to write to disk has decreased.  The problem is that we more
than lose that gain in compress_and_write_bytes.  The part that actually
does the compression is in flush_compression, and that's fast.  The time
seems to be spent just copying bytes into the buffer where they will
later be compressed.

It may be possible to eliminate this time if we used a stream compression
algorithm instead of a block compression algorithm.  As it stands, my
patch still slows sort down, although less than before.  If anybody can
find obvious improvements that I've missed, maybe there's still hope for
it, but I don't think so at this point.  I'll look at some different
compression algorithms later; if someone else wants to take up the cause
in the meantime, that's fine by me, because I'm a little busy with school
right now.  Still, I'll see what I can come up with in my spare time.

Thanks,
Dan


Index: sort.c
===================================================================
RCS file: /sources/coreutils/coreutils/src/sort.c,v
retrieving revision 1.344
diff -p -u -r1.344 sort.c
--- sort.c      13 Dec 2006 21:27:05 -0000      1.344
+++ sort.c      14 Jan 2007 23:59:50 -0000
@@ -50,6 +50,10 @@ struct rlimit { size_t rlim_cur; };
 # define getrlimit(Resource, Rlp) (-1)
 #endif
 
+#if HAVE_LZO1X_H
+# include <lzo1x.h>
+#endif
+
 /* The official name of this program (e.g., no `g' prefix).  */
 #define PROGRAM_NAME "sort"
 
@@ -261,6 +265,20 @@ static bool have_read_stdin;
 /* List of key field comparisons to be tried.  */
 static struct keyfield *keylist;
 
+/* Whether to compress temporary files.  Autoconf's AC_CHECK_HEADERS
+   leaves HAVE_LZO1X_H undefined, not 0, when lzo1x.h is missing, so
+   it must be tested with #if rather than used as an initializer.
+   main turns this flag off if lzo_init fails.  */
+#if HAVE_LZO1X_H
+static bool compress_temps = true;
+#else
+static bool compress_temps = false;
+#endif
+
+/* Size in bytes of each temporary file's compression buffer, and so
+   the largest uncompressed block written to a compressed temp file.  */
+static size_t compress_buf_size = 65536;
+
 static void sortlines_temp (struct line *, size_t, struct line *);
 
 /* Report MESSAGE for FILE, then clean up and exit.
@@ -399,15 +409,96 @@ static struct option const long_options[
 /* The set of signals that are caught.  */
 static sigset_t caught_signals;
 
+/* Buffer for one block of a compressed temporary file.  When writing,
+   BUF up to POS holds bytes not yet compressed; when reading, POS up
+   to LIM holds decompressed bytes not yet consumed.  */
+struct compress_buf
+{
+  char *pos, *lim;  /* Current position in BUF, and end of BUF.  */
+  char buf[1];  /* Actual size is compress_buf_size.  */
+};
+
 /* The list of temporary files. */
 struct tempnode
 {
-  struct tempnode *volatile next;
+  struct tempnode *volatile prev;  /* Previous in order of the list.  */
+  struct tempnode *volatile next;  /* Next in order of the list.  */
+  struct tempnode *volatile link;  /* Next in hash chain.  */
+  struct compress_buf *compress_buf;  /* NULL until allocated.  */
+  bool compress; /* Should we try to compress? */
   char name[1];  /* Actual size is 1 + file name length.  */
 };
 static struct tempnode *volatile temphead;
 static struct tempnode *volatile *temptail = &temphead;
 
+/* Number of buckets in TEMPTABLE.  */
+#define SZTEMPTABLE 389
+
+/* A hash table for fast lookups of tempnode from filename, chained
+   through the LINK member.  Lookups compare name pointers, not string
+   contents, so a name must be passed as the very NAME pointer stored
+   in its tempnode rather than as an equal copy.  */
+static struct tempnode *temptable[SZTEMPTABLE];
+
+/* Return the TEMPTABLE bucket index for the file name NAME.  */
+
+static unsigned int
+hashval (const char *name)
+{
+  unsigned int h = 0;
+
+  while (*name)
+    h = 5 * h + *name++;
+
+  return h % SZTEMPTABLE;
+}
+
+/* Add NODE to the hash table, keyed by its NAME member.  */
+
+static void
+hash_temp (struct tempnode *node)
+{
+  unsigned int bucket = hashval (node->name);
+
+  node->link = temptable[bucket];
+  temptable[bucket] = node;
+}
+
+/* Retrieve and remove the tempnode for NAME from the hash table.
+   NAME must be in the table; otherwise this dereferences a null pointer.  */
+
+static struct tempnode *
+unhash_temp (const char *name)
+{
+  unsigned int bucket = hashval (name);
+  struct tempnode *node;
+  struct tempnode *volatile *pnode = &temptable[bucket];
+
+  while ((node = *pnode)->name != name)
+    pnode = &node->link;
+
+  *pnode = node->link;
+  return node;
+}
+
+/* Retrieve the tempnode for NAME without removing it from the hash
+   table.  Return NULL if NAME is NULL or is not in the table.  */
+
+static struct tempnode *
+find_temp (const char *name)
+{
+  if (name)
+    {
+      unsigned int bucket = hashval (name);
+      struct tempnode *node = temptable[bucket];
+
+      for (; node; node = node->link)
+       if (node->name == name)
+         return node;
+    }
+  return NULL;
+}
+
 /* Clean up any remaining temporary files.  */
 
 static void
@@ -438,7 +520,9 @@ create_temp_file (FILE **pfp)
 
   memcpy (file, temp_dir, len);
   memcpy (file + len, slashbase, sizeof slashbase);
-  node->next = NULL;
+  node->prev = node->next = node->link = NULL;
+  node->compress_buf = NULL;
+  node->compress = compress_temps;
   if (++temp_dir_index == temp_dir_count)
     temp_dir_index = 0;
 
@@ -447,8 +531,15 @@ create_temp_file (FILE **pfp)
   fd = mkstemp (file);
   if (0 <= fd)
     {
+      struct tempnode *volatile prev =
+       temptail == &temphead
+       ? NULL
+       : ((struct tempnode *)
+          ((char *) temptail - offsetof (struct tempnode, next)));
       *temptail = node;
+      node->prev = prev;
       temptail = &node->next;
+      hash_temp (node);
     }
   saved_errno = errno;
   sigprocmask (SIG_SETMASK, &oldset, NULL);
@@ -521,6 +612,19 @@ write_bytes (const char *buf, size_t n_b
     die (_("write failed"), output_file);
 }
 
+/* Read up to N_BYTES bytes into BUF from FP (named INPUT_FILE) and
+   return the number of bytes read.  Die on a read error, so a count
+   less than N_BYTES means end of file.  */
+
+static size_t
+read_bytes (void *buf, size_t n_bytes, FILE *fp, const char *input_file)
+{
+  size_t n = fread (buf, 1, n_bytes, fp);
+  if (ferror (fp))
+    die (_("read failed"), input_file);
+  return n;
+}
+
 /* Append DIR to the array of temporary directory names.  */
 static void
 add_temp_dir (char const *dir)
@@ -536,31 +636,276 @@ add_temp_dir (char const *dir)
 static void
 zaptemp (const char *name)
 {
-  struct tempnode *volatile *pnode;
   struct tempnode *node;
-  struct tempnode *next;
   sigset_t oldset;
   int unlink_status;
   int unlink_errno = 0;
 
-  for (pnode = &temphead; (node = *pnode)->name != name; pnode = &node->next)
-    continue;
-
   /* Unlink the temporary file in a critical section to avoid races.  */
-  next = node->next;
   sigprocmask (SIG_BLOCK, &caught_signals, &oldset);
+  node = unhash_temp (name);
   unlink_status = unlink (name);
   unlink_errno = errno;
-  *pnode = next;
+  if (node->prev)
+    node->prev->next = node->next;
+  else
+    temphead = node->next;
+  if (node->next)
+    node->next->prev = node->prev;
+  else
+    temptail = node->prev ? &node->prev->next : &temphead;
   sigprocmask (SIG_SETMASK, &oldset, NULL);
 
   if (unlink_status != 0)
     error (0, unlink_errno, _("warning: cannot remove: %s"), name);
-  if (! next)
-    temptail = pnode;
+  free (node->compress_buf);
   free (node);
 }
 
+/* Return the compression buffer for the temporary file TFNAME,
+   allocating it on first use.  Return NULL if temp-file compression
+   is disabled, if TFNAME is not the name pointer of a registered
+   temporary file, or if the buffer cannot be allocated; in that last
+   case compression is turned off for the file, so it is written and
+   later read back uncompressed.  */
+
+static struct compress_buf *
+get_compression_buffer (const char *tfname)
+{
+  struct compress_buf *cbuf;
+  struct tempnode *node;
+
+  if (! compress_temps || ! (node = find_temp (tfname)))
+    return NULL;
+
+  if (node->compress && !node->compress_buf)
+    {
+      cbuf = malloc (offsetof (struct compress_buf, buf) + compress_buf_size);
+      if (! cbuf)
+       {
+         node->compress = false;
+         return NULL;
+       }
+      cbuf->pos = cbuf->buf;
+      cbuf->lim = cbuf->buf + compress_buf_size;
+      node->compress_buf = cbuf;
+    }
+
+  return node->compress_buf;
+}
+
+#if HAVE_LZO1X_H
+/* Return the shared buffer for one compressed block, allocating it on
+   first use.  Its size leaves room for LZO1X expanding incompressible
+   input (the worst-case bound given in the LZO documentation).  */
+
+static lzo_bytep
+compress_out (void)
+{
+  static lzo_bytep out;
+  if (!out)
+    out = xmalloc (compress_buf_size + compress_buf_size / 16 + 64 + 3);
+  return out;
+}
+
+/* Return the shared work memory lzo1x_1_compress needs, allocating it
+   on first use.  */
+
+static lzo_bytep
+compress_wrkmem (void)
+{
+  static lzo_bytep wrkmem;
+  if (!wrkmem)
+    wrkmem = xmalloc (LZO1X_1_MEM_COMPRESS);
+  return wrkmem;
+}
+#endif
+
+/* Compress the bytes buffered in CBUF, append them to FP (named FILE)
+   as one block, and empty CBUF.  A block is the uncompressed length
+   and then the compressed length, each written as a raw lzo_uint,
+   followed by the data: stored as is when compression did not make it
+   shorter, LZO1X-compressed otherwise.  load_compression_buffer reads
+   this format back.  Does nothing when built without LZO.  */
+
+static void
+flush_compression (FILE *fp, const char *file, struct compress_buf *cbuf)
+{
+#if HAVE_LZO1X_H
+  lzo_bytep out = compress_out ();
+  lzo_bytep wrkmem = compress_wrkmem ();
+  lzo_uint in_len = (lzo_uint) (cbuf->pos - cbuf->buf);
+  lzo_uint out_len;
+
+  lzo1x_1_compress ((lzo_bytep) cbuf->buf, in_len, out, &out_len, wrkmem);
+  write_bytes ((char *) &in_len, sizeof in_len, fp, file);
+  write_bytes ((char *) &out_len, sizeof out_len, fp, file);
+  if (in_len <= out_len)
+    write_bytes (cbuf->buf, in_len, fp, file);
+  else
+    write_bytes ((char *) out, out_len, fp, file);
+
+  cbuf->pos = cbuf->buf;
+
+#endif
+}
+
+/* Read the next block written by flush_compression from FP (named
+   FNAME) into the tail of CBUF, leaving its uncompressed bytes in
+   [CBUF->pos, CBUF->lim).  At a clean end of file, leave CBUF alone.
+   Die if the file ends in mid-block, or if a block claims to be
+   longer than CBUF: the length comes from the file, and trusting a
+   damaged one would write outside CBUF.  Does nothing when built
+   without LZO.  */
+
+static void
+load_compression_buffer (FILE *fp, const char *fname,
+                        struct compress_buf *cbuf)
+{
+#if HAVE_LZO1X_H
+  lzo_bytep out = compress_out ();
+  lzo_uint in_len;
+  lzo_uint out_len;
+  size_t n;
+  bool eof;
+
+  n = read_bytes (&in_len, sizeof in_len, fp, fname);
+  if (feof (fp) && n == 0)
+    return;
+
+  eof = (n != sizeof in_len);
+  if (!eof)
+    eof = (read_bytes (&out_len, sizeof out_len, fp, fname) != sizeof out_len);
+  if (eof)
+    die (_("premature EOF in temporary file"), fname);
+
+  /* No block is longer than a full CBUF, and a block is stored
+     compressed only when that made it shorter, so this check also
+     bounds the read into the shared compressed-block buffer.  */
+  if (compress_buf_size < in_len)
+    die (_("corrupt temporary file"), fname);
+
+  cbuf->pos = cbuf->lim - in_len;
+  if (in_len <= out_len)
+    eof = (read_bytes (cbuf->pos, in_len, fp, fname) != in_len);
+  else
+    {
+      /* lzo1x_decompress_safe takes the room available at CBUF->pos
+        in NEW_LEN and does not write past it.  */
+      lzo_uint new_len = in_len;
+      int status;
+
+      eof = (read_bytes (out, out_len, fp, fname) != out_len);
+      if (!eof)
+       {
+         status = lzo1x_decompress_safe (out, out_len, (lzo_bytep) cbuf->pos,
+                                         &new_len, NULL);
+         if (status != LZO_E_OK || new_len != in_len)
+           die (_("corrupt temporary file"), fname);
+       }
+    }
+
+  if (eof)
+    die (_("premature EOF in temporary file"), fname);
+
+#endif
+}
+
+/* Write N_BYTES bytes from BUF to FP (named OUTPUT_FILE).  If CBUF is
+   null, write them directly.  Otherwise append them to CBUF and, each
+   time CBUF fills, compress it and write it out as one block; a final
+   partial block stays in CBUF until flush_compression_and_xfclose.  */
+
+static void
+compress_and_write_bytes (const char *buf, size_t n_bytes,
+                         FILE *fp, const char *output_file,
+                         struct compress_buf *cbuf)
+{
+  const char *rest = buf;
+  size_t left = n_bytes;
+
+  if (!cbuf)
+    {
+      write_bytes (buf, n_bytes, fp, output_file);
+      return;
+    }
+
+  while (0 < left)
+    {
+      size_t nbuf = (size_t) (cbuf->lim - cbuf->pos);
+      size_t n2cp = MIN (left, nbuf);
+
+      memcpy (cbuf->pos, rest, n2cp);
+      rest += n2cp;
+      left -= n2cp;
+      cbuf->pos += n2cp;
+
+      if (cbuf->pos == cbuf->lim)
+       flush_compression (fp, output_file, cbuf);
+    }
+}
+
+/* Read up to NCHARS bytes into BUF from FP (named FNAME).  If CBUF is
+   non-null, take the bytes from CBUF, loading and decompressing further
+   blocks from FP as needed.  Return the number of bytes read, which is
+   less than NCHARS only at end of file.  */
+
+static size_t
+read_and_decompress_bytes (void *buf, size_t nchars,
+                          FILE *fp, const char *fname,
+                          struct compress_buf *cbuf)
+{
+  char *rest = buf;
+  size_t left = nchars;
+
+  if (!cbuf)
+    {
+      return read_bytes (buf, nchars, fp, fname);
+    }
+
+  while (0 < left)
+    {
+      size_t nbuf;
+      size_t n2cp;
+
+      if (cbuf->pos == cbuf->lim)
+       {
+         if (feof (fp))
+           return nchars - left;
+         load_compression_buffer (fp, fname, cbuf);
+       }
+
+      nbuf = (size_t) (cbuf->lim - cbuf->pos);
+      n2cp = MIN (left, nbuf);
+
+      memcpy (rest, cbuf->pos, n2cp);
+      rest += n2cp;
+      left -= n2cp;
+      cbuf->pos += n2cp;
+    }
+
+  return nchars;
+}
+
+/* Write any partial block still buffered in CBUF to FP, then close FP
+   (named FILE) with xfclose.  CBUF is left looking fully consumed, so
+   the first read after FILE is reopened loads a fresh block.  */
+
+static void
+flush_compression_and_xfclose (FILE *fp, char const *file,
+                              struct compress_buf *cbuf)
+{
+  if (cbuf)
+    {
+      flush_compression (fp, file, cbuf);
+
+      /* Mark the buffer as fully consumed, so that reading the file
+        back in starts by loading its first block.  */
+      cbuf->pos = cbuf->lim;
+    }
+  xfclose (fp, file);
+}
+
 #if HAVE_NL_LANGINFO
 
 static int
@@ -980,10 +1283,13 @@ fillbuf (struct buffer *buf, FILE *fp, c
   char eol = eolchar;
   size_t line_bytes = buf->line_bytes;
   size_t mergesize = merge_buffer_size - MIN_MERGE_BUFFER_SIZE;
+  struct compress_buf *cbuf;
 
   if (buf->eof)
     return false;
 
+  cbuf = get_compression_buffer (file);
+
   if (buf->used != buf->left)
     {
       memmove (buf->buf, buf->buf + buf->used - buf->left, buf->left);
@@ -1007,23 +1313,19 @@ fillbuf (struct buffer *buf, FILE *fp, c
             rest of the input file consists entirely of newlines,
             except that the last byte is not a newline.  */
          size_t readsize = (avail - 1) / (line_bytes + 1);
-         size_t bytes_read = fread (ptr, 1, readsize, fp);
+         size_t bytes_read =
+           read_and_decompress_bytes (ptr, readsize, fp, file, cbuf);
          char *ptrlim = ptr + bytes_read;
          char *p;
          avail -= bytes_read;
 
          if (bytes_read != readsize)
            {
-             if (ferror (fp))
-               die (_("read failed"), file);
-             if (feof (fp))
-               {
-                 buf->eof = true;
-                 if (buf->buf == ptrlim)
-                   return false;
-                 if (ptrlim[-1] != eol)
-                   *ptrlim++ = eol;
-               }
+             buf->eof = true;
+             if (buf->buf == ptrlim)
+               return false;
+             if (ptrlim[-1] != eol)
+               *ptrlim++ = eol;
            }
 
          /* Find and record each line in the just-read input.  */
@@ -1593,6 +1895,7 @@ mergefps (char **files, size_t ntemps, s
   struct buffer buffer[NMERGE];        /* Input buffers for each file. */
   struct line saved;           /* Saved line storage for unique check. */
   struct line const *savedline = NULL;
+  struct compress_buf *cbuf;
                                /* &saved if there is a saved line. */
   size_t savealloc = 0;                /* Size allocated for the saved line. */
   struct line const *cur[NMERGE]; /* Current line in each line table. */
@@ -1606,6 +1909,8 @@ mergefps (char **files, size_t ntemps, s
   struct keyfield const *key = keylist;
   saved.text = NULL;
 
+  cbuf = get_compression_buffer (output_file);
+
   /* Read initial lines from each input file. */
   for (i = 0; i < nfiles; )
     {
@@ -1659,7 +1964,8 @@ mergefps (char **files, size_t ntemps, s
          if (savedline && compare (savedline, smallest))
            {
              savedline = NULL;
-             write_bytes (saved.text, saved.length, ofp, output_file);
+             compress_and_write_bytes (saved.text, saved.length,
+                                       ofp, output_file, cbuf);
            }
          if (!savedline)
            {
@@ -1688,7 +1994,8 @@ mergefps (char **files, size_t ntemps, s
            }
        }
       else
-       write_bytes (smallest->text, smallest->length, ofp, output_file);
+       compress_and_write_bytes (smallest->text, smallest->length,
+                                 ofp, output_file, cbuf);
 
       /* Check if we need to read more lines into core. */
       if (base[ord[0]] < smallest)
@@ -1759,11 +2066,12 @@ mergefps (char **files, size_t ntemps, s
 
   if (unique && savedline)
     {
-      write_bytes (saved.text, saved.length, ofp, output_file);
+      compress_and_write_bytes (saved.text, saved.length,
+                               ofp, output_file, cbuf);
       free (saved.text);
     }
 
-  xfclose (ofp, output_file);
+  flush_compression_and_xfclose (ofp, output_file, cbuf);
 }
 
 /* Merge into T the two sorted arrays of lines LO (with NLO members)
@@ -2034,6 +2342,7 @@ sort (char * const *files, size_t nfiles
        {
          struct line *line;
          struct line *linebase;
+         struct compress_buf *cbuf;
 
          if (buf.eof && nfiles
              && (bytes_per_line + 1
@@ -2056,24 +2365,27 @@ sort (char * const *files, size_t nfiles
              tfp = xfopen (output_file, "w");
              temp_output = output_file;
              output_file_created = true;
+             cbuf = NULL;
            }
          else
            {
              ++ntemps;
              temp_output = create_temp_file (&tfp);
+             cbuf = get_compression_buffer (temp_output);
            }
 
          do
            {
              line--;
-             write_bytes (line->text, line->length, tfp, temp_output);
+             compress_and_write_bytes (line->text, line->length,
+                                       tfp, temp_output, cbuf);
              if (unique)
                while (linebase < line && compare (line, line - 1) == 0)
                  line--;
            }
          while (linebase < line);
 
-         xfclose (tfp, temp_output);
+         flush_compression_and_xfclose (tfp, temp_output, cbuf);
 
          if (output_file_created)
            goto finish;
@@ -2607,6 +2919,11 @@ main (int argc, char **argv)
        }
     }
 
+#if HAVE_LZO1X_H
+  if (lzo_init () != LZO_E_OK)
+    compress_temps = false;   /* Guess again. */
+#endif
+
   /* Inheritance of global options to individual keys. */
   for (key = keylist; key; key = key->next)
     {




reply via email to

[Prev in Thread] Current Thread [Next in Thread]