poke-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[PATCH] ios: Introduce IO stream device for stdin, stdout, stderr.


From: Egeyar Bagcioglu
Subject: [PATCH] ios: Introduce IO stream device for stdin, stdout, stderr.
Date: Sat, 11 Jul 2020 20:45:44 +0200

Hi Jose,

Hereby I submit the support for stdin, stdout, stderr with the following
changes that you asked for:
* Moving ios-dev-buffer.h to ios-buffer.h
* Change the handlers of the stdio.
* Converting stdin to read-only, converting stdout and stderr to write-only.

We also talked about adding suport for device files. I need to keep that
out of this commit, so that I can forget about the stdio while implementing
that. Otherwise, my brain starts paging out things.

Regards
Egeyar 

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The handlers for stdin, stdout and stderr are these words surrounded by '<'
and '>' characters. open("<stdin>") for example opens stdin.

Upon opening, a read stream device initializes a buffer to store the read
data. It was discussed during the first poke-conf that such a buffer should
not be kept as one piece, as this might easily make copying the whole buffer
very expensive during expanding and flushing (i.e. discarding the early parts
of the buffer). Therefore, the buffer is implemented in chunks. These chunks
are kept in an hashtable -if you may- where the hash function is simply
chunk_no % IOB_BUCKET_COUNT.

Write streams do not use a buffer. Instead they maintain an offset. Trying
to rewrite to an already written offset results in an error. Writing to a
greater offset while leaving a gap, results in the gap being filled with 0s
(0 bytes, not '0').

Currently, ios_dev_stream_flush still requires language level support.

2020-07-11  Egeyar Bagcioglu  <egeyar@gmail.com>

        * libpoke/Makefile.am (libpoke_la_SOURCES): Add ios-dev-buffer.h
        and ios-dev-stream.c.
        * libpoke/ios-buffer.h: New file.
        * libpoke/ios-dev-stream.c: New file.
        * libpoke/ios.c: Extern ios_dev_stream.
        (ios_dev_ifs): Add ios_dev_stream.
---
 libpoke/Makefile.am      |   3 +-
 libpoke/ios-buffer.h     | 249 +++++++++++++++++++++++++++++++++++++++
 libpoke/ios-dev-stream.c | 232 ++++++++++++++++++++++++++++++++++++
 libpoke/ios.c            |   2 +
 4 files changed, 485 insertions(+), 1 deletion(-)
 create mode 100644 libpoke/ios-buffer.h
 create mode 100644 libpoke/ios-dev-stream.c

diff --git a/libpoke/Makefile.am b/libpoke/Makefile.am
index bab9793f..3a910b82 100644
--- a/libpoke/Makefile.am
+++ b/libpoke/Makefile.am
@@ -55,7 +55,8 @@ libpoke_la_SOURCES = libpoke.h libpoke.c \
                      pvm-program.h pvm-program.c \
                      pvm.jitter \
                      ios.c ios.h ios-dev.h \
-                     ios-dev-file.c ios-dev-mem.c
+                     ios-dev-file.c ios-dev-mem.c \
+                     ios-dev-buffer.h ios-dev-stream.c
 
 libpoke_la_SOURCES += ../common/pk-utils.c ../common/pk-utils.h
 
diff --git a/libpoke/ios-buffer.h b/libpoke/ios-buffer.h
new file mode 100644
index 00000000..9a52a56a
--- /dev/null
+++ b/libpoke/ios-buffer.h
@@ -0,0 +1,249 @@
+/* ios-buffer.h - The buffer for IO devices.  */
+
+/* Copyright (C) 2020 Egeyar Bagcioglu */
+
+/* This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <config.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <malloc.h>
+#include <assert.h>
+
+#define IOB_CHUNK_SIZE         2048
+#define IOB_BUCKET_COUNT       8
+
+#define IOB_CHUNK_OFFSET(offset)       \
+  ((offset) % IOB_CHUNK_SIZE)
+
+#define IOB_CHUNK_NO(offset)           \
+  ((offset) / IOB_CHUNK_SIZE)
+
+#define IOB_BUCKET_NO(chunk_no)                \
+  ((chunk_no) % IOB_BUCKET_COUNT)
+
+typedef struct ios_buffer_chunk
+{
+  uint8_t bytes[IOB_CHUNK_SIZE];
+  int chunk_no;
+  struct ios_buffer_chunk *next;
+} ios_buffer_chunk;
+
+/* begin_offset is the first offset that's not yet flushed, initilized as 0.
+   end_offset of an instream is the next byte to read to.  end_offset of an
+   outstream is the successor of the greatest offset that is written to.  */
+
+typedef struct ios_buffer
+{
+  ios_buffer_chunk* chunks[IOB_BUCKET_COUNT];
+  ios_dev_off begin_offset;
+  ios_dev_off end_offset;
+  int next_chunk_no;
+} ios_buffer;
+
+static ios_buffer *
+ios_buffer_init ()
+{
+  ios_buffer *bio = calloc (1, sizeof (ios_buffer));
+  return bio;
+}
+
+static int
+ios_buffer_free (ios_buffer *buffer)
+{
+  ios_buffer_chunk *chunk, *chunk_next;
+  for (int i = 0; i < IOB_BUCKET_COUNT; i++)
+    {
+      chunk = buffer->chunks[i];
+      while (chunk)
+       {
+         chunk_next = chunk->next;
+         free (chunk);
+         chunk = chunk_next;
+       }
+    }
+
+  free (buffer);
+  return 1;
+}
+
+static ios_buffer_chunk*
+ios_buffer_get_chunk (ios_buffer *buffer, int chunk_no)
+{
+  int bucket_no = IOB_BUCKET_NO (chunk_no);
+  ios_buffer_chunk *chunk = buffer->chunks[bucket_no];
+
+  for ( ; chunk; chunk = chunk->next)
+    if (chunk->chunk_no == chunk_no)
+      return chunk;
+
+  return NULL;
+}
+
+static int
+ios_buffer_allocate_new_chunk (ios_buffer *buffer, int final_chunk_no,
+                              ios_buffer_chunk **final_chunk)
+{
+  ios_buffer_chunk *chunk;
+  int bucket_no;
+
+  assert (buffer->next_chunk_no <= final_chunk_no);
+
+  do
+    {
+      chunk = calloc(1, sizeof (ios_buffer_chunk));
+      if (!chunk)
+       return IOD_ERROR;
+      /* Place the new chunk into the buffer.  */
+      chunk->chunk_no = buffer->next_chunk_no;
+      bucket_no = IOB_BUCKET_NO (chunk->chunk_no);
+      chunk->next = buffer->chunks[bucket_no];
+      buffer->chunks[bucket_no] = chunk;
+      buffer->next_chunk_no++;
+    }
+  while (buffer->next_chunk_no <= final_chunk_no);
+
+  /* end_offset is updated as the buffer is written to. Therefore, it is not
+     updated here, but in ios_buffer_pwrite.  */
+  *final_chunk = chunk;
+  return 0;
+}
+
+/* Since ios_dev_stream_pread already needs to check begin_offset and
+   end_offset, so this function does not.  It assumes that the given range
+   already exists in the buffer.  */
+
+static int
+ios_buffer_pread (ios_buffer *buffer, void *buf, size_t count,
+                 ios_dev_off offset)
+{
+  int chunk_no;
+  ios_buffer_chunk *chunk;
+  ios_dev_off chunk_offset;
+  size_t already_read_count = 0,
+        to_be_read_count = 0;
+
+  chunk_no = IOB_CHUNK_NO (offset);
+  chunk_offset = IOB_CHUNK_OFFSET (offset);
+  chunk = ios_buffer_get_chunk (buffer, chunk_no);
+  if (!chunk && ios_buffer_allocate_new_chunk (buffer, chunk_no, &chunk))
+    return IOD_ERROR;
+
+  /* The amount we read from this chunk is the maximum of
+     the COUNT requested and the size of the rest of this chunk. */
+  to_be_read_count = IOB_CHUNK_SIZE - chunk_offset > count
+                    ? count
+                    : IOB_CHUNK_SIZE - chunk_offset;
+
+  memcpy (buf, (void *) chunk + chunk_offset, to_be_read_count);
+
+  while ((already_read_count += to_be_read_count) < count)
+    {
+      to_be_read_count = count - already_read_count > IOB_CHUNK_SIZE
+                        ? IOB_CHUNK_SIZE
+                        : count - already_read_count;
+
+      chunk = ios_buffer_get_chunk (buffer, ++chunk_no);
+      if (!chunk && ios_buffer_allocate_new_chunk (buffer, chunk_no, &chunk))
+       return IOD_ERROR;
+      memcpy (buf + already_read_count, chunk, to_be_read_count);
+    };
+
+  return 0;
+}
+
+/* Since ios_dev_stream_pwrite already needs to check begin_offset, this
+   function does not.  It assumes the given range is not discarded.  It also
+   allocates new chunks when necessary.  */
+
+static int
+ios_buffer_pwrite (ios_buffer *buffer, const void *buf, size_t count,
+                  ios_dev_off offset)
+{
+  int chunk_no;
+  ios_buffer_chunk *chunk;
+  ios_dev_off chunk_offset;
+  size_t already_written_count = 0,
+        to_be_written_count = 0;
+
+  chunk_no = IOB_CHUNK_NO (offset);
+  chunk_offset = IOB_CHUNK_OFFSET (offset);
+  chunk = ios_buffer_get_chunk (buffer, chunk_no);
+  if (!chunk && ios_buffer_allocate_new_chunk (buffer, chunk_no, &chunk))
+    return IOD_ERROR;
+
+  /* The amount we write to this chunk is the maximum of the COUNT requested
+     and the size of the rest of this chunk. */
+  to_be_written_count = IOB_CHUNK_SIZE - chunk_offset > count
+                       ? count
+                       : IOB_CHUNK_SIZE - chunk_offset;
+
+  memcpy ((void *) chunk + chunk_offset, buf, to_be_written_count);
+
+  while ((already_written_count += to_be_written_count) < count)
+    {
+      to_be_written_count = count - already_written_count > IOB_CHUNK_SIZE
+                           ? IOB_CHUNK_SIZE
+                           : count - already_written_count;
+
+      chunk = ios_buffer_get_chunk (buffer, ++chunk_no);
+      if (!chunk && ios_buffer_allocate_new_chunk (buffer, chunk_no, &chunk))
+       return IOD_ERROR;
+      memcpy (chunk, buf + already_written_count, to_be_written_count);
+    };
+
+  /* Lastly, keep track of the greatest offset we wrote to in the buffer.
+     (In fact, end_offset is the least offset we have not written to yet.)  */
+  if (buffer->end_offset < offset + count)
+    buffer->end_offset = offset + count;
+
+  return 0;
+}
+
+static int
+ios_buffer_forget_till (ios_buffer *buffer, ios_dev_off offset)
+{
+  ios_buffer_chunk *chunk, *chunk_next;
+  int chunk_no = IOB_CHUNK_NO (offset);
+
+  for (int i = 0; i < IOB_BUCKET_COUNT; i++)
+    {
+      chunk = buffer->chunks[i];
+      buffer->chunks[i] = NULL;
+      while (chunk)
+       {
+         chunk_next = chunk->next;
+         if (chunk->chunk_no >= chunk_no)
+           {
+             chunk->next = buffer->chunks[i];
+             buffer->chunks[i] = chunk;
+           }
+         else
+           free (chunk);
+         chunk = chunk_next;
+       }
+    }
+
+  /* If this is a write stream, we must have written out some data to get here.
+     In this case, begin_offset is equal to OFFSET by now.  */
+  if (buffer->begin_offset < chunk_no * IOB_CHUNK_SIZE)
+    {
+      buffer->begin_offset = chunk_no * IOB_CHUNK_SIZE;
+      assert (buffer->end_offset >= buffer->begin_offset);
+    }
+  else
+    assert (buffer->begin_offset <= offset);
+  return 0;
+}
diff --git a/libpoke/ios-dev-stream.c b/libpoke/ios-dev-stream.c
new file mode 100644
index 00000000..cbbbce41
--- /dev/null
+++ b/libpoke/ios-dev-stream.c
@@ -0,0 +1,232 @@
+/* ios-dev-stream.c - Streaming IO devices.  */
+
+/* Copyright (C) 2020 Egeyar Bagcioglu */
+
+/* This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <config.h>
+#include <stdlib.h>
+#include <unistd.h>
+
+#include <stdio.h>
+#include <malloc.h>
+#include <string.h>
+#include <assert.h>
+
+#include "ios.h"
+#include "ios-dev.h"
+#include "ios-buffer.h"
+
+#define IOS_STDIN_HANDLER      ("<stdin>")
+#define IOS_STDOUT_HANDLER     ("<stdout>")
+#define IOS_STDERR_HANDLER     ("<stderr>")
+
+/* State associated with a stream device.  */
+
+struct ios_dev_stream
+{
+  char *handler;
+  FILE *file;
+  uint64_t flags;
+  union
+    {
+      ios_buffer *buffer;
+      uint64_t write_offset;
+    };
+};
+
+static char *
+ios_dev_stream_handler_normalize (const char *handler, uint64_t flags)
+{
+  /* TODO handle the case where strdup fails. */
+  if (!strcmp (handler, IOS_STDIN_HANDLER)
+      || !strcmp (handler, IOS_STDOUT_HANDLER)
+      || !strcmp (handler, IOS_STDERR_HANDLER))
+    return strdup (handler);
+  else
+    return NULL;
+}
+
+static void *
+ios_dev_stream_open (const char *handler, uint64_t flags, int *error)
+{
+  struct ios_dev_stream *sio;
+
+  sio = malloc (sizeof (struct ios_dev_stream));
+  if (!sio)
+    goto error;
+
+  sio->handler = strdup (handler);
+  if (!sio->handler)
+    goto error;
+
+  if (!strcmp (handler, IOS_STDIN_HANDLER))
+    {
+      sio->file = stdin;
+      sio->flags = IOS_F_READ;
+      sio->buffer = ios_buffer_init ();
+      if (!sio->buffer)
+       goto error;
+    }
+  else if (!strcmp (handler, IOS_STDOUT_HANDLER))
+    {
+      sio->file = stdout;
+      sio->flags = IOS_F_WRITE;
+      sio->write_offset = 0;
+    }
+  else if (!strcmp (handler, IOS_STDERR_HANDLER))
+    {
+      sio->file = stderr;
+      sio->flags = IOS_F_WRITE;
+      sio->write_offset = 0;
+    }
+  else
+    goto error;
+
+  return sio;
+
+error:
+  if (sio)
+    {
+      if (sio->handler)
+       free (sio->handler);
+      free (sio);
+    }
+  *error = IOD_ERROR;
+  return NULL;
+}
+
+static int
+ios_dev_stream_close (void *iod)
+{
+  struct ios_dev_stream *sio = iod;
+
+  ios_buffer_free (sio->buffer);
+  free (sio);
+
+  return 1;
+}
+
+static uint64_t
+ios_dev_stream_get_flags (void *iod)
+{
+  struct ios_dev_stream *sio = iod;
+  return sio->flags;
+}
+
+static int
+ios_dev_stream_pread (void *iod, void *buf, size_t count, ios_dev_off offset)
+{
+  struct ios_dev_stream *sio = iod;
+  ios_buffer *buffer = sio->buffer;
+  size_t read_count, total_read_count = 0;
+
+  if (sio->flags & IOS_F_WRITE)
+    return IOD_ERROR;
+
+  /* If the beginning of the buffer is discarded, return EOF. */
+  if (buffer->begin_offset > offset)
+    return IOD_EOF;
+
+  /* If the requsted range is in the buffer, return it. */
+  if (buffer->end_offset >= offset + count)
+    return ios_buffer_pread (buffer, buf, count, offset);
+
+  /* What was last read into the buffer may be before or after the
+     offset that this function is provided with.  */
+  if (buffer->end_offset == offset)
+    {
+      do
+       {
+         read_count = fread (buf + total_read_count, count, 1, sio->file);
+         total_read_count += read_count;
+       }
+      while (total_read_count < count && read_count);
+
+      if (ios_buffer_pwrite (buffer, buf, total_read_count, offset)
+         || total_read_count < count)
+       return IOD_ERROR;
+
+      return IOS_OK;
+    }
+  else
+    {
+      size_t to_be_read = (offset + count) - buffer->end_offset;
+      void *temp = malloc (to_be_read);
+      fread (temp, to_be_read, 1, sio->file);
+      if (ios_buffer_pwrite (buffer, temp, to_be_read, buffer->end_offset))
+       return IOD_ERROR;
+      free (temp);
+      return ios_buffer_pread (buffer, buf, count, offset);
+    }
+}
+
+static int
+ios_dev_stream_pwrite (void *iod, const void *buf, size_t count,
+                      ios_dev_off offset)
+{
+  struct ios_dev_stream *sio = iod;
+
+  if (sio->flags & IOS_F_READ)
+    return IOD_ERROR;
+
+  /* If the offset we want to write to is already written out,
+     we return an error.  */
+  if (sio->write_offset > offset)
+    return IOD_EOF;
+
+  if (offset > sio->write_offset)
+    /* TODO: Write this more efficiently. */
+    for (int i=0; i < (offset - sio->write_offset); i++)
+      fputc (0,         sio->file);
+
+  fwrite (buf, count, 1, sio->file);
+  sio->write_offset = offset + count;
+
+  return IOS_OK;
+}
+
+static ios_dev_off
+ios_dev_stream_size (void *iod)
+{
+  struct ios_dev_stream *sio = iod;
+  if (sio->flags & IOS_F_READ)
+    return sio->buffer->end_offset;
+  else
+    return sio->write_offset;
+}
+
+static int
+ios_dev_stream_flush (void *iod, ios_dev_off offset)
+{
+  struct ios_dev_stream *sio = iod;
+  if (sio->flags & IOS_F_READ)
+    return ios_buffer_forget_till (sio->buffer, offset);
+  else
+    return IOS_OK;
+}
+
+struct ios_dev_if ios_dev_stream
+  __attribute__ ((visibility ("hidden"))) =
+  {
+   .handler_normalize = ios_dev_stream_handler_normalize,
+   .open = ios_dev_stream_open,
+   .close = ios_dev_stream_close,
+   .pread = ios_dev_stream_pread,
+   .pwrite = ios_dev_stream_pwrite,
+   .get_flags = ios_dev_stream_get_flags,
+   .size = ios_dev_stream_size,
+   .flush = ios_dev_stream_flush
+  };
diff --git a/libpoke/ios.c b/libpoke/ios.c
index 38dd0472..f4b842a6 100644
--- a/libpoke/ios.c
+++ b/libpoke/ios.c
@@ -85,6 +85,7 @@ static struct ios *cur_io;
 
 extern struct ios_dev_if ios_dev_mem; /* ios-dev-mem.c */
 extern struct ios_dev_if ios_dev_file; /* ios-dev-file.c */
+extern struct ios_dev_if ios_dev_stream; /* ios-dev-stream.c */
 #ifdef HAVE_LIBNBD
 extern struct ios_dev_if ios_dev_nbd; /* ios-dev-nbd.c */
 #endif
@@ -92,6 +93,7 @@ extern struct ios_dev_if ios_dev_nbd; /* ios-dev-nbd.c */
 static struct ios_dev_if *ios_dev_ifs[] =
   {
    &ios_dev_mem,
+   &ios_dev_stream,
 #ifdef HAVE_LIBNBD
    &ios_dev_nbd,
 #endif
-- 
2.25.4




reply via email to

[Prev in Thread] Current Thread [Next in Thread]