gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r14622 - gnunet/src/fs


From: gnunet
Subject: [GNUnet-SVN] r14622 - gnunet/src/fs
Date: Thu, 10 Mar 2011 10:36:50 +0100

Author: grothoff
Date: 2011-03-10 10:36:50 +0100 (Thu, 10 Mar 2011)
New Revision: 14622

Modified:
   gnunet/src/fs/gnunet-service-fs_new.c
   gnunet/src/fs/gnunet-service-fs_pr.c
   gnunet/src/fs/gnunet-service-fs_pr.h
Log:
stuff

Modified: gnunet/src/fs/gnunet-service-fs_new.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_new.c       2011-03-09 19:07:13 UTC (rev 
14621)
+++ gnunet/src/fs/gnunet-service-fs_new.c       2011-03-10 09:36:50 UTC (rev 
14622)
@@ -27,7 +27,7 @@
  * - GSF_plan_get_ (!)
  * - GSF_plan_size_ (?)
  * - GSF_plan_notify_request_done (!)
- * - 
+ * - consider re-issue GSF_dht_lookup_ after non-DHT reply received 
  *
  *
  */
@@ -224,6 +224,49 @@
 
 
 /**
+ * We have a new request, consider forwarding it to the given
+ * peer.
+ *
+ * @param cls the 'struct GSF_PendingRequest'
+ * @param peer identity of the peer
+ * @param cp handle to the connected peer record
+ * @param perf peer performance data
+ */
+static void
+consider_request_for_forwarding (void *cls,
+                                const struct GNUNET_PeerIdentity *peer,
+                                struct GSF_ConnectedPeer *cp,
+                                const struct GSF_PeerPerformanceData *ppd)
+{
+  struct GSF_PendingRequest *pr = cls;
+
+  plan (cp, pr);
+}
+
+
+/**
+ * Function to be called after we're done processing
+ * replies from the local lookup.  If the result status
+ * code indicates that there may be more replies, plan
+ * forwarding the request.
+ *
+ * @param cls closure (NULL)
+ * @param pr the pending request we were processing
+ * @param result final datastore lookup result
+ */
+static void
+consider_forwarding (void *cls,
+                    struct GSF_PendingRequest *pr,
+                    enum GNUNET_BLOCK_EvaluationResult result)
+{
+  if (GNUNET_BLOCK_EVALUATION_OK_LAST == result)
+    return; /* we're done... */
+  GSF_iterate_connected_peers_ (&consider_request_for_forwarding,
+                               pr);
+}
+
+
+/**
  * Handle P2P "GET" request.
  *
  * @param cls closure, always NULL
@@ -244,31 +287,37 @@
 
   pr = GSF_handle_p2p_query_ (other, message);
   if (NULL == pr)
-    return GNUNET_SYSERR;    
-  /* FIXME: local lookup! */
-  /* FIXME: after local lookup, trigger forwarding/routing! */
+    return GNUNET_SYSERR;
+  GSF_local_lookup_ (pr, 
+                    &consider_forwarding,
+                    NULL);
   return GNUNET_OK;
 }
 
 
 /**
- * We have a new request, consider forwarding it to the given
- * peer.
+ * We're done with the local lookup, now consider
+ * P2P processing (depending on request options and
+ * result status).  Also signal that we can now 
+ * receive more request information from the client.
  *
- * @param cls the 'struct GSF_PendingRequest'
- * @param peer identity of the peer
- * @param cp handle to the connected peer record
- * @param perf peer performance data
+ * @param cls the client doing the request ('struct GNUNET_SERVER_Client')
+ * @param pr the pending request we were processing
+ * @param result final datastore lookup result
  */
 static void
-consider_request_for_forwarding (void *cls,
-                                const struct GNUNET_PeerIdentity *peer,
-                                struct GSF_ConnectedPeer *cp,
-                                const struct GSF_PeerPerformanceData *ppd)
+start_p2p_processing (void *cls,
+                     struct GSF_PendingRequest *pr,
+                     enum GNUNET_BLOCK_EvaluationResult result)
 {
-  struct GSF_PendingRequest *pr = cls;
+  struct GNUNET_SERVER_Client *client = cls;
 
-  plan (cp, pr);
+  GNUNET_SERVER_receive_done (client,
+                             GNUNET_OK);
+  if (GNUNET_BLOCK_EVALUATION_OK_LAST == result)
+    return; /* we're done... */
+  GSF_dht_lookup_ (pr);
+  consider_forwarding (NULL, pr, result);
 }
 
 
@@ -292,28 +341,9 @@
       /* 'GNUNET_SERVER_receive_done was already called! */
       return;
     }
-  /* FIXME: local lookup, then (after DB done!) receive_done: */
-  GNUNET_SERVER_receive_done (client,
-                             GNUNET_OK);
-#if 0
-  /* FIXME: also do DHT lookup */
-  struct GNUNET_DHT_GetHandle *gh;
-  /* store 'gh' with 'pr', cancel it on pr destruction, etc. */
-  gh = GNUNET_DHT_get_start (GSF_dht,
-                            timeout,
-                            type,
-                            key,
-                            des_repl_level,
-                            options,
-                            bf,
-                            bf_mutator,
-                            xquery,
-                            xquery_size,
-                            &GSF_handle_dht_reply_,
-                            pr);
-#endif
-  GSF_iterate_connected_peers_ (&consider_request_for_forwarding,
-                               pr);
+  GSF_local_lookup_ (pr, 
+                    &start_p2p_processing,
+                    client);
 }
 
 

Modified: gnunet/src/fs/gnunet-service-fs_pr.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_pr.c        2011-03-09 19:07:13 UTC (rev 
14621)
+++ gnunet/src/fs/gnunet-service-fs_pr.c        2011-03-10 09:36:50 UTC (rev 
14622)
@@ -65,6 +65,16 @@
   struct GNUNET_CONTAINER_HeapNode *hnode;
 
   /**
+   * Datastore queue entry for this request (or NULL for none).
+   */
+  struct GNUNET_DATASTORE_QueueEntry *qe;
+
+  /**
+   * DHT request handle for this request (or NULL for none).
+   */
+  struct GNUNET_DHT_GetHandle *gh;
+
+  /**
    * Identity of the peer that we should use for the 'sender'
    * (recipient of the response) when forwarding (0 for none).
    */
@@ -500,6 +510,10 @@
   if (NULL != pr->hnode)
     GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
                                       pr->hnode);
+  if (NULL != pr->qe)
+    GNUNET_DATASTORE_cancel (pr->qe);
+  if (NULL != pr->gh)
+    GNUNET_DHT_get_stop (pr->gh);
   GNUNET_free (pr);
   return GNUNET_YES;
 }
@@ -713,6 +727,10 @@
                                1,
                                GNUNET_NO);      
     }
+  else
+    {     
+      GSF_dht_lookup_ (pr);
+    }
   prq->priority += pr->public_data.original_priority;
   pr->public_data.priority = 0;
   pr->public_data.original_priority = 0;
@@ -799,15 +817,15 @@
  * @param size number of bytes in data
  * @param data pointer to the result data
  */
-void
-GSF_handle_dht_reply_ (void *cls,
-                      struct GNUNET_TIME_Absolute exp,
-                      const GNUNET_HashCode *key,
-                      const struct GNUNET_PeerIdentity * const *get_path,
-                      const struct GNUNET_PeerIdentity * const *put_path,
-                      enum GNUNET_BLOCK_Type type,
-                      size_t size,
-                      const void *data)
+static void
+handle_dht_reply (void *cls,
+                 struct GNUNET_TIME_Absolute exp,
+                 const GNUNET_HashCode *key,
+                 const struct GNUNET_PeerIdentity * const *get_path,
+                 const struct GNUNET_PeerIdentity * const *put_path,
+                 enum GNUNET_BLOCK_Type type,
+                 size_t size,
+                 const void *data)
 {
   struct GSF_PendingRequest *pr = cls;
   struct ProcessReplyClosure prq;
@@ -843,6 +861,247 @@
 
 
 /**
+ * Consider looking up the data in the DHT (anonymity-level permitting).
+ *
+ * @param pr the pending request to process
+ */
+void
+GSF_dht_lookup_ (struct GSF_PendingRequest *pr)
+{
+  const void *xquery;
+  size_t xquery_size;
+  struct GNUNET_PeerIdentity pi;
+  char buf[sizeof (GNUNET_HashCode) * 2];
+
+  if (0 != pr->public_data.anonymity_level)
+    return;
+  if (NULL != pr->gh)
+    {
+      GNUNET_DHT_get_stop (pr->gh);
+      pr->gh = NULL;
+    }
+  xquery = NULL;
+  xquery_size = 0;
+  if (GNUNET_BLOCK_TYPE_FS_SBLOCK == pr->public_data.type)
+    {
+      xquery = buf;
+      memcpy (buf, &pr->public_data.namespace, sizeof (GNUNET_HashCode));
+      xquery_size = sizeof (GNUNET_HashCode);
+    }
+  if (0 != (pr->public_data.options & GSF_PRO_FORWARD_ONLY))
+    {
+      GNUNET_PEER_resolve (pr->sender_pid,
+                          &pi);
+      memcpy (&buf[xquery_size], &pi, sizeof (struct GNUNET_PeerIdentity));
+      xquery_size += sizeof (struct GNUNET_PeerIdentity);
+    }
+  pr->gh = GNUNET_DHT_get_start (GSF_dht,
+                                GNUNET_TIME_UNIT_FOREVER_REL,
+                                pr->public_data.type,
+                                &pr->public_data.query,
+                                DEFAULT_GET_REPLICATION,
+                                GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE,
+                                pr->bf,
+                                pr->mingle,
+                                xquery,
+                                xquery_size,
+                                &handle_dht_reply,
+                                pr);
+}
+
+
+/**
+ * We're processing (local) results for a search request
+ * from another peer.  Pass applicable results to the
+ * peer and if we are done either clean up (operation
+ * complete) or forward to other peers (more results possible).
+ *
+ * @param cls our closure (struct LocalGetContext)
+ * @param key key for the content
+ * @param size number of bytes in data
+ * @param data content stored
+ * @param type type of the content
+ * @param priority priority of the content
+ * @param anonymity anonymity-level for the content
+ * @param expiration expiration time for the content
+ * @param uid unique identifier for the datum;
+ *        maybe 0 if no unique identifier is available
+ */
+static void
+process_local_reply (void *cls,
+                    const GNUNET_HashCode * key,
+                    size_t size,
+                    const void *data,
+                    enum GNUNET_BLOCK_Type type,
+                    uint32_t priority,
+                    uint32_t anonymity,
+                    struct GNUNET_TIME_Absolute expiration, 
+                    uint64_t uid)
+{
+#if FIXME
+  struct PendingRequest *pr = cls;
+  struct ProcessReplyClosure prq;
+  struct CheckDuplicateRequestClosure cdrc;
+  GNUNET_HashCode query;
+  unsigned int old_rf;
+  
+  if (NULL == key)
+    {
+#if DEBUG_FS > 1
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Done processing local replies, forwarding request to other 
peers.\n");
+#endif
+      pr->qe = NULL;
+      if (pr->client_request_list != NULL)
+       {
+         GNUNET_SERVER_receive_done 
(pr->client_request_list->client_list->client, 
+                                     GNUNET_YES);
+         /* Figure out if this is a duplicate request and possibly
+            merge 'struct PendingRequest' entries */
+         cdrc.have = NULL;
+         cdrc.pr = pr;
+         GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
+                                                     &pr->query,
+                                                     
&check_duplicate_request_client,
+                                                     &cdrc);
+         if (cdrc.have != NULL)
+           {
+#if DEBUG_FS
+             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                         "Received request for block `%s' twice from client, 
will only request once.\n",
+                         GNUNET_h2s (&pr->query));
+#endif
+             
+             destroy_pending_request (pr);
+             return;
+           }
+       }
+      if (pr->local_only == GNUNET_YES)
+       {
+         destroy_pending_request (pr);
+         return;
+       }
+      /* no more results */
+      if (pr->task == GNUNET_SCHEDULER_NO_TASK)
+       pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task,
+                                            pr);      
+      return;
+    }
+#if DEBUG_FS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "New local response to `%s' of type %u.\n",
+             GNUNET_h2s (key),
+             type);
+#endif
+  if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
+    {
+#if DEBUG_FS
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Found ONDEMAND block, performing on-demand encoding\n");
+#endif
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# on-demand blocks matched 
requests"),
+                               1,
+                               GNUNET_NO);
+      if (GNUNET_OK != 
+         GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, 
+                                           anonymity, expiration, uid, 
+                                           &process_local_reply,
+                                           pr))
+      if (pr->qe != NULL)
+       {
+         GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
+       }
+      return;
+    }
+  old_rf = pr->results_found;
+  memset (&prq, 0, sizeof (prq));
+  prq.data = data;
+  prq.expiration = expiration;
+  prq.size = size;  
+  if (GNUNET_OK != 
+      GNUNET_BLOCK_get_key (block_ctx,
+                           type,
+                           data,
+                           size,
+                           &query))
+    {
+      GNUNET_break (0);
+      GNUNET_DATASTORE_remove (dsh,
+                              key,
+                              size, data,
+                              -1, -1, 
+                              GNUNET_TIME_UNIT_FOREVER_REL,
+                              NULL, NULL);
+      GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
+      return;
+    }
+  prq.type = type;
+  prq.priority = priority;  
+  prq.finished = GNUNET_NO;
+  prq.request_found = GNUNET_NO;
+  prq.anonymity_level = anonymity;
+  if ( (old_rf == 0) &&
+       (pr->results_found == 0) )
+    update_datastore_delays (pr->start_time);
+  process_reply (&prq, key, pr);
+  if (prq.finished == GNUNET_YES)
+    return;
+  if (pr->qe == NULL)
+    return; /* done here */
+  if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
+    {
+      pr->local_only = GNUNET_YES; /* do not forward */
+      GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
+      return;
+    }
+  if ( (pr->client_request_list == NULL) &&
+       ( (GNUNET_YES == test_get_load_too_high (0)) ||
+        (pr->results_found > 5 + 2 * pr->priority) ) )
+    {
+#if DEBUG_FS > 2
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Load too high, done with request\n");
+#endif
+      GNUNET_STATISTICS_update (stats,
+                               gettext_noop ("# processing result set cut 
short due to load"),
+                               1,
+                               GNUNET_NO);
+      GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
+      return;
+    }
+  GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
+#endif
+}
+
+
+/**
+ * Look up the request in the local datastore.
+ *
+ * @param pr the pending request to process
+ * @param cont function to call at the end
+ * @param cont_cls closure for cont
+ */
+void
+GSF_local_lookup_ (struct GSF_PendingRequest *pr,
+                  GSF_LocalLookupContinuation cont,
+                  void *cont_cls)
+{
+  // FIXME: fix process_local_reply / cont!
+  GNUNET_assert (NULL == pr->gh);
+  pr->qe = GNUNET_DATASTORE_get (GSF_dsh,
+                                &pr->public_data.query,
+                                pr->public_data.type,
+                                1 /* queue priority */,
+                                1 /* max queue size */,
+                                GNUNET_TIME_UNIT_FOREVER_REL,
+                                &process_local_reply,
+                                pr);
+}
+
+
+
+/**
  * Handle P2P "CONTENT" message.  Checks that the message is
  * well-formed and then checks if there are any pending requests for
  * this content and possibly passes it on (to local clients or other

Modified: gnunet/src/fs/gnunet-service-fs_pr.h
===================================================================
--- gnunet/src/fs/gnunet-service-fs_pr.h        2011-03-09 19:07:13 UTC (rev 
14621)
+++ gnunet/src/fs/gnunet-service-fs_pr.h        2011-03-10 09:36:50 UTC (rev 
14622)
@@ -299,32 +299,41 @@
 
 
 /**
- * Iterator called on each result obtained for a DHT
- * operation that expects a reply
+ * Consider looking up the data in the DHT (anonymity-level permitting).
  *
- * @param cls closure, the 'struct GSF_PendingRequest *'.
- * @param exp when will this value expire
- * @param key key of the result
- * @param get_path NULL-terminated array of pointers
- *                 to the peers on reverse GET path (or NULL if not recorded)
- * @param put_path NULL-terminated array of pointers
- *                 to the peers on the PUT path (or NULL if not recorded)
- * @param type type of the result
- * @param size number of bytes in data
- * @param data pointer to the result data
+ * @param pr the pending request to process
  */
 void
-GSF_handle_dht_reply_ (void *cls,
-                      struct GNUNET_TIME_Absolute exp,
-                      const GNUNET_HashCode * key,
-                      const struct GNUNET_PeerIdentity * const *get_path,
-                      const struct GNUNET_PeerIdentity * const *put_path,
-                      enum GNUNET_BLOCK_Type type,
-                      size_t size,
-                      const void *data);
+GSF_dht_lookup_ (struct GSF_PendingRequest *pr);
 
 
 /**
+ * Function to be called after we're done processing
+ * replies from the local lookup.
+ *
+ * @param cls closure
+ * @param pr the pending request we were processing
+ * @param result final datastore lookup result
+ */
+typedef void (GSF_LocalLookupContinuation)(void *cls,
+                                          struct GSF_PendingRequest *pr,
+                                          enum GNUNET_BLOCK_EvaluationResult 
result);
+
+
+/**
+ * Look up the request in the local datastore.
+ *
+ * @param pr the pending request to process
+ * @param cont function to call at the end
+ * @param cont_cls closure for cont
+ */
+void
+GSF_local_lookup_ (struct GSF_PendingRequest *pr,
+                  GSF_LocalLookupContinuation cont,
+                  void *cont_cls);
+
+
+/**
  * Setup the subsystem.
  *
  * @param cfg configuration to use




reply via email to

[Prev in Thread] Current Thread [Next in Thread]