gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r22875 - in Extractor/src: include main


From: gnunet
Subject: [GNUnet-SVN] r22875 - in Extractor/src: include main
Date: Tue, 24 Jul 2012 17:30:17 +0200

Author: grothoff
Date: 2012-07-24 17:30:16 +0200 (Tue, 24 Jul 2012)
New Revision: 22875

Modified:
   Extractor/src/include/extractor.h
   Extractor/src/main/extractor_plugin_main.c
Log:
-hxing

Modified: Extractor/src/include/extractor.h
===================================================================
--- Extractor/src/include/extractor.h   2012-07-24 15:05:21 UTC (rev 22874)
+++ Extractor/src/include/extractor.h   2012-07-24 15:30:16 UTC (rev 22875)
@@ -418,12 +418,12 @@
    * @param cls the 'cls' member of this struct
    * @param pos position to seek (see 'man lseek')
    * @param whence how to see (absolute to start, relative, absolute to end)
-   * @return new absolute position, UINT64_MAX on error (i.e. desired position
+   * @return new absolute position, -1 on error (i.e. desired position
    *         does not exist)
    */ 
-  uint64_t (*seek) (void *cls,
-                   int64_t pos,
-                   int whence);
+  int64_t (*seek) (void *cls,
+                  int64_t pos,
+                  int whence);
 
 
   /**

Modified: Extractor/src/main/extractor_plugin_main.c
===================================================================
--- Extractor/src/main/extractor_plugin_main.c  2012-07-24 15:05:21 UTC (rev 22874)
+++ Extractor/src/main/extractor_plugin_main.c  2012-07-24 15:30:16 UTC (rev 22875)
@@ -27,6 +27,7 @@
 #include "plibc.h"
 #include "extractor.h"
 #include "extractor_datasource.h"
+#include "extractor_plugins.h"
 #include "extractor_ipc.h"
 #include "extractor_plugin_main.h"
 #include <dirent.h>
@@ -36,28 +37,64 @@
 #include <signal.h>
 
 
-
 /**
- * Opens a file (for later mmapping).
- * This is POSIX variant of the plugin_open_* function.
- * Closes a file is already opened, closes it before opening a new one.
- * Destroy shared memory area.
- *
- * @param plugin plugin context
- * @param shm_name name of the file to open.
- * @return file id (-1 on error). That is, the result of open() syscall.
- */ 
-static int
-plugin_open_file (struct EXTRACTOR_PluginList *plugin, 
-                 const char *shm_name)
+ * Closure we use for processing requests inside the helper process.
+ */
+struct ProcessingContext 
 {
-  if (plugin->shm_id != -1)
-    close (plugin->shm_id);
-  plugin->shm_id = open (shm_name, O_RDONLY, 0);
-  return plugin->shm_id;
-}
+  /**
+   * Our plugin handle.
+   */
+  struct EXTRACTOR_PluginList *plugin;
 
+  /**
+   * Shared memory area.
+   */
+  void *shm;
 
+  /**
+   * Overall size of the file.
+   */
+  uint64_t file_size;
+
+  /**
+   * Current read offset when reading from the SHM.
+   */
+  uint64_t read_position;
+
+  /**
+   * Current offset of the SHM in the file.
+   */
+  uint64_t shm_off;
+
+  /**
+   * Handle to the shared memory.
+   */
+  int shm_id;
+  
+  /**
+   * Size of the shared memory map.
+   */
+  uint32_t shm_map_size;
+
+  /**
+   * Number of bytes ready in SHM.
+   */
+  uint32_t shm_ready_bytes;
+
+  /**
+   * Input stream.
+   */
+  int in;
+  
+  /**
+   * Output stream.
+   */ 
+  int out;
+};
+
+
+
 /**
  * Moves current absolute buffer position to @pos in @whence mode.
  * Will move logical position withouth shifting the buffer, if possible.
@@ -69,8 +106,12 @@
  * @return new absolute position, -1 on error
  */
 static int64_t
-pl_seek (struct EXTRACTOR_PluginList *plugin, int64_t pos, int whence)
+plugin_env_seek (void *cls,
+                int64_t pos, 
+                int whence)
 {
+  struct ProcessingContext *pc = cls;
+
   switch (whence)
   {
   case SEEK_CUR:
@@ -123,13 +164,6 @@
 }
 
 
-static int64_t
-pl_get_fsize (struct EXTRACTOR_PluginList *plugin)
-{
-  return plugin->fsize;
-}
-
-
 /**
  * Fills @data with a pointer to the data buffer.
  * Equivalent to read(), except you don't have to allocate and free
@@ -141,9 +175,12 @@
  * @param count number of bytes to read
  * @return number of bytes (<= count) avalable in @data, -1 on error
  */
-static int64_t
-pl_read (struct EXTRACTOR_PluginList *plugin, unsigned char **data, size_t count)
+static ssize_t
+plugin_env_read (void *cls,
+                unsigned char **data, size_t count)
 {
+  struct ProcessingContext *pc = cls;
+  
   *data = NULL;
   if (count > MAX_READ)
     return -1;
@@ -166,41 +203,11 @@
 }
 
 
-/**
- * Initializes an extracting session for a plugin.
- *   opens the file/shm (only in OPMODE_FILE)
- *   sets shm_ptr to NULL (unmaps it, if it was mapped)
- *   sets position to 0
- *   initializes file size to 'fsize' (may be -1)
- *   sets seek request to 0
- *
- * @param plugin plugin context
- * @param operation_mode the mode of operation (OPMODE_*)
- * @param fsize size of the source file (may be -1)
- * @param shm_name name of the shm or file to open
- * @return 0 on success, non-0 on error.
- */ 
-static int
-init_state_method (struct EXTRACTOR_PluginList *plugin, 
-                  uint8_t operation_mode, 
-                  int64_t fsize, 
-                  const char *shm_name)
+static uint64_t
+plugin_env_get_size (void *cls)
 {
-  plugin->seek_request = 0;
-  if (plugin->shm_ptr != NULL)
-    munmap (plugin->shm_ptr, plugin->map_size);
-  plugin->shm_ptr = NULL;
-  if (operation_mode == OPMODE_FILE)
-  {
-    if (-1 == plugin_open_file (plugin, shm_name))
-      return 1;
-  }
-  else if (-1 == plugin_open_shm (plugin, shm_name))
-    return 1;
-  plugin->fsize = fsize;
-  plugin->shm_pos = 0;
-  plugin->fpos = 0;
-  return 0;
+  struct ProcessingContext *pc = cls;
+  return pc->file_size;
 }
 
 
@@ -208,7 +215,7 @@
  * Function called by a plugin in a child process.  Transmits
  * the meta data back to the parent process.
  *
- * @param cls closure, "int*" of the FD for transmission
+ * @param cls closure, "struct ProcessingContext" with the FD for transmission
  * @param plugin_name name of the plugin that produced this value;
  *        special values can be used (i.e. '<zlib>' for zlib being
  *        used in the main libextractor library and yielding
@@ -222,16 +229,17 @@
  * @return 0 to continue extracting, 1 to abort (transmission error)
  */ 
 static int
-transmit_reply (void *cls,
-               const char *plugin_name,
-               enum EXTRACTOR_MetaType type,
-               enum EXTRACTOR_MetaFormat format,
-               const char *data_mime_type,
-               const char *data,
-               size_t data_len)
+plugin_env_send_proc (void *cls,
+                     const char *plugin_name,
+                     enum EXTRACTOR_MetaType type,
+                     enum EXTRACTOR_MetaFormat format,
+                     const char *data_mime_type,
+                     const char *data,
+                     size_t data_len)
 {
+  struct ProcessingContext *pc = cls;
   static const unsigned char meta_byte = MESSAGE_META;
-  int *cpipe_out = cls;
+  int cpipe_out = pc->out;
   struct IpcHeader hdr;
   size_t mime_len;
 
@@ -263,132 +271,126 @@
 
 
 /**
- * Main loop function for plugins.  Reads a message from the plugin
- * input pipe and acts on it.
+ * Handle an init message.  The opcode itself has already been read.
  *
- * @param plugin plugin context
- * @param in input stream with incoming requests
- * @param out output stream for sending responses
- */ 
-static void
-process_requests (struct EXTRACTOR_PluginList *plugin,
-                 int in,
-                 int out)
+ * @param pc processing context
+ * @return 0 on success, -1 on error
+ */
+static int
+handle_init_message (struct ProcessingContext *pc)
 {
-  int read_result1;
-  int read_result2;
-  int read_result3;
-  int read_result4;
-  unsigned char code;
-  char *shm_name = NULL;
-  size_t shm_name_len;
-  int extract_reply;
-  struct IpcHeader hdr;
-  int do_break;
-#ifdef WINDOWS
-  HANDLE map;
-  MEMORY_BASIC_INFORMATION mi;
-#endif
+  struct InitMessage init;
 
-  /* The point of recursing into this function is to request
-   * a seek from LE server and wait for a reply. This snipper
-   * requests a seek.
-   */
-  if (plugin->waiting_for_update == 1)
+  if (NULL != pc->shm)
+    return -1;
+  if (sizeof (struct InitMessage) - 1
+      != read (pc->in,
+              &init.reserved,
+              sizeof (struct InitMessage) - 1))
+    return -1;
+  if (init.shm_name_length > MAX_SHM_NAME)
+    return -1;
   {
-    unsigned char seek_byte = MESSAGE_SEEK;
-    if (write (out, &seek_byte, 1) != 1)
+    char shm_name[init.shm_name_length + 1];
+
+    if (init.shm_name_length 
+       != read (pc->in,
+                shm_name,
+                init.shm_name_length))
       return -1;
-    if (write (out, &plugin->seek_request, sizeof (int64_t)) != sizeof (int64_t))
+    shm_name[init.shm_name_length] = '\0';
+
+    pc->shm_map_size = init.shm_map_size;
+#if WINDOWS
+    pc->shm_ptr = MapViewOfFile (pc->shm_id, FILE_MAP_READ, 0, 0, 0);
+    if (NULL == pc->shm_ptr)
       return -1;
+#else
+    pc->shm_id = open (shm_name, O_RDONLY, 0);
+    if (-1 == pc->shm_id)
+      return -1;
+    pc->shm = mmap (NULL,
+                   pc->shm_map_size,
+                   PROT_READ,
+                   MAP_SHARED,
+                   pc->shm_id, 0);
+    if ( ((void*) -1) == pc->shm)
+      return -1;
+#endif
   }
+  return 0;
+}
 
-  memset (&hdr, 0, sizeof (hdr));
-  do_break = 0;
-  while (!do_break)
-  {
-    read_result1 = read (in, &code, 1);
-    if (read_result1 <= 0)
-      break;
-    switch (code)
+
+/**
+ * Handle a start message.  The opcode itself has already been read.
+ *
+ * @param pc processing context
+ * @return 0 on success, -1 on error
+ */
+static int
+handle_start_message (struct ProcessingContext *pc)
+{
+  struct StartMessage start;
+  struct EXTRACTOR_ExtractContext ec;
+
+  if (sizeof (struct StartMessage) - 1
+      != read (pc->in,
+              &start.reserved,
+              sizeof (struct StartMessage) - 1))
+    return -1;
+  pc->shm_ready_bytes = start.shm_ready_bytes;
+  pc->file_size = start.shm_file_size;
+  pc->read_position = 0;
+  pc->shm_off = 0;
+  ec.cls = pc;
+  ec.config = pc->plugin->plugin_options;
+  ec.read = &plugin_env_read;
+  ec.seek = &plugin_env_seek;
+  ec.get_size = &plugin_env_get_size;
+  ec.proc = &plugin_env_send_proc;
+  pc->plugin->extract_method (&ec);
+}
+
+
+/**
+ * Main loop function for plugins.  Reads a message from the plugin
+ * input pipe and acts on it.
+ *
+ * @param pc processing context
+ */ 
+static void
+process_requests (struct ProcessingContext *pc)
+{
+  while (1)
     {
-    case MESSAGE_INIT_STATE:
-      read_result2 = read (in, &plugin->operation_mode, sizeof (uint8_t));
-      read_result3 = read (in, &plugin->fsize, sizeof (int64_t));
-      read_result4 = read (in, &shm_name_len, sizeof (size_t));
-      if ((read_result2 < sizeof (uint8_t)) ||
-          (read_result3 < sizeof (int64_t)) ||
-          (read_result4 < sizeof (size_t)))
-      {
-        do_break = 1;
-        break;
-      }
-      if (plugin->operation_mode != OPMODE_MEMORY &&
-          plugin->operation_mode != OPMODE_DECOMPRESS &&
-          plugin->operation_mode != OPMODE_FILE)
-      {
-        do_break = 1;
-        break;
-      }
-      if ((plugin->operation_mode == OPMODE_MEMORY ||
-          plugin->operation_mode == OPMODE_DECOMPRESS) &&
-          shm_name_len > MAX_SHM_NAME)
-      {
-        do_break = 1;
-        break;
-      }
-      /* Fsize may be -1 only in decompression mode */
-      if (plugin->operation_mode != OPMODE_DECOMPRESS && plugin->fsize <= 0)
-      {
-        do_break = 1;
-        break;
-      }
-      if (shm_name != NULL)
-        free (shm_name);
-      shm_name = malloc (shm_name_len);
-      if (shm_name == NULL)
-      {
-        do_break = 1;
-        break;
-      }
-      read_result2 = read (in, shm_name, shm_name_len);
-      if (read_result2 < shm_name_len)
-      {
-        do_break = 1;
-        break;
-      }
-      shm_name[shm_name_len - 1] = '\0';
-      do_break = init_state_method (plugin, plugin->operation_mode, plugin->fsize, shm_name);
-      /* in OPMODE_MEMORY and OPMODE_FILE we can start extracting right away,
-       * there won't be UPDATED_SHM message, and we don't need it
-       */
-      if (!do_break && (plugin->operation_mode == OPMODE_MEMORY ||
-          plugin->operation_mode == OPMODE_FILE))
-      {
-        extract_reply = plugin->extract_method (plugin, transmit_reply, &out);
-        unsigned char done_byte = MESSAGE_DONE;
-        if (write (out, &done_byte, 1) != 1)
-        {
-          do_break = 1;
-          break;
-        }
-        if ((plugin->specials != NULL) &&
-            (NULL != strstr (plugin->specials, "force-kill")))
-        {
-          /* we're required to die after each file since this
-             plugin only supports a single file at a time */
-#if !WINDOWS
-          fsync (out);
-#else
-          _commit (out);
-#endif
-          _exit (0);
-        }
-      }
-      break;
-    case MESSAGE_DISCARD_STATE:
-      discard_state_method (plugin);
-      break;
+      unsigned char code;
+  
+      if (1 != read (pc->in, &code, 1))
+       break;
+      switch (code)
+       {
+       case MESSAGE_INIT_STATE:
+         if (0 != handle_init_message (pc))
+           return;
+         break;
+       case MSG_EXTRACT_START:
+         if (0 != handle_start_message (pc))
+           return;
+       case MSG_UPDATED_SHM:
+         /* not allowed here, we're not waiting for SHM to move! */
+         return;
+       case MSG_DISCARD_STATE:
+         /* odd, we're already in the start state... */
+         continue;
+       default:
+         /* error, unexpected message */
+         return;
+       }
+    }
+}
+
+#if 0
     case MESSAGE_UPDATED_SHM:
       if (plugin->operation_mode == OPMODE_DECOMPRESS)
       {
@@ -413,7 +415,7 @@
         }
 #else
         if ((plugin->map_handle == 0) ||
-           (NULL == (plugin->shm_ptr = MapViewOfFile (plugin->map_handle, FILE_MAP_READ, 0, 0, 0))))
+           (NULL == (plugin->shm_ptr = 
         {
           do_break = 1;
           break;
@@ -478,20 +480,8 @@
           }
         }
       }
-      else
-      {
-        /* This is mostly to safely skip unrelated messages */
-        int64_t t;
-        size_t t2;
-        read_result2 = read (in, &t, sizeof (int64_t));
-        read_result3 = read (in, &t2, sizeof (size_t));
-        read_result4 = read (in, &t, sizeof (int64_t));
-      }
-      break;
-    }
-  }
-  return 0;
 }
+#endif
 
 
 #ifndef WINDOWS
@@ -536,6 +526,8 @@
 EXTRACTOR_plugin_main_ (struct EXTRACTOR_PluginList *plugin, 
                        int in, int out)
 {
+  struct ProcessingContext pc;
+
   if (0 != EXTRACTOR_plugin_load_ (plugin))
     {
 #if DEBUG
@@ -560,7 +552,23 @@
       open_dev_null (1, O_WRONLY);
 #endif
     }
-  process_requests (plugin, in, out);
+  pc.plugin = plugin;
+  pc.in = in;
+  pc.out = out;
+  pc.shm_id = -1;
+  pc.shm = NULL;
+  pc.shm_map_size = 0;
+  process_requests (&pc);
+#if WINDOWS
+  if (NULL != pc.shm_ptr)
+    UnmapViewOfFile (pc.shm_ptr);
+#else
+  if ( (NULL != pc.shm_ptr) &&
+       (((void*) 1) != pc.shm_ptr) )
+    munmap (pc.shm_ptr, pc.shm_map_size);
+  if (-1 != pc.shm_id)
+    (void) close (pc.shm_id);
+#endif
 }
 
 




reply via email to

[Prev in Thread] Current Thread [Next in Thread]