[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[gnunet-scheme] 237/324: mq-impl/stream: Allow closing the queue on requ
From: |
gnunet |
Subject: |
[gnunet-scheme] 237/324: mq-impl/stream: Allow closing the queue on request. |
Date: |
Tue, 21 Sep 2021 13:24:37 +0200 |
This is an automated email from the git hooks/post-receive script.
maxime-devos pushed a commit to branch master
in repository gnunet-scheme.
commit fb8b1e5b3df15b246cd4404dcaa3bb5bf5e90634
Author: Maxime Devos <maximedevos@telenet.be>
AuthorDate: Mon Sep 6 21:53:56 2021 +0200
mq-impl/stream: Allow closing the queue on request.
* gnu/gnunet/mq.scm
(make-message-queue): Accept optional 'closer' argument.
(close-queue!): New procedure.
* gnu/gnunet/mq-impl/stream.scm
(prepare-port-message-queue): Remove fixed TODO. Also return
'close!'.
(prepare-port-message-queue)[request-close-condition]: New variable.
(prepare-port-message-queue)[start-reader!]{wait-op}: Wait on
request-close-condition and escape.
(prepare-port-message-queue)[close!]: New procedure.
(port->message-queue): Accept 'close!' from
'prepare-port-message-queue'. Pass it to the message queue
constructor.
(connect/fibers): Accept 'close!' from
'prepare-port-message-queue'. Pass it to 'make-message-queue'.
Note a TODO.
* tests/mq-stream.scm
("fibers stop and port closed after close! (directly after creation)")
("fibers stop and port closed after close! (some times passes)"):
New tests.
---
gnu/gnunet/mq-impl/stream.scm | 30 ++++++++++++++++++++++--------
gnu/gnunet/mq.scm | 28 +++++++++++++++++++++-------
tests/mq-stream.scm | 22 ++++++++++++++++++++++
3 files changed, 65 insertions(+), 15 deletions(-)
diff --git a/gnu/gnunet/mq-impl/stream.scm b/gnu/gnunet/mq-impl/stream.scm
index 05f6c5a..b122f14 100644
--- a/gnu/gnunet/mq-impl/stream.scm
+++ b/gnu/gnunet/mq-impl/stream.scm
@@ -261,7 +261,6 @@ an appropriate @code{&undefined-key-error} is raised."
;; See 'port->message-queue'. Also used by connect/fibers.
(define* (prepare-port-message-queue spawn)
- ;; TODO: closing message queues
(define rcvar (make-repeated-condition))
(define (interrupt! mq)
(trigger-condition! rcvar))
@@ -275,12 +274,21 @@ an appropriate @code{&undefined-key-error} is raised."
;; on closed-condition returns #f, as in that case, the other fiber has
;; already done all its I/O and won't need the port anymore.
(define closed-condition (make-condition))
+ ;; 'request-close-condition' is used by 'close!' to stop the read fiber,
+ ;; such that it will signal 'closed-condition', then the 'write-fiber'
+ ;; will also stop and close the port. Closing it from the write fiber
+ ;; would also be possible, but in this implementation, closing happens
+ ;; in the read fiber.
+ (define request-close-condition (make-condition))
(define (start-reader! mq port)
(define-values (key . rest)
(let/ec escape
(define wait-op
(choice-operation
(wait-until-port-readable-operation port)
+ (wrap-operation (wait-operation request-close-condition)
+ (lambda ()
+ (escape 'input:regular-end-of-file)))
(wrap-operation (wait-operation closed-condition)
(lambda ()
(escape 'input:regular-end-fof-file)))))
@@ -337,11 +345,14 @@ an appropriate @code{&undefined-key-error} is raised."
(inject-error! mq 'input:regular-end-of-file)
;; TODO: close-port can block!
(close-port port)))
+ (define (close!)
+ (signal-condition! request-close-condition))
(values (lambda (mq port)
(spawn (lambda () (start-reader! mq port)))
(spawn (lambda () (start-writer! mq port))))
;; Pass this to make-message-queue as the 'sender'.
- interrupt!))
+ interrupt!
+ close!))
(define* (port->message-queue port handlers error-handler
#:key (spawn spawn-fiber))
@@ -354,11 +365,14 @@ All fibers will complete when the end-of-file has been
encountered.
When the connection is broken, the error @code{input:regular-end-of-file}
is injected. A half-duplex port is treated as a broken connection.
XXX: half-duplex connections cannot always be detected
-XXX: Likewise for connect/fibers?"
- (define-values (start-fibers interrupt!)
+XXX: Likewise for connect/fibers?
+
+The port will be closed when the connection is broken or after
+@ode{close-queue!} is called."
+ (define-values (start-fibers interrupt! close!)
(prepare-port-message-queue spawn))
(define mq
- (make-message-queue handlers error-handler interrupt!))
+ (make-message-queue handlers error-handler interrupt! close!))
(start-fibers mq port)
mq)
@@ -374,12 +388,12 @@ When the connection has been established, the error
@code{connection:connected}
(a symbol) is injected into the message queue. When the connection has been
closed by the server (e.g. because the server was stopped or is restarting)
the error @code{input:regular-end-of-file} is injected into the message queue."
- ;; TODO closing message queues
- (define-values (start-fibers interrupt!)
+ (define-values (start-fibers interrupt! close!)
(prepare-port-message-queue spawn))
(define mq
- (make-message-queue handlers error-handler interrupt!))
+ (make-message-queue handlers error-handler interrupt! close!))
(spawn (lambda ()
+ ;; XXX interupt the connection forming when closing
(define socket (connect-unix config service-name))
(inject-error! mq 'connection:connected)
(start-fibers mq socket)))
diff --git a/gnu/gnunet/mq.scm b/gnu/gnunet/mq.scm
index 2746020..e1be5d4 100644
--- a/gnu/gnunet/mq.scm
+++ b/gnu/gnunet/mq.scm
@@ -25,8 +25,6 @@
;; A message queue for GNUnet messages.
;; Messages are made of bytes. In particular,
;; messages must be prefixed by a /:message-header.
-;;
-;; TODO closing ports
(define-library (gnu gnunet mq)
(export <message-queue> make-message-queue message-queue?
make-one-by-one-sender
@@ -34,6 +32,7 @@
message-queue-length
%message-queue-garbagitude
try-send-again!
+ close-queue!
&missing-header-error make-missing-header-error
missing-header-error? missing-header-error-received-size
@@ -57,7 +56,7 @@
/:message-header)
(only (gnu gnunet netstruct syntactic)
sizeof read%)
- (only (guile) define* exact-integer?)
+ (only (guile) lambda* define* exact-integer?)
(only (ice-9 weak-vector)
weak-vector weak-vector-ref)
(only (ice-9 atomic)
@@ -104,10 +103,14 @@
;; right now, though it probably should send them
;; soonish. It can be run at any time, with
;; @code{try-send-again!}.
- (immutable sender message-queue-sender))
+ (immutable sender message-queue-sender)
+ ;; A thunk to ‘close’ the message queue, usually telling the
+ ;; remote peer that no additional data will be transmitted
+ ;; in either direction and stopping associated threads.
+ (immutable closer message-queue-closer))
(protocol
(lambda (%make)
- (lambda (handlers error-handler sender)
+ (lambda* (handlers error-handler sender #:optional (closer values))
"Make a message queue with message handlers @var{handlers}.
The message handlers are expected to handle bytevector slices
@@ -120,13 +123,17 @@ Injected errors are passed to @var{error-handler}, a
variadic procedure.
A list of possible errors can be found in @file{README.org}.
Messages are sent with @var{sender}. It can be created with
-@code{make-one-by-one-sender}."
+@code{make-one-by-one-sender}. Optionally, a @var{closer} procedure can
+be passed. Such a procedure is expected to be idempotent, see
+@code{close-queue!} for details."
;; Predicate does not exist yet ...
#;(assert (message-handlers? handlers))
#;(assert (message-handler? error-handler))
+ (assert (procedure? closer))
(%make handlers error-handler
(make-atomic-box (cons (pfds:make-queue) 0))
- sender)))))
+ sender
+ closer)))))
(define (make-one-by-one-sender proc)
"Make a message sender, sending messages one-by-one with @var{proc}.
@@ -332,6 +339,13 @@ perhaps be sent."
This is expected to be called from the message queue implementation."
((message-queue-sender mq) mq))
+ (define (close-queue! mq)
+ "Close the message queue @var{mq}. The exact semantics are
+implementation-dependent. Conventionally, this frees resources such as
+sockets and threads and is idempotent and non-blocking. There is no
+guarantee that all messages in the queue will be sent before closing."
+ ((message-queue-closer mq)))
+
(define (queue-filter ? queue)
"Construct a queue, based on @var{queue}, restricted to elements
satisfying the predicate @var{?}."
diff --git a/tests/mq-stream.scm b/tests/mq-stream.scm
index e16aa1b..f5a56e6 100644
--- a/tests/mq-stream.scm
+++ b/tests/mq-stream.scm
@@ -729,4 +729,26 @@ If an EPIPE system error is raised, return #f."
(port-closed? alpha))
#:parallelism 1)) ; to make the use of yield-many less fragile
+(test-assert "fibers stop and port closed after close! (directly after
creation)"
+ (let^ ((<-- (alpha beta) (two-sockets)))
+ (call-with-spawner/wait
+ (lambda (spawn)
+ (define q (port->message-queue alpha no-handlers
error-handler/regular
+ #:spawn spawn))
+ (close-queue! q))
+ #:parallelism 1)
+ (port-closed? alpha)))
+
+(test-assert "fibers stop and port closed after close! (some times passes)"
+ (let^ ((<-- (alpha beta) (two-sockets)))
+ (call-with-spawner/wait
+ (lambda (spawn)
+ (define q (port->message-queue alpha no-handlers
error-handler/regular
+ #:spawn spawn))
+ (yield-many)
+ (sleep 0.01)
+ (close-queue! q))
+ #:parallelism 1)
+ (port-closed? alpha)))
+
(test-end "mq-stream")
--
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.
- [gnunet-scheme] 244/324: doc: Document the 'disconnected' callback., (continued)
- [gnunet-scheme] 244/324: doc: Document the 'disconnected' callback., gnunet, 2021/09/21
- [gnunet-scheme] 246/324: mq,mq-impl: Remove TODOs about allocating memory., gnunet, 2021/09/21
- [gnunet-scheme] 245/324: mq-impl/stream: Eliminate 'return' argument of 'handle-input!'., gnunet, 2021/09/21
- [gnunet-scheme] 251/324: tests/utils: Move call-with-spawner from tests/mq-stream.scm., gnunet, 2021/09/21
- [gnunet-scheme] 221/324: mq-impl/stream: Delay knowing the port., gnunet, 2021/09/21
- [gnunet-scheme] 228/324: doc/fdl: Correct ‘quote’ typography., gnunet, 2021/09/21
- [gnunet-scheme] 231/324: doc: Document message verifiers., gnunet, 2021/09/21
- [gnunet-scheme] 236/324: mq-impl/stream: Close the port when stopping the fibers., gnunet, 2021/09/21
- [gnunet-scheme] 242/324: nse/client: Remove unused fields., gnunet, 2021/09/21
- [gnunet-scheme] 243/324: doc/scheme-gnunet.tm: Correct use of 'connected' and 'updated'., gnunet, 2021/09/21
- [gnunet-scheme] 237/324: mq-impl/stream: Allow closing the queue on request.,
gnunet <=
- [gnunet-scheme] 247/324: nse/client: Only call 'send-start!' after 'mq' has been defined., gnunet, 2021/09/21
- [gnunet-scheme] 248/324: tests/mq-stream: Add missing parenthesis., gnunet, 2021/09/21
- [gnunet-scheme] 249/324: nse/client: Prepare for auto-reconnecting., gnunet, 2021/09/21
- [gnunet-scheme] 250/324: doc: Document that (gnu gnunet nse client) reconnects., gnunet, 2021/09/21
- [gnunet-scheme] 253/324: nse/client: Correct type documentation of <server>., gnunet, 2021/09/21
- [gnunet-scheme] 254/324: doc: Document how to disconnect from the NSE server., gnunet, 2021/09/21
- [gnunet-scheme] 258/324: build: Install compiled Guile modules in appropriate location., gnunet, 2021/09/21
- [gnunet-scheme] 256/324: nse/client: Verify positivity of estimate., gnunet, 2021/09/21
- [gnunet-scheme] 257/324: git: Mark .scm as Scheme files for diffing purposes., gnunet, 2021/09/21
- [gnunet-scheme] 261/324: git: Ignore some files created by Emacs., gnunet, 2021/09/21