gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[gnunet-scheme] branch master updated: dht/client: Simplify concurrency.


From: gnunet
Subject: [gnunet-scheme] branch master updated: dht/client: Simplify concurrency.
Date: Wed, 02 Feb 2022 21:52:27 +0100

This is an automated email from the git hooks/post-receive script.

maxime-devos pushed a commit to branch master
in repository gnunet-scheme.

The following commit(s) were added to refs/heads/master by this push:
     new 57fd1ae  dht/client: Simplify concurrency.
57fd1ae is described below

commit 57fd1ae5ec5d0e48745ccdb9cce7c29755f0b79c
Author: Maxime Devos <maximedevos@telenet.be>
AuthorDate: Wed Feb 2 20:47:52 2022 +0000

    dht/client: Simplify concurrency.
    
    A TODO: eventually stop the control fiber.
    
    * gnu/gnunet/dht/client.scm (<server>): Remove the
      'new-get-operations', 'new-get-operation-trigger',
      'new-put-operations' and 'new-put-operation-trigger' fields.  Add a
      'control-channel' field.
      (maybe-send-control-message!): New procedure.
      (start-get!): Don't use 'new-get-operations' and friends, instead send a
      message to the control channel.
      (put!): Likewise.
      (connect): Don't set the removed fields and adjust the call to
      %make-server.
      (reconnect): Remove removed fields.  Add 'mq-defined' variable.
      Restart old requests in the 'connection:connected' error handler.
      Eliminate the 'process-new-get-operations' and
      'process-new-put-operations' fiber, move the functionality into a
      new and much simpler 'control' fiber.
---
 gnu/gnunet/dht/client.scm | 143 +++++++++++++++++++---------------------------
 1 file changed, 58 insertions(+), 85 deletions(-)

diff --git a/gnu/gnunet/dht/client.scm b/gnu/gnunet/dht/client.scm
index 1a29531..8e6527d 100644
--- a/gnu/gnunet/dht/client.scm
+++ b/gnu/gnunet/dht/client.scm
@@ -69,7 +69,6 @@
          stop-monitor!)
   (import (gnu extractor enum)
          (gnu gnunet block)
-         (gnu gnunet concurrency repeated-condition)
          (gnu gnunet hashcode struct)
          (gnu gnunet mq)
          (gnu gnunet mq handler)
@@ -81,14 +80,18 @@
                hash-map->list)
          (only (ice-9 atomic)
                make-atomic-box atomic-box-ref atomic-box-set!)
+         (only (ice-9 match)
+               match)
          (only (gnu extractor enum)
                symbol-value)
          (only (fibers)
                spawn-fiber)
          (only (fibers conditions)
-               make-condition signal-condition! wait-operation)
+               make-condition signal-condition! wait-operation wait)
          (only (fibers operations)
                perform-operation choice-operation)
+         (only (fibers channels)
+               make-channel put-operation get-operation)
          (only (gnu gnunet mq error-reporting)
                report-error)
          (gnu gnunet dht struct)
@@ -548,40 +551,30 @@ currently unsupported."
 
     
 
-    ;; New get or put operations are initially in new-get-operations or
-    ;; new-put-operation, and not in id->operation-map.  They are moved
-    ;; in the background by 'process-new-get-operations' and
-    ;; 'process-new-put-operations'.
-    ;;
+    ;; New operations are communicated to the main event loop
+    ;; via the control channel, using 'maybe-send-control-message!'.
     ;; Operations must be put in id->operation-map before sending them
     ;; to the service!
     (define-record-type (<server> %make-server server?)
       (fields (immutable request-close?/box server-request-close?/box)
              (immutable request-close-condition
                         server-request-close-condition)
-             ;; Hash table from new <get> to #true.  These get operations
-             ;; are not yet sent to the services, and not yet queued for
-             ;; sending.  Guile's hash tables are thread safe, so no locking
-             ;; is required.
-             (immutable new-get-operations server-new-get-operations)
-             ;; After adding new entries to 'new-get-operations', this
-             ;; ‘repeated condition’ is triggered to interrupt the fiber
-             ;; responsible for processing the new get operations.
-             (immutable new-get-operaton-trigger
-                        server-new-get-operation-trigger)
-             ;; Hash table from new <put> to #true.  These put operations
-             ;; are not yet sent to the service, and not yet queued for
-             ;; sending.
-             (immutable new-put-operations
-                        server-new-put-operations)
-             (immutable new-put-operation-trigger
-                        server-new-put-operation-trigger)
+             (immutable control-channel server-control-channel)
              ;; Atomic box holding an unsigned 64-bit integer.
              (immutable next-unique-id/box server-next-unique-id/box)
              ;; Hash table from operation ids to their corresponding
              ;; <get> object.
              (immutable id->operation-map server-id->operation-map)))
 
+    (define (maybe-send-control-message! server . message)
+      "Send @var{message} to the control channel of @var{server}, or don't
+do anything if @var{server} has been permanently disconnected."
+      (perform-operation
+       (choice-operation
+       ;; Nothing to do when the permanently disconnected!
+       (wait-operation (server-request-close-condition server))
+       (put-operation (server-control-channel server) message))))
+
     (define-record-type (<get> %make-get get?)
       (fields (immutable server get:server)
              (immutable found get:iterator) ; procedure accepting 
<search-result>
@@ -650,10 +643,9 @@ search result, using @lisp{copy-search-result}."
       ;; TODO: options, xquery ...
       (define id (fresh-id server))
       (define handle (%make-get server found query id 0)) ; TODO: options
-      ;; Tell 'process-new-get-operations' about the new get operation.
-      ;; That fiber will take care of putting it into the operation map.
-      (hashq-set! (server-new-get-operations server) handle #t)
-      (trigger-condition! (server-new-get-operation-trigger server))
+      ;; TODO: send 'found', 'id' and 'options' instead of sending the handle
+      ;; that has a reference to 'server' (for guardian reasons, TODO).
+      (maybe-send-control-message! server 'start-get! handle)
       handle)
 
     (define* (put! server insertion #:key (confirmed values))
@@ -665,8 +657,8 @@ TODO actually call @var{confirmed}"
       ;; Prepare the message to send.  TODO: options
       (define message (construct-client-put insertion))
       (define handle (%make-put server confirmed message))
-      (hashq-set! (server-new-put-operations server) handle #t)
-      (trigger-condition! (server-new-put-operation-trigger server))
+      ;; TODO: see start-get!
+      (maybe-send-control-message! server 'put! handle)
       handle)
 
     (define-syntax-rule (well-formed?/path-length slice type (field ...) 
compare)
@@ -700,28 +692,20 @@ even if not connected.  This is an idempotent operation."
       "Connect to the DHT service in the background."
       (define request-close?/box (make-atomic-box #f))
       (define request-close-condition (make-condition))
-      (define new-get-operation-trigger (make-repeated-condition))
-      (define new-get-operations (make-hash-table))
-      (define new-put-operation-trigger (make-repeated-condition))
-      (define new-put-operations (make-hash-table))
       (define id->operation-map (make-hash-table))
-      (reconnect new-get-operations new-get-operation-trigger
-                new-put-operations new-put-operation-trigger
-                request-close?/box request-close-condition config
-                id->operation-map
+      (define control-channel (make-channel))
+      (reconnect request-close?/box request-close-condition config
+                id->operation-map control-channel
                 #:connected connected
                 #:spawn spawn)
       (%make-server request-close?/box request-close-condition
-                   new-get-operations new-get-operation-trigger
-                   new-put-operations new-put-operation-trigger
+                   control-channel
                    ;; Any ‘small’ exact natural number will do.
                    (make-atomic-box 0)
                    id->operation-map))
 
-    (define* (reconnect new-get-operations new-get-operation-trigger
-                       new-put-operations new-put-operation-trigger
-                       request-close?/box request-close-condition config
-                       id->operation-map
+    (define* (reconnect request-close?/box request-close-condition config
+                       id->operation-map control-channel
                        #:key (spawn spawn-fiber)
                        connected
                        #:rest rest)
@@ -780,22 +764,30 @@ even if not connected.  This is an idempotent operation."
                 TODO-error-reporting/2)))))
       ;; TODO: abstract duplication in (gnu gnunet nse client)
       (define mq-closed (make-condition))
+      (define mq-defined (make-condition))
       (define (error-handler error . arguments)
        (case error
          ((connection:connected)
           (connected)
-          ;; TODO: resume old requests
-          (pk 'todo-connected)
-          'todo)
+          (wait mq-defined)
+          ;; Resume old requests.  Only get operations need to be submitted
+          ;; again.
+          ;;
+          ;; TODO: restarting monitoring operations
+          (for-each (lambda (get)
+                      (send-get! mq get))
+                    ;; XXX: @code{hash-for-each} forms a continuation barrier,
+                    ;; so turn the hash table into a list before iterating.
+                    (hash-map->list (lambda (x handle) handle)
+                                    id->operation-map))
+          (values))
          ;; TODO: signal (and wait for) current fibers to stop?
          ((input:regular-end-of-file input:premature-end-of-file)
           (signal-condition! mq-closed)
           (unless (atomic-box-ref request-close?/box)
             (apply reconnect
-                   new-get-operations new-get-operation-trigger
-                   new-put-operations new-put-operation-trigger
                    request-close?/box request-close-condition
-                   config id->operation-map rest)))
+                   config id->operation-map control-channel rest)))
          ;; TODO: is this cargo-copying from (gnu gnunet nse client)
          ;; correct?
          ((connection:interrupted)
@@ -810,40 +802,21 @@ even if not connected.  This is an idempotent operation."
          ;; Make sure the fiber exits after a reconnect.
          (wait-operation mq-closed)))
        (close-queue! mq))
-      (define (process-new-get-operations)
-       "Process newly-added get operations, that still need to be communicate
-to the DHT service."
-       (await-trigger! new-get-operation-trigger)
-       (pk 'newstuff!)
-       ;; Extract the latest new operations ...
-       (define new (hash-map->list (lambda (get _) get) new-get-operations))
-       ;; remove them from the list of new operations and add them
-       ;; to the hash table of operations ...
-       (for-each (lambda (get)
-                   (hashq-remove! new-get-operations get)
-                   (hashq-set! id->operation-map (get:unique-id get) get))
-                 new)
-       ;; and (asynchronuously) sent the GET message
-       (for-each (lambda (get) (send-get! mq get)) new)
-       ;; TODO reconnection, closing queues and cancelling get operations,
-       ;; processing answers ...
-       (process-new-get-operations))
-      ;; TODO: remove duplication with process-new-get-operations
-      (define (process-new-put-operations)
-       (await-trigger! new-put-operation-trigger)
-       ;; Extract the latest new put operations
-       (define new (hash-map->list (lambda (put _) put) new-put-operations))
-       ;; And remove them from the hash table
-       (for-each (lambda (put) (hashq-remove! new-put-operations put)) new)
-       ;; and (asynchronuously) sent the PUT message
-       (for-each (lambda (put) (send-message! mq (put:message put))) new)
-       ;; TODO notify-sent callbacks, closing queues, cancelling put 
operations,
-       ;; processing answers ...
-       (process-new-put-operations))
       (define mq (connect/fibers config "dht" handlers error-handler
                                 #:spawn spawn))
-      (spawn request-close-handler)
-      (spawn process-new-get-operations)
-      (spawn process-new-put-operations)
-      ;; TODO: use new-get-operations
-      'todo)))
+      (signal-condition! mq-defined)
+      (define (control)
+       "The main event loop."
+       (match (perform-operation (get-operation control-channel))
+         (('start-get! get)
+          ;; Register the new get operation, such that we remember
+          ;; where to send responses to.
+          (hashv-set! id->operation-map (get:unique-id get) get)
+          ;; (Asynchronuously) send the GET message
+          (send-get! mq get))
+         (('put! put)
+          ;; Send the put operation to the DHT service.
+          (send-message! mq (put:message put))))
+       (control))
+      ;; Start the main event loop.
+      (spawn control))))

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]