[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[gnunet-scheme] branch master updated: dht/client: Simplify concurrency.
From: gnunet
Subject: [gnunet-scheme] branch master updated: dht/client: Simplify concurrency.
Date: Wed, 02 Feb 2022 21:52:27 +0100
This is an automated email from the git hooks/post-receive script.
maxime-devos pushed a commit to branch master
in repository gnunet-scheme.
The following commit(s) were added to refs/heads/master by this push:
new 57fd1ae dht/client: Simplify concurrency.
57fd1ae is described below
commit 57fd1ae5ec5d0e48745ccdb9cce7c29755f0b79c
Author: Maxime Devos <maximedevos@telenet.be>
AuthorDate: Wed Feb 2 20:47:52 2022 +0000
dht/client: Simplify concurrency.
A TODO: eventually stop the control fiber.
* gnu/gnunet/dht/client.scm (<server>): Remove the
'new-get-operations', 'new-get-operation-trigger',
'new-put-operations' and 'new-put-operation-trigger' fields. Add a
'control-channel' field.
(maybe-send-control-message!): New procedure.
(start-get!): Don't use 'new-get-operations' and friends, instead send a
message to the control channel.
(put!): Likewise.
(connect): Don't set the removed fields and adjust the call to
%make-server.
(reconnect): Remove the parameters for the removed fields. Add an
'mq-defined' variable. Restart old requests in the
'connection:connected' error handler. Eliminate the
'process-new-get-operations' and 'process-new-put-operations' fibers
and move their functionality into a new and much simpler 'control'
fiber.
---
gnu/gnunet/dht/client.scm | 143 +++++++++++++++++++---------------------------
1 file changed, 58 insertions(+), 85 deletions(-)
diff --git a/gnu/gnunet/dht/client.scm b/gnu/gnunet/dht/client.scm
index 1a29531..8e6527d 100644
--- a/gnu/gnunet/dht/client.scm
+++ b/gnu/gnunet/dht/client.scm
@@ -69,7 +69,6 @@
stop-monitor!)
(import (gnu extractor enum)
(gnu gnunet block)
- (gnu gnunet concurrency repeated-condition)
(gnu gnunet hashcode struct)
(gnu gnunet mq)
(gnu gnunet mq handler)
@@ -81,14 +80,18 @@
hash-map->list)
(only (ice-9 atomic)
make-atomic-box atomic-box-ref atomic-box-set!)
+ (only (ice-9 match)
+ match)
(only (gnu extractor enum)
symbol-value)
(only (fibers)
spawn-fiber)
(only (fibers conditions)
- make-condition signal-condition! wait-operation)
+ make-condition signal-condition! wait-operation wait)
(only (fibers operations)
perform-operation choice-operation)
+ (only (fibers channels)
+ make-channel put-operation get-operation)
(only (gnu gnunet mq error-reporting)
report-error)
(gnu gnunet dht struct)
@@ -548,40 +551,30 @@ currently unsupported."
- ;; New get or put operations are initially in new-get-operations or
- ;; new-put-operation, and not in id->operation-map. They are moved
- ;; in the background by 'process-new-get-operations' and
- ;; 'process-new-put-operations'.
- ;;
+ ;; New operations are communicated to the main event loop
+ ;; via the control channel, using 'maybe-send-control-message!'.
;; Operations must be put in id->operation-map before sending them
;; to the service!
(define-record-type (<server> %make-server server?)
(fields (immutable request-close?/box server-request-close?/box)
(immutable request-close-condition
server-request-close-condition)
- ;; Hash table from new <get> to #true. These get operations
- ;; are not yet sent to the services, and not yet queued for
- ;; sending. Guile's hash tables are thread safe, so no locking
- ;; is required.
- (immutable new-get-operations server-new-get-operations)
- ;; After adding new entries to 'new-get-operations', this
- ;; ‘repeated condition’ is triggered to interrupt the fiber
- ;; responsible for processing the new get operations.
- (immutable new-get-operaton-trigger
- server-new-get-operation-trigger)
- ;; Hash table from new <put> to #true. These put operations
- ;; are not yet sent to the service, and not yet queued for
- ;; sending.
- (immutable new-put-operations
- server-new-put-operations)
- (immutable new-put-operation-trigger
- server-new-put-operation-trigger)
+ (immutable control-channel server-control-channel)
;; Atomic box holding an unsigned 64-bit integer.
(immutable next-unique-id/box server-next-unique-id/box)
;; Hash table from operation ids to their corresponding
;; <get> object.
(immutable id->operation-map server-id->operation-map)))
+ (define (maybe-send-control-message! server . message)
+ "Send @var{message} to the control channel of @var{server}, or don't
+do anything if @var{server} has been permanently disconnected."
+ (perform-operation
+ (choice-operation
+ ;; Nothing to do when permanently disconnected!
+ (wait-operation (server-request-close-condition server))
+ (put-operation (server-control-channel server) message))))
+
(define-record-type (<get> %make-get get?)
(fields (immutable server get:server)
(immutable found get:iterator) ; procedure accepting <search-result>
@@ -650,10 +643,9 @@ search result, using @lisp{copy-search-result}."
;; TODO: options, xquery ...
(define id (fresh-id server))
(define handle (%make-get server found query id 0)) ; TODO: options
- ;; Tell 'process-new-get-operations' about the new get operation.
- ;; That fiber will take care of putting it into the operation map.
- (hashq-set! (server-new-get-operations server) handle #t)
- (trigger-condition! (server-new-get-operation-trigger server))
+ ;; TODO: send 'found', 'id' and 'options' instead of sending the handle
+ ;; that has a reference to 'server' (for guardian reasons, TODO).
+ (maybe-send-control-message! server 'start-get! handle)
handle)
(define* (put! server insertion #:key (confirmed values))
@@ -665,8 +657,8 @@ TODO actually call @var{confirmed}"
;; Prepare the message to send. TODO: options
(define message (construct-client-put insertion))
(define handle (%make-put server confirmed message))
- (hashq-set! (server-new-put-operations server) handle #t)
- (trigger-condition! (server-new-put-operation-trigger server))
+ ;; TODO: see start-get!
+ (maybe-send-control-message! server 'put! handle)
handle)
(define-syntax-rule (well-formed?/path-length slice type (field ...)
compare)
@@ -700,28 +692,20 @@ even if not connected. This is an idempotent operation."
"Connect to the DHT service in the background."
(define request-close?/box (make-atomic-box #f))
(define request-close-condition (make-condition))
- (define new-get-operation-trigger (make-repeated-condition))
- (define new-get-operations (make-hash-table))
- (define new-put-operation-trigger (make-repeated-condition))
- (define new-put-operations (make-hash-table))
(define id->operation-map (make-hash-table))
- (reconnect new-get-operations new-get-operation-trigger
- new-put-operations new-put-operation-trigger
- request-close?/box request-close-condition config
- id->operation-map
+ (define control-channel (make-channel))
+ (reconnect request-close?/box request-close-condition config
+ id->operation-map control-channel
#:connected connected
#:spawn spawn)
(%make-server request-close?/box request-close-condition
- new-get-operations new-get-operation-trigger
- new-put-operations new-put-operation-trigger
+ control-channel
;; Any ‘small’ exact natural number will do.
(make-atomic-box 0)
id->operation-map))
- (define* (reconnect new-get-operations new-get-operation-trigger
- new-put-operations new-put-operation-trigger
- request-close?/box request-close-condition config
- id->operation-map
+ (define* (reconnect request-close?/box request-close-condition config
+ id->operation-map control-channel
#:key (spawn spawn-fiber)
connected
#:rest rest)
@@ -780,22 +764,30 @@ even if not connected. This is an idempotent operation."
TODO-error-reporting/2)))))
;; TODO: abstract duplication in (gnu gnunet nse client)
(define mq-closed (make-condition))
+ (define mq-defined (make-condition))
(define (error-handler error . arguments)
(case error
((connection:connected)
(connected)
- ;; TODO: resume old requests
- (pk 'todo-connected)
- 'todo)
+ (wait mq-defined)
+ ;; Resume old requests. Only get operations need to be submitted
+ ;; again.
+ ;;
+ ;; TODO: restarting monitoring operations
+ (for-each (lambda (get)
+ (send-get! mq get))
+ ;; XXX: @code{hash-for-each} forms a continuation barrier,
+ ;; so turn the hash table into a list before iterating.
+ (hash-map->list (lambda (x handle) handle)
+ id->operation-map))
+ (values))
;; TODO: signal (and wait for) current fibers to stop?
((input:regular-end-of-file input:premature-end-of-file)
(signal-condition! mq-closed)
(unless (atomic-box-ref request-close?/box)
(apply reconnect
- new-get-operations new-get-operation-trigger
- new-put-operations new-put-operation-trigger
request-close?/box request-close-condition
- config id->operation-map rest)))
+ config id->operation-map control-channel rest)))
;; TODO: is this cargo-copying from (gnu gnunet nse client)
;; correct?
((connection:interrupted)
@@ -810,40 +802,21 @@ even if not connected. This is an idempotent operation."
;; Make sure the fiber exits after a reconnect.
(wait-operation mq-closed)))
(close-queue! mq))
- (define (process-new-get-operations)
- "Process newly-added get operations, that still need to be communicate
-to the DHT service."
- (await-trigger! new-get-operation-trigger)
- (pk 'newstuff!)
- ;; Extract the latest new operations ...
- (define new (hash-map->list (lambda (get _) get) new-get-operations))
- ;; remove them from the list of new operations and add them
- ;; to the hash table of operations ...
- (for-each (lambda (get)
- (hashq-remove! new-get-operations get)
- (hashq-set! id->operation-map (get:unique-id get) get))
- new)
- ;; and (asynchronuously) sent the GET message
- (for-each (lambda (get) (send-get! mq get)) new)
- ;; TODO reconnection, closing queues and cancelling get operations,
- ;; processing answers ...
- (process-new-get-operations))
- ;; TODO: remove duplication with process-new-get-operations
- (define (process-new-put-operations)
- (await-trigger! new-put-operation-trigger)
- ;; Extract the latest new put operations
- (define new (hash-map->list (lambda (put _) put) new-put-operations))
- ;; And remove them from the hash table
- (for-each (lambda (put) (hashq-remove! new-put-operations put)) new)
- ;; and (asynchronuously) sent the PUT message
- (for-each (lambda (put) (send-message! mq (put:message put))) new)
- ;; TODO notify-sent callbacks, closing queues, cancelling put operations,
- ;; processing answers ...
- (process-new-put-operations))
(define mq (connect/fibers config "dht" handlers error-handler
#:spawn spawn))
- (spawn request-close-handler)
- (spawn process-new-get-operations)
- (spawn process-new-put-operations)
- ;; TODO: use new-get-operations
- 'todo)))
+ (signal-condition! mq-defined)
+ (define (control)
+ "The main event loop."
+ (match (perform-operation (get-operation control-channel))
+ (('start-get! get)
+ ;; Register the new get operation, such that we remember
+ ;; where to send responses to.
+ (hashv-set! id->operation-map (get:unique-id get) get)
+ ;; (Asynchronously) send the GET message
+ (send-get! mq get))
+ (('put! put)
+ ;; Send the put operation to the DHT service.
+ (send-message! mq (put:message put))))
+ (control))
+ ;; Start the main event loop.
+ (spawn control))))
--
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [gnunet-scheme] branch master updated: dht/client: Simplify concurrency.,
gnunet <=