gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[gnunet-scheme] branch master updated (8751c9c -> e037695)


From: gnunet
Subject: [gnunet-scheme] branch master updated (8751c9c -> e037695)
Date: Wed, 16 Feb 2022 22:19:06 +0100

This is an automated email from the git hooks/post-receive script.

maxime-devos pushed a change to branch master
in repository gnunet-scheme.

    from 8751c9c  tests: Test reconnecting and 'start-get!'.
     new bee15c1  dht/client: Remove resolved TODO.
     new 9eba8c6  dht/client: Put weak references in id->operation-map.
     new 7ac903a  concurrency/lost-and-found: Make guarding objects optional.
     new e037695  dht/client: Cancel unreachable non-lingering search operations.

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 doc/distributed-hash-table.tm             |  12 ++-
 gnu/gnunet/concurrency/lost-and-found.scm |  12 +--
 gnu/gnunet/dht/client.scm                 | 139 +++++++++++++++++++++++-------
 3 files changed, 126 insertions(+), 37 deletions(-)

diff --git a/doc/distributed-hash-table.tm b/doc/distributed-hash-table.tm
index 784c804..b5b41f0 100644
--- a/doc/distributed-hash-table.tm
+++ b/doc/distributed-hash-table.tm
@@ -136,7 +136,8 @@
   used.<index|searching the DHT><index|inserting data into the DHT>
 
   <\explain>
-    <scm|(start-get! <var|server> <var|query> <var|found>)><index|start-get!>
+    <scm|(start-get! <var|server> <var|query> <var|found>
+    <var|#:linger?>=#true)><index|start-get!>
   <|explain>
     Search for data matching <var|query> in the DHT. When a datum is found,
     call the unary procedure <var|found> on the search result. It is possible
@@ -154,6 +155,15 @@
     internal buffers for the slices passed to <var|found>, which could be
     overwritten after the call to <var|found>. As such, it might be necessary
     to make a copy of the search result, using <scm|copy-search-result>.
+
+    When the boolean <var|linger?> is false, the search is automatically
+    cancelled when the search object becomes unreachable according to the GC.
+    <todo|actually implement this>
+
+    <\warning>
+      Guile currently (3.0.8) uses a conservative GC, so it cannot always
+      detect unreachability when it should.
+    </warning>
   </explain>
 
   <\explain>
diff --git a/gnu/gnunet/concurrency/lost-and-found.scm b/gnu/gnunet/concurrency/lost-and-found.scm
index 8975240..050bd4a 100644
--- a/gnu/gnunet/concurrency/lost-and-found.scm
+++ b/gnu/gnunet/concurrency/lost-and-found.scm
@@ -216,15 +216,17 @@ wakeups where the empty list is returned are possible."
 
     (define *guard* (make-guardian))
 
+    ;; TODO: test the 'lost-and-found=#false' case.
     (define-record-type (<losable> make-losable losable?)
       (fields (immutable lost-and-found losable-lost-and-found))
       (sealed #false)
       (protocol (lambda (%make)
-                 (lambda (lost-and-found)
-                   (assert (lost-and-found? lost-and-found))
-                   (let ((object (%make lost-and-found)))
-                     (*guard* object)
-                     object)))))
+                 (lambda (lost-and-found)
+                   (let ((object (%make lost-and-found)))
+                     (when lost-and-found
+                       (assert (lost-and-found? lost-and-found))
+                       (*guard* object))
+                     object)))))
 
     (define (collect-lost)
       (define object (*guard*))
diff --git a/gnu/gnunet/dht/client.scm b/gnu/gnunet/dht/client.scm
index e6ccc9c..9d37f0f 100644
--- a/gnu/gnunet/dht/client.scm
+++ b/gnu/gnunet/dht/client.scm
@@ -79,11 +79,14 @@
          (only (guile)
                pk define-syntax-rule define* lambda* error
                make-hash-table hashq-set! hashq-remove! hashv-set! hashv-ref
-               hashv-remove! hash-clear! hash-map->list)
+               hashv-remove! hash-clear! hash-map->list
+               ->bool and=>)
          (only (ice-9 atomic)
                make-atomic-box atomic-box-ref atomic-box-set!)
          (only (ice-9 match)
                match)
+         (only (ice-9 weak-vector)
+               weak-vector weak-vector-ref weak-vector?)
          (only (gnu extractor enum)
                symbol-value)
          (only (fibers)
@@ -96,6 +99,8 @@
               make-channel put-operation get-operation get-message put-message)
          (only (gnu gnunet mq error-reporting)
                report-error)
+         (only (gnu gnunet concurrency lost-and-found)
+               make-lost-and-found collect-lost-and-found-operation)
          (gnu gnunet dht struct)
          (only (gnu gnunet message protocols)
                message-type)
@@ -109,10 +114,10 @@
                cut-syntax)
          (only (rnrs base)
                and >= = quote * / + - define begin ... let*
-               quote case else values apply let cond if >
+               quote case else values apply let cond if > eq?
                <= expt assert exact? integer? lambda for-each
                not expt min max div-and-mod positive? define-syntax
-               vector)
+               vector cons)
          (only (rnrs control)
                unless when)
          (only (rnrs records syntactic)
@@ -579,6 +584,7 @@ currently unsupported."
       ;; terminal-condition: a disconnect has been requested
       (fields (immutable terminal-condition server-terminal-condition)
              (immutable control-channel server-control-channel)
+             (immutable lost-and-found server-lost-and-found)
              ;; Atomic box holding an unsigned 64-bit integer.
              (immutable next-unique-id/box server-next-unique-id/box)))
 
@@ -599,11 +605,28 @@ do anything if @var{server} has been permanently disconnected."
             (server-control-channel server) message))
 
     (define-record-type (<get> %make-get get?)
+      (parent <losable>)
       (fields (immutable server get:server)
             (immutable found get:iterator) ; procedure accepting <search-result>
              (immutable query get:query) ; <query>
              (immutable unique-id get:unique-id)
-             (immutable options get:options)))
+             (immutable options get:options)
+             ;; TODO: test if non-lingering actually works.
+             ;;
+             ;; If #false, 'reconnect' does not keep a strong reference to the
+             ;; search object and 'reconnect' will automatically cancel the
+             ;; search when the search object becomes unreachable.
+             ;;
+             ;; If #true, the search will not be automatically cancelled;
+             ;; 'reconnect' keeps a strong reference.
+             (immutable linger? get:linger?))
+      (protocol (lambda (%make)
+                 (lambda (server found query unique-id options linger?)
+                   ;; When not lingering, add this search object to the lost
+                   ;; and found, such that it will eventually be cancelled.
+                   ((%make (and (not linger?)
+                                (server-lost-and-found server)))
+                    server found query unique-id options linger?)))))
 
     (define-record-type (<put> %make-put put?)
       (fields (immutable server put:server)
@@ -654,7 +677,8 @@ do anything if @var{server} has been permanently disconnected."
             (assert (block-type? type))
             (value->index type))))
 
-    (define* (start-get! server query found)
+    ;; TODO(tests): Disable lingering by default, test linger?=#false
+    (define* (start-get! server query found #:key (linger? #true))
       "Search for data matching query in the DHT. When a datum is found, call
 the unary procedure @var{found} on the search result. It is possible to find
 multiple data matching a query. In that case, found is called multiple times.
@@ -669,12 +693,16 @@ to a separate fiber.
 To avoid expensive copies, the implementation can choose to reuse internal
 buffers for the slices passed to @var{found}, which could be overwritten after
 the call to @var{found}. As such, it might be necessary to make a copy of the
-search result, using @lisp{copy-search-result}."
+search result, using @lisp{copy-search-result}.
+
+When the boolean @var{linger?} is false, the search is automatically
+cancelled when the search object becomes unreachable according to the GC.
+TODO: implement this behaviour!"
       ;; TODO: options, xquery ...
       (define id (fresh-id server))
-      (define handle (%make-get server found query id 0)) ; TODO: options
-      ;; TODO: send 'found', 'id' and 'options' instead of sending the handle
-      ;; that has a reference to 'server' (for guardian reasons, TODO).
+      (define options 0) ; TODO: allow setting some options
+      (when linger? (assert (eq? linger? #true)))
+      (define handle (%make-get server found query id 0 (->bool linger?)))
       (maybe-send-control-message! server 'start-get! handle)
       handle)
 
@@ -735,18 +763,29 @@ code automatically tries to reconnect, so @var{connected} can be called after
       (define terminal-condition (make-condition))
       (define old-id->operation-map (make-hash-table))
       (define control-channel (make-channel))
+      (define lost-and-found (make-lost-and-found))
       (reconnect terminal-condition config
-                old-id->operation-map control-channel
+                old-id->operation-map control-channel lost-and-found
                 #:connected connected
                 #:disconnected disconnected
                 #:spawn spawn)
-      (%make-server terminal-condition control-channel
+      (%make-server terminal-condition control-channel lost-and-found
                    ;; Any ‘small’ exact natural number will do.
                    (make-atomic-box 0)))
 
-    ;; TODO(id->operation-map): Hash tables are thread-unsafe.
+    ;; TODO: put in new module?
+    (define (make-weak-reference to)
+      (weak-vector to))
+    (define (make-strong-reference to)
+      (assert (not (weak-vector? to)))
+      to)
+    (define (dereference reference)
+      (if (weak-vector? reference)
+         (weak-vector-ref reference 0)
+         reference))
+
     (define* (reconnect terminal-condition config
-                       old-id->operation-map control-channel
+                       old-id->operation-map control-channel lost-and-found
                        #:key (spawn spawn-fiber)
                        connected disconnected
                        #:rest rest)
@@ -758,6 +797,16 @@ code automatically tries to reconnect, so @var{connected} can be called after
       ;;
       ;; To avoid races, 'id->operation-map' and 'old-id->operation-map'
       ;; are only accessed from 'control'.
+      ;;
+      ;; To allow cancelling operations when they become unreachable, operations
+      ;; are wrapped in a weak reference (unless linger? is #true).  Otherwise,
+      ;; they won't ever become unreachable.  Keep in mind that, at least in
+      ;; Guile 3.0.7, weak references are broken when the object is returned
+      ;; from the guardian (and probably earlier) -- this seems to be a
+      ;; difficult to fix bug.
+      ;;
+      ;; This code is written to support both the correct and incorrect behaviour
+      ;; of guardians+weak vectors.
       (define id->operation-map (make-hash-table))
       (define (request-search-result-iterator unique-id)
        "Ask @code{control} what is the iterator for the get operation with
@@ -822,9 +871,10 @@ operation is cancelled, return @code{#false} instead."
                       (analyse-client-result slice))
                  (! handle (request-search-result-iterator unique-id))
                  (? (not handle)
-                    ;; Where did this unique id come from?
-                    (pk 'unique-id unique-id)
-                    TODO-error-reporting/1)
+                    ;; Perhaps the search object became unreachable;
+                    ;; 'process-stop-search' (see next commit) will be
+                    ;; called soon to inform the DHT service.
+                    (values))
                  (? (get? handle)
                     ;; TODO might not be true once monitoring operations
                     ;; are supported.
@@ -862,6 +912,24 @@ operation is cancelled, return @code{#false} instead."
       (define mq (connect/fibers config "dht" handlers error-handler
                                 #:spawn spawn))
       (signal-condition! mq-defined)
+      (define (handle-lost handle)
+       ;; TODO: monitoring operations, put operations ...
+       (match handle
+         ((? get? get) (process-stop-search get))))
+      (define (process-stop-search get)
+       ;; TODO: tests!
+       ;; TODO: cancel outstanding messages to the DHT services for this
+       ;; get operation (including the request to start searching), if
+       ;; any.
+       (hashv-remove! old-id->operation-map (get:unique-id get))
+       (when (hashv-ref id->operation-map (get:unique-id get))
+         (hashv-remove! id->operation-map (get:unique-id get))
+         (send-stop-get! mq get)))
+      (define loop-operation
+       (choice-operation
+        (get-operation control-channel)
+        (wrap-operation (collect-lost-and-found-operation lost-and-found)
+                        (lambda (lost) (cons 'lost lost)))))
       (define (control)
        "The main event loop."
        (match (perform-operation (get-operation control-channel))
@@ -885,24 +953,20 @@ operation is cancelled, return @code{#false} instead."
          (('reconnect!)
           ;; Restart the loop with a new message queue.
           (apply reconnect terminal-condition config id->operation-map
-                 control-channel rest))
+                 control-channel lost-and-found rest))
          (('start-get! get)
           ;; Register the new get operation, such that we remember
           ;; where to send responses to.
-          (hashv-set! id->operation-map (get:unique-id get) get)
+          (hashv-set! id->operation-map (get:unique-id get)
+                      ((if (get:linger? get)
+                          make-strong-reference
+                          make-weak-reference) get))
           ;; (Asynchronuously) send the GET message.
           (send-get! mq get)
           ;; Continue!
           (control))
          (('stop-search! get)
-          ;; TODO: tests!
-          ;; TODO: cancel outstanding messages to the DHT services for this
-          ;; get operation (including the request to start searching), if
-          ;; any.
-          (hashv-remove! old-id->operation-map (get:unique-id get))
-          (when (hashv-ref id->operation-map (get:unique-id get))
-            (hashv-remove! id->operation-map (get:unique-id get))
-            (send-stop-get! mq get))
+          (process-stop-search get)
           ;; Continue!
           (control))
          (('put! put)
@@ -912,7 +976,9 @@ operation is cancelled, return @code{#false} instead."
           (control))
          ;; Send by @code{request-search-result-iterator}.
          (#('request-search-result-iterator unique-id response-channel)
-          (put-message response-channel (hashv-ref id->operation-map unique-id))
+          (put-message response-channel
+                       (and=> (hashv-ref id->operation-map unique-id)
+                              dereference))
           ;; Continue!
           (control))
          (('resend-old-operations!)
@@ -920,16 +986,27 @@ operation is cancelled, return @code{#false} instead."
           ;; again.
           ;;
           ;; TODO: restarting monitoring operations
-          (for-each (lambda (get)
-                      (hashv-set! id->operation-map (get:unique-id get) get)
-                      (send-get! mq get))
+          (for-each (lambda (reference)
+                      (define get (dereference reference))
+                      ;; If the (weak) reference is broken, that means the
+                      ;; operation is unreachable, so then there is no point
+                      ;; to resending the get operation.
+                      (when get
+                        (hashv-set! id->operation-map (get:unique-id get)
+                                    reference)
+                        (send-get! mq get)))
                     ;; XXX: @code{hash-for-each} forms a continuation barrier,
                     ;; so turn the hash table into a list before iterating.
-                    (hash-map->list (lambda (x handle) handle)
+                    (hash-map->list (lambda (x reference) reference)
                                     old-id->operation-map))
           ;; Free some memory.
           (hash-clear! old-id->operation-map)
           ;; Continue!
+          (control))
+         ;; Some handles became unreachable and can be cancelled.
+         (('lost lost)
+          (for-each handle-lost lost)
+          ;; Continue!
           (control))))
       ;; Start the main event loop.
       (spawn control))))

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]