gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r22879 - Extractor/src/main


From: gnunet
Subject: [GNUnet-SVN] r22879 - Extractor/src/main
Date: Tue, 24 Jul 2012 23:45:25 +0200

Author: grothoff
Date: 2012-07-24 23:45:25 +0200 (Tue, 24 Jul 2012)
New Revision: 22879

Modified:
   Extractor/src/main/extractor.c
   Extractor/src/main/extractor_plugins.c
   Extractor/src/main/extractor_plugins.h
Log:
-towards compilation

Modified: Extractor/src/main/extractor.c
===================================================================
--- Extractor/src/main/extractor.c      2012-07-24 19:48:28 UTC (rev 22878)
+++ Extractor/src/main/extractor.c      2012-07-24 21:45:25 UTC (rev 22879)
@@ -31,6 +31,12 @@
 #include "extractor_plugins.h"
 
 
+/**
+ * Size used for the shared memory segment.
+ */
+#define DEFAULT_SHM_SIZE (16 * 1024)
+
+
 #if 0
 /**
  * Open a file
@@ -489,190 +495,132 @@
 #endif
 
 
+/**
+ * Closure for 'process_plugin_reply'
+ */
+struct PluginReplyProcessor
+{
+  /**
+   * Function to call if we receive meta data from the plugin.
+   */
+  EXTRACTOR_MetaDataProcessor proc;
 
+  /**
+   * Closure for 'proc'.
+   */
+  void *proc_cls;
 
+};
+
+
 /**
+ * Handler for a message from one of the plugins.
+ *
+ * @param cls closure with our 'struct PluginReplyProcessor'
+ * @param plugin plugin of the channel sending the message
+ * @param meta_type type of the meta data
+ * @param meta_format format of the meta data
+ * @param value_len number of bytes in 'value'
+ * @param value 'data' send from the plugin
+ * @param mime mime string send from the plugin
+ */
+static void
+process_plugin_reply (void *cls,
+                     struct EXTRACTOR_PluginList *plugin,
+                     enum EXTRACTOR_MetaType meta_type,
+                     enum EXTRACTOR_MetaFormat meta_format,
+                     size_t value_len,
+                     const void *value,
+                     const char *mime)
+{
+  struct PluginReplyProcessor *prp = cls;
+
+  // FIXME...
+}
+
+
+/**
  * Extract keywords using the given set of plugins.
  *
  * @param plugins the list of plugins to use
- * @param data data to process, or NULL if fds is not -1
- * @param fd file to read data from, or -1 if data is not NULL
- * @param filename name of the file to which fd belongs
- * @param cfs compressed file source for compressed stream (may be NULL)
- * @param fsize size of the file or data buffer
+ * @param shm shared memory object used by the plugins (NULL if
+ *        all plugins are in-process)
+ * @param ds data to process
  * @param proc function to call for each meta data item found
  * @param proc_cls cls argument to proc
  */
 static void
 do_extract (struct EXTRACTOR_PluginList *plugins, 
-           const char *data, 
-           int fd,
-           const char *filename, 
-           struct CompressedFileSource *cfs, 
-           int64_t fsize, 
+           struct EXTRACTOR_SharedMemory *shm,
+           struct EXTRACTOR_Datasource *ds,
            EXTRACTOR_MetaDataProcessor proc, void *proc_cls)
 {
-  int operation_mode;
-  int plugin_count = 0;
-  int shm_result;
-  unsigned char *shm_ptr;
-#if !WINDOWS
-  int shm_id;
-#else
-  HANDLE map_handle;
-#endif
-  char shm_name[MAX_SHM_NAME + 1];
+  unsigned int plugin_count;
+  struct EXTRACTOR_PluginList *pos;
+  struct StartMessage start;
+  struct EXTRACTOR_Channel *channel;
+  struct PluginReplyProcessor prp;
+  uint32_t ready;
+  int done;
 
-  struct EXTRACTOR_PluginList *ppos;
-
-  int64_t position = 0;
-  int64_t preserve = 0;
-  size_t map_size;
-  ssize_t read_result;
-  int kill_plugins = 0;
-
-  if (cfs != NULL)
-    operation_mode = OPMODE_DECOMPRESS;
-  else if (data != NULL)
-    operation_mode = OPMODE_MEMORY;
-  else if (fd != -1)
-    operation_mode = OPMODE_FILE;
+  plugin_count = 0;
+  for (pos = plugins; NULL != pos; pos = pos->next)
+    plugin_count++;
+  if (NULL != shm)
+    ready = EXTRACTOR_IPC_shared_memory_set_ (shm, ds, 0, DEFAULT_SHM_SIZE);
   else
-    return;
+    ready = 0;
 
-  map_size = (fd == -1) ? fsize : MAX_READ;
+  prp.proc = proc;
+  prp.proc_cls = proc_cls;
 
-  /* Make a shared memory object. Even if we're running in-process. Simpler that way.
-   * This is only for reading-from-memory case. For reading-from-file we will use
-   * the file itself; for uncompressing-on-the-fly the decompressor will make its own
-   * shared memory object and uncompress into it directly.
-   */
-  if (operation_mode == OPMODE_MEMORY)
+  /* send 'start' message */
+  start.opcode = MESSAGE_EXTRACT_START;
+  start.reserved = 0;
+  start.reserved2 = 0;
+  start.shm_ready_bytes = ready;
+  start.file_size = EXTRACTOR_datasource_get_size_ (ds);
   {
-    operation_mode = OPMODE_MEMORY;
-#if !WINDOWS
-    shm_result = make_shm_posix ((void **) &shm_ptr, &shm_id, shm_name, MAX_SHM_NAME,
-        fsize);
-#else  
-    shm_result = make_shm_w32 ((void **) &shm_ptr, &map_handle, shm_name, MAX_SHM_NAME,
-        fsize);
-#endif
-    if (shm_result != 0)
-      return;
-    memcpy (shm_ptr, data, fsize);
-  }
-  else if (operation_mode == OPMODE_FILE)
-  {
-#if WINDOWS
-    shm_result = make_file_backed_shm_w32 (&map_handle, (HANDLE) _get_osfhandle (fd), shm_name, MAX_SHM_NAME);
-    if (shm_result != 0)
-      return;
-#endif
-  }
+    struct EXTRACTOR_Channel *channels[plugin_count];
 
-  /* This four-loops-instead-of-one construction is intended to increase parallelism */
-  for (ppos = plugins; NULL != ppos; ppos = ppos->next)
-  {
-    start_process (ppos);
-    plugin_count += 1;
-  }
-
-  for (ppos = plugins; NULL != ppos; ppos = ppos->next)
-    load_in_process_plugin (ppos);
-
-  for (ppos = plugins; NULL != ppos; ppos = ppos->next)
-    write_plugin_data (ppos);
-
-  if (operation_mode == OPMODE_DECOMPRESS)
-  {
-    for (ppos = plugins; NULL != ppos; ppos = ppos->next)
-      init_plugin_state (ppos, operation_mode, cfs->shm_name, -1);
-  }
-  else if (operation_mode == OPMODE_FILE)
-  {
-    for (ppos = plugins; NULL != ppos; ppos = ppos->next)
-#if !WINDOWS
-      init_plugin_state (ppos, operation_mode, filename, fsize);
-#else
-      init_plugin_state (ppos, operation_mode, shm_name, fsize);
-#endif
-  }
-  else
-  {
-    for (ppos = plugins; NULL != ppos; ppos = ppos->next)
-      init_plugin_state (ppos, operation_mode, shm_name, fsize);
-  }
-
-  if (operation_mode == OPMODE_FILE || operation_mode == OPMODE_MEMORY)
-  {
-    int plugins_not_ready = 0;
-    for (ppos = plugins; NULL != ppos; ppos = ppos->next)
-      plugins_not_ready += give_shm_to_plugin (ppos, position, map_size, fsize, operation_mode);
-    for (ppos = plugins; NULL != ppos; ppos = ppos->next)
-      ask_in_process_plugin (ppos, shm_ptr, proc, proc_cls);
-    while (plugins_not_ready > 0 && !kill_plugins)
-    {
-      int ready = wait_for_reply (plugins, proc, proc_cls);
-      if (ready <= 0)
-        kill_plugins = 1;
-      plugins_not_ready -= ready;
-    }
-  }
-  else
-  {
-    read_result = cfs_read (cfs, preserve);
-    if (read_result > 0)
-    while (1)
-    {
-      int plugins_not_ready = 0;
-
-      map_size = cfs->shm_buf_size;
-      for (ppos = plugins; NULL != ppos; ppos = ppos->next)
-        plugins_not_ready += give_shm_to_plugin (ppos, position, map_size, cfs->uncompressed_size, operation_mode);
-      /* Can't block in in-process plugins, unless we ONLY have one plugin */
-      if (plugin_count == 1)
-        for (ppos = plugins; NULL != ppos; ppos = ppos->next)
-        {
-          /* Pass this way. we'll need it to call cfs functions later on */
-          /* This is a special case */
-          ppos->pass_cfs = cfs;
-          ask_in_process_plugin (ppos, cfs->shm_ptr, proc, proc_cls);
-        }
-      while (plugins_not_ready > 0 && !kill_plugins)
+    plugin_count = 0;
+    for (pos = plugins; NULL != pos; pos = pos->next)
       {
-        int ready = wait_for_reply (plugins, proc, proc_cls);
-        if (ready <= 0)
-          kill_plugins = 1;
-        plugins_not_ready -= ready;
+       channels[plugin_count] = pos->channel;
+       if ( (NULL != pos->channel) &&
+            (-1 == EXTRACTOR_IPC_channel_send_ (pos->channel,
+                                                &start,
+                                                sizeof (start)) ) )
+         {
+           channels[plugin_count] = NULL;
+           EXTRACTOR_IPC_channel_destroy_ (pos->channel);
+           pos->channel = NULL;
+         }
+       plugin_count++;
       }
-      if (kill_plugins)
-        break;
-      position = seek_to_new_position (plugins, cfs, position, map_size);
-      if (position < 0 || position == cfs->uncompressed_size)
-        break;
-    }
-  }
+    done = 0;
+    while (! done)
+      {
+       done = 1;
 
-  if (kill_plugins)
-    for (ppos = plugins; NULL != ppos; ppos = ppos->next)
-      stop_process (ppos);
-  for (ppos = plugins; NULL != ppos; ppos = ppos->next)
-    discard_plugin_state (ppos);
-
-  if (operation_mode == OPMODE_MEMORY)
-  {
-#if WINDOWS
-    destroy_shm_w32 (shm_ptr, map_handle);
-#else
-    destroy_shm_posix (shm_ptr, shm_id, (fd == -1) ? fsize : MAX_READ, shm_name);
-#endif
+       // FIXME: need to handle 'seek' messages from plugins somewhere
+       if (-1 == 
+           EXTRACTOR_IPC_channel_recv_ (channels,
+                                        plugin_count,
+                                        &process_plugin_reply,
+                                        &prp))
+         break;
+       plugin_count = 0;
+       for (pos = plugins; NULL != pos; pos = pos->next)
+         {
+           channel = channels[plugin_count];
+           // ... FIXME ...
+           plugin_count++;
+         }
+       // FIXME: need to terminate once all plugins are done...
+       done = 0;
+      }
   }
-  else if (operation_mode == OPMODE_FILE)
-  {
-#if WINDOWS
-    destroy_file_backed_shm_w32 (map_handle);
-#endif
-  }
 }
 
 
@@ -699,14 +647,34 @@
                   void *proc_cls)
 {
   struct EXTRACTOR_Datasource *datasource;
+  struct EXTRACTOR_SharedMemory *shm;
+  struct EXTRACTOR_PluginList *pos;
 
+  if (NULL == plugins)
+    return;
   if (NULL == filename)
-    datasource = EXTRACTOR_datasource_create_from_buffer_ (data, size);
+    datasource = EXTRACTOR_datasource_create_from_buffer_ (data, size,
+                                                          proc, proc_cls);
   else
-    datasource = EXTRACTOR_datasource_create_from_file_ (filename);
+    datasource = EXTRACTOR_datasource_create_from_file_ (filename,
+                                                        proc, proc_cls);
   if (NULL == datasource)
-    return;
-  do_extract (plugins, datasource, proc, proc_cls);
+    return;  
+  shm = NULL;
+  for (pos = plugins; NULL != pos; pos = pos->next)
+    if (NULL != (shm = pos->shm))
+      break;
+  if (NULL == shm)
+    shm = EXTRACTOR_IPC_shared_memory_create_ (DEFAULT_SHM_SIZE);
+  for (pos = plugins; NULL != pos; pos = pos->next)
+    if ( (NULL == pos->shm) &&
+        (0 == (pos->flags & EXTRACTOR_OPTION_IN_PROCESS)) )
+      {
+       pos->shm = shm;
+       pos->channel = EXTRACTOR_IPC_channel_create_ (pos,
+                                                     shm);
+      }
+  do_extract (plugins, shm, datasource, proc, proc_cls);
   EXTRACTOR_datasource_destroy_ (datasource);
 }
 
@@ -721,7 +689,6 @@
 
 #if ENABLE_NLS
   BINDTEXTDOMAIN (PACKAGE, LOCALEDIR);
-  BINDTEXTDOMAIN ("iso-639", ISOLOCALEDIR); /* used by wordextractor */
 #endif
   err = lt_dlinit ();
   if (err > 0) 

Modified: Extractor/src/main/extractor_plugins.c
===================================================================
--- Extractor/src/main/extractor_plugins.c      2012-07-24 19:48:28 UTC (rev 22878)
+++ Extractor/src/main/extractor_plugins.c      2012-07-24 21:45:25 UTC (rev 22879)
@@ -380,6 +380,8 @@
     prev->next = pos->next;
   if (NULL != pos->channel)
     EXTRACTOR_IPC_channel_destroy_ (pos->channel);
+  // FIXME: need to also destroy pos->shm if this is
+  // the last user; need to add some RC to the SHM!
   free (pos->short_libname);
   free (pos->libname);
   free (pos->plugin_options);

Modified: Extractor/src/main/extractor_plugins.h
===================================================================
--- Extractor/src/main/extractor_plugins.h      2012-07-24 19:48:28 UTC (rev 22878)
+++ Extractor/src/main/extractor_plugins.h      2012-07-24 21:45:25 UTC (rev 22879)
@@ -78,11 +78,16 @@
   const char *specials;
 
   /**
-   * Channel to communicate with out-of-process plugin.
+   * Channel to communicate with out-of-process plugin, NULL if not setup.
    */
   struct EXTRACTOR_Channel *channel;
 
   /**
+   * Memory segment shared with the channel of this plugin, NULL for none.
+   */
+  struct EXTRACTOR_SharedMemory *shm;
+
+  /**
    * A position this plugin wants us to seek to. -1 if it's finished.
    * Starts at 0.
    */




reply via email to

[Prev in Thread] Current Thread [Next in Thread]