[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r22875 - in Extractor/src: include main
From: |
gnunet |
Subject: |
[GNUnet-SVN] r22875 - in Extractor/src: include main |
Date: |
Tue, 24 Jul 2012 17:30:17 +0200 |
Author: grothoff
Date: 2012-07-24 17:30:16 +0200 (Tue, 24 Jul 2012)
New Revision: 22875
Modified:
Extractor/src/include/extractor.h
Extractor/src/main/extractor_plugin_main.c
Log:
-hxing
Modified: Extractor/src/include/extractor.h
===================================================================
--- Extractor/src/include/extractor.h 2012-07-24 15:05:21 UTC (rev 22874)
+++ Extractor/src/include/extractor.h 2012-07-24 15:30:16 UTC (rev 22875)
@@ -418,12 +418,12 @@
* @param cls the 'cls' member of this struct
* @param pos position to seek (see 'man lseek')
* @param whence how to seek (absolute to start, relative, absolute to end)
- * @return new absolute position, UINT64_MAX on error (i.e. desired position
+ * @return new absolute position, -1 on error (i.e. desired position
* does not exist)
*/
- uint64_t (*seek) (void *cls,
- int64_t pos,
- int whence);
+ int64_t (*seek) (void *cls,
+ int64_t pos,
+ int whence);
/**
Modified: Extractor/src/main/extractor_plugin_main.c
===================================================================
--- Extractor/src/main/extractor_plugin_main.c 2012-07-24 15:05:21 UTC (rev 22874)
+++ Extractor/src/main/extractor_plugin_main.c 2012-07-24 15:30:16 UTC (rev 22875)
@@ -27,6 +27,7 @@
#include "plibc.h"
#include "extractor.h"
#include "extractor_datasource.h"
+#include "extractor_plugins.h"
#include "extractor_ipc.h"
#include "extractor_plugin_main.h"
#include <dirent.h>
@@ -36,28 +37,64 @@
#include <signal.h>
-
/**
- * Opens a file (for later mmapping).
- * This is POSIX variant of the plugin_open_* function.
- * Closes a file is already opened, closes it before opening a new one.
- * Destroy shared memory area.
- *
- * @param plugin plugin context
- * @param shm_name name of the file to open.
- * @return file id (-1 on error). That is, the result of open() syscall.
- */
-static int
-plugin_open_file (struct EXTRACTOR_PluginList *plugin,
- const char *shm_name)
+ * Closure we use for processing requests inside the helper process.
+ */
+struct ProcessingContext
{
- if (plugin->shm_id != -1)
- close (plugin->shm_id);
- plugin->shm_id = open (shm_name, O_RDONLY, 0);
- return plugin->shm_id;
-}
+ /**
+ * Our plugin handle.
+ */
+ struct EXTRACTOR_PluginList *plugin;
+ /**
+ * Shared memory area.
+ */
+ void *shm;
+ /**
+ * Overall size of the file.
+ */
+ uint64_t file_size;
+
+ /**
+ * Current read offset when reading from the SHM.
+ */
+ uint64_t read_position;
+
+ /**
+ * Current offset of the SHM in the file.
+ */
+ uint64_t shm_off;
+
+ /**
+ * Handle to the shared memory.
+ */
+ int shm_id;
+
+ /**
+ * Size of the shared memory map.
+ */
+ uint32_t shm_map_size;
+
+ /**
+ * Number of bytes ready in SHM.
+ */
+ uint32_t shm_ready_bytes;
+
+ /**
+ * Input stream.
+ */
+ int in;
+
+ /**
+ * Output stream.
+ */
+ int out;
+};
+
+
+
/**
* Moves current absolute buffer position to @pos in @whence mode.
* Will move logical position without shifting the buffer, if possible.
@@ -69,8 +106,12 @@
* @return new absolute position, -1 on error
*/
static int64_t
-pl_seek (struct EXTRACTOR_PluginList *plugin, int64_t pos, int whence)
+plugin_env_seek (void *cls,
+ int64_t pos,
+ int whence)
{
+ struct ProcessingContext *pc = cls;
+
switch (whence)
{
case SEEK_CUR:
@@ -123,13 +164,6 @@
}
-static int64_t
-pl_get_fsize (struct EXTRACTOR_PluginList *plugin)
-{
- return plugin->fsize;
-}
-
-
/**
* Fills @data with a pointer to the data buffer.
* Equivalent to read(), except you don't have to allocate and free
@@ -141,9 +175,12 @@
* @param count number of bytes to read
* @return number of bytes (<= count) available in @data, -1 on error
*/
-static int64_t
-pl_read (struct EXTRACTOR_PluginList *plugin, unsigned char **data, size_t count)
+static ssize_t
+plugin_env_read (void *cls,
+ unsigned char **data, size_t count)
{
+ struct ProcessingContext *pc = cls;
+
*data = NULL;
if (count > MAX_READ)
return -1;
@@ -166,41 +203,11 @@
}
-/**
- * Initializes an extracting session for a plugin.
- * opens the file/shm (only in OPMODE_FILE)
- * sets shm_ptr to NULL (unmaps it, if it was mapped)
- * sets position to 0
- * initializes file size to 'fsize' (may be -1)
- * sets seek request to 0
- *
- * @param plugin plugin context
- * @param operation_mode the mode of operation (OPMODE_*)
- * @param fsize size of the source file (may be -1)
- * @param shm_name name of the shm or file to open
- * @return 0 on success, non-0 on error.
- */
-static int
-init_state_method (struct EXTRACTOR_PluginList *plugin,
- uint8_t operation_mode,
- int64_t fsize,
- const char *shm_name)
+static uint64_t
+plugin_env_get_size (void *cls)
{
- plugin->seek_request = 0;
- if (plugin->shm_ptr != NULL)
- munmap (plugin->shm_ptr, plugin->map_size);
- plugin->shm_ptr = NULL;
- if (operation_mode == OPMODE_FILE)
- {
- if (-1 == plugin_open_file (plugin, shm_name))
- return 1;
- }
- else if (-1 == plugin_open_shm (plugin, shm_name))
- return 1;
- plugin->fsize = fsize;
- plugin->shm_pos = 0;
- plugin->fpos = 0;
- return 0;
+ struct ProcessingContext *pc = cls;
+ return pc->file_size;
}
@@ -208,7 +215,7 @@
* Function called by a plugin in a child process. Transmits
* the meta data back to the parent process.
*
- * @param cls closure, "int*" of the FD for transmission
+ * @param cls closure, "struct ProcessingContext" with the FD for transmission
* @param plugin_name name of the plugin that produced this value;
* special values can be used (i.e. '<zlib>' for zlib being
* used in the main libextractor library and yielding
@@ -222,16 +229,17 @@
* @return 0 to continue extracting, 1 to abort (transmission error)
*/
static int
-transmit_reply (void *cls,
- const char *plugin_name,
- enum EXTRACTOR_MetaType type,
- enum EXTRACTOR_MetaFormat format,
- const char *data_mime_type,
- const char *data,
- size_t data_len)
+plugin_env_send_proc (void *cls,
+ const char *plugin_name,
+ enum EXTRACTOR_MetaType type,
+ enum EXTRACTOR_MetaFormat format,
+ const char *data_mime_type,
+ const char *data,
+ size_t data_len)
{
+ struct ProcessingContext *pc = cls;
static const unsigned char meta_byte = MESSAGE_META;
- int *cpipe_out = cls;
+ int cpipe_out = pc->out;
struct IpcHeader hdr;
size_t mime_len;
@@ -263,132 +271,126 @@
/**
- * Main loop function for plugins. Reads a message from the plugin
- * input pipe and acts on it.
+ * Handle an init message. The opcode itself has already been read.
*
- * @param plugin plugin context
- * @param in input stream with incoming requests
- * @param out output stream for sending responses
- */
-static void
-process_requests (struct EXTRACTOR_PluginList *plugin,
- int in,
- int out)
+ * @param pc processing context
+ * @return 0 on success, -1 on error
+ */
+static int
+handle_init_message (struct ProcessingContext *pc)
{
- int read_result1;
- int read_result2;
- int read_result3;
- int read_result4;
- unsigned char code;
- char *shm_name = NULL;
- size_t shm_name_len;
- int extract_reply;
- struct IpcHeader hdr;
- int do_break;
-#ifdef WINDOWS
- HANDLE map;
- MEMORY_BASIC_INFORMATION mi;
-#endif
+ struct InitMessage init;
- /* The point of recursing into this function is to request
- * a seek from LE server and wait for a reply. This snipper
- * requests a seek.
- */
- if (plugin->waiting_for_update == 1)
+ if (NULL != pc->shm)
+ return -1;
+ if (sizeof (struct InitMessage) - 1
+ != read (pc->in,
+ &init.reserved,
+ sizeof (struct InitMessage) - 1))
+ return -1;
+ if (init.shm_name_length > MAX_SHM_NAME)
+ return -1;
{
- unsigned char seek_byte = MESSAGE_SEEK;
- if (write (out, &seek_byte, 1) != 1)
+ char shm_name[init.shm_name_length + 1];
+
+ if (init.shm_name_length
+ != read (pc->in,
+ shm_name,
+ init.shm_name_length))
return -1;
- if (write (out, &plugin->seek_request, sizeof (int64_t)) != sizeof
(int64_t))
+ shm_name[init.shm_name_length] = '\0';
+
+ pc->shm_map_size = init.shm_map_size;
+#if WINDOWS
+ pc->shm_ptr = MapViewOfFile (pc->shm_id, FILE_MAP_READ, 0, 0, 0);
+ if (NULL == pc->shm_ptr)
return -1;
+#else
+ pc->shm_id = open (shm_name, O_RDONLY, 0);
+ if (-1 == pc->shm_id)
+ return -1;
+ pc->shm = mmap (NULL,
+ pc->shm_map_size,
+ PROT_READ,
+ MAP_SHARED,
+ pc->shm_id, 0);
+ if ( ((void*) -1) == pc->shm)
+ return -1;
+#endif
}
+ return 0;
+}
- memset (&hdr, 0, sizeof (hdr));
- do_break = 0;
- while (!do_break)
- {
- read_result1 = read (in, &code, 1);
- if (read_result1 <= 0)
- break;
- switch (code)
+
+/**
+ * Handle a start message. The opcode itself has already been read.
+ *
+ * @param pc processing context
+ * @return 0 on success, -1 on error
+ */
+static int
+handle_start_message (struct ProcessingContext *pc)
+{
+ struct StartMessage start;
+ struct EXTRACTOR_ExtractContext ec;
+
+ if (sizeof (struct StartMessage) - 1
+ != read (pc->in,
+ &start.reserved,
+ sizeof (struct StartMessage) - 1))
+ return -1;
+ pc->shm_ready_bytes = start.shm_ready_bytes;
+ pc->file_size = start.shm_file_size;
+ pc->read_position = 0;
+ pc->shm_off = 0;
+ ec.cls = pc;
+ ec.config = pc->plugin->plugin_options;
+ ec.read = &plugin_env_read;
+ ec.seek = &plugin_env_seek;
+ ec.get_size = &plugin_env_get_size;
+ ec.proc = &plugin_env_send_proc;
+ pc->plugin->extract_method (&ec);
+}
+
+
+/**
+ * Main loop function for plugins. Reads a message from the plugin
+ * input pipe and acts on it.
+ *
+ * @param pc processing context
+ */
+static void
+process_requests (struct ProcessingContext *pc)
+{
+ while (1)
{
- case MESSAGE_INIT_STATE:
- read_result2 = read (in, &plugin->operation_mode, sizeof (uint8_t));
- read_result3 = read (in, &plugin->fsize, sizeof (int64_t));
- read_result4 = read (in, &shm_name_len, sizeof (size_t));
- if ((read_result2 < sizeof (uint8_t)) ||
- (read_result3 < sizeof (int64_t)) ||
- (read_result4 < sizeof (size_t)))
- {
- do_break = 1;
- break;
- }
- if (plugin->operation_mode != OPMODE_MEMORY &&
- plugin->operation_mode != OPMODE_DECOMPRESS &&
- plugin->operation_mode != OPMODE_FILE)
- {
- do_break = 1;
- break;
- }
- if ((plugin->operation_mode == OPMODE_MEMORY ||
- plugin->operation_mode == OPMODE_DECOMPRESS) &&
- shm_name_len > MAX_SHM_NAME)
- {
- do_break = 1;
- break;
- }
- /* Fsize may be -1 only in decompression mode */
- if (plugin->operation_mode != OPMODE_DECOMPRESS && plugin->fsize <= 0)
- {
- do_break = 1;
- break;
- }
- if (shm_name != NULL)
- free (shm_name);
- shm_name = malloc (shm_name_len);
- if (shm_name == NULL)
- {
- do_break = 1;
- break;
- }
- read_result2 = read (in, shm_name, shm_name_len);
- if (read_result2 < shm_name_len)
- {
- do_break = 1;
- break;
- }
- shm_name[shm_name_len - 1] = '\0';
- do_break = init_state_method (plugin, plugin->operation_mode,
plugin->fsize, shm_name);
- /* in OPMODE_MEMORY and OPMODE_FILE we can start extracting right away,
- * there won't be UPDATED_SHM message, and we don't need it
- */
- if (!do_break && (plugin->operation_mode == OPMODE_MEMORY ||
- plugin->operation_mode == OPMODE_FILE))
- {
- extract_reply = plugin->extract_method (plugin, transmit_reply, &out);
- unsigned char done_byte = MESSAGE_DONE;
- if (write (out, &done_byte, 1) != 1)
- {
- do_break = 1;
- break;
- }
- if ((plugin->specials != NULL) &&
- (NULL != strstr (plugin->specials, "force-kill")))
- {
- /* we're required to die after each file since this
- plugin only supports a single file at a time */
-#if !WINDOWS
- fsync (out);
-#else
- _commit (out);
-#endif
- _exit (0);
- }
- }
- break;
- case MESSAGE_DISCARD_STATE:
- discard_state_method (plugin);
- break;
+ unsigned char code;
+
+ if (1 != read (pc->in, &code, 1))
+ break;
+ switch (code)
+ {
+ case MESSAGE_INIT_STATE:
+ if (0 != handle_init_message (pc))
+ return;
+ break;
+ case MSG_EXTRACT_START:
+ if (0 != handle_start_message (pc))
+ return;
+ case MSG_UPDATED_SHM:
+ /* not allowed here, we're not waiting for SHM to move! */
+ return;
+ case MSG_DISCARD_STATE:
+ /* odd, we're already in the start state... */
+ continue;
+ default:
+ /* error, unexpected message */
+ return;
+ }
+ }
+}
+
+#if 0
case MESSAGE_UPDATED_SHM:
if (plugin->operation_mode == OPMODE_DECOMPRESS)
{
@@ -413,7 +415,7 @@
}
#else
if ((plugin->map_handle == 0) ||
- (NULL == (plugin->shm_ptr = MapViewOfFile (plugin->map_handle,
FILE_MAP_READ, 0, 0, 0))))
+ (NULL == (plugin->shm_ptr =
{
do_break = 1;
break;
@@ -478,20 +480,8 @@
}
}
}
- else
- {
- /* This is mostly to safely skip unrelated messages */
- int64_t t;
- size_t t2;
- read_result2 = read (in, &t, sizeof (int64_t));
- read_result3 = read (in, &t2, sizeof (size_t));
- read_result4 = read (in, &t, sizeof (int64_t));
- }
- break;
- }
- }
- return 0;
}
+#endif
#ifndef WINDOWS
@@ -536,6 +526,8 @@
EXTRACTOR_plugin_main_ (struct EXTRACTOR_PluginList *plugin,
int in, int out)
{
+ struct ProcessingContext pc;
+
if (0 != EXTRACTOR_plugin_load_ (plugin))
{
#if DEBUG
@@ -560,7 +552,23 @@
open_dev_null (1, O_WRONLY);
#endif
}
- process_requests (plugin, in, out);
+ pc.plugin = plugin;
+ pc.in = in;
+ pc.out = out;
+ pc.shm_id = -1;
+ pc.shm = NULL;
+ pc.shm_map_size = 0;
+ process_requests (&pc);
+#if WINDOWS
+ if (NULL != pc.shm_ptr)
+ UnmapViewOfFile (pc.shm_ptr);
+#else
+ if ( (NULL != pc.shm_ptr) &&
+ (((void*) 1) != pc.shm_ptr) )
+ munmap (pc.shm_ptr, pc.shm_map_size);
+ if (-1 != pc.shm_id)
+ (void) close (pc.shm_id);
+#endif
}
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r22875 - in Extractor/src: include main,
gnunet <=