[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[gnunet-scheme] 196/324: mq-impl/stream: Stop all fibers when EOF is rea
From: |
gnunet |
Subject: |
[gnunet-scheme] 196/324: mq-impl/stream: Stop all fibers when EOF is reached (part 1). |
Date: |
Tue, 21 Sep 2021 13:23:56 +0200 |
This is an automated email from the git hooks/post-receive script.
maxime-devos pushed a commit to branch master
in repository gnunet-scheme.
commit 264a4d7ad10931974c9597daf486996387b14882
Author: Maxime Devos <maximedevos@telenet.be>
AuthorDate: Sat Aug 21 15:14:13 2021 +0200
mq-impl/stream: Stop all fibers when EOF is reached (part 1).
* gnu/gnunet/mq-impl/stream.scm
(port->message-queue): Stop the 'handle-output!' fiber when
'handle-input!' completes.
* tests/mq-stream.scm
(call-with-spawner, two-sockets): New procedures.
("input eof detected --> handle/input/output!
stops (port->message-queue)"): New test.
---
gnu/gnunet/mq-impl/stream.scm | 30 ++++++++++++++++-----
tests/mq-stream.scm | 62 ++++++++++++++++++++++++++++++++++++++++++-
2 files changed, 85 insertions(+), 7 deletions(-)
diff --git a/gnu/gnunet/mq-impl/stream.scm b/gnu/gnunet/mq-impl/stream.scm
index 8f88d1f..5120bbb 100644
--- a/gnu/gnunet/mq-impl/stream.scm
+++ b/gnu/gnunet/mq-impl/stream.scm
@@ -30,7 +30,8 @@
make-one-by-one-sender inject-message! inject-error!
make-message-queue)
(only (gnu gnunet concurrency repeated-condition)
- make-repeated-condition await-trigger! trigger-condition!)
+ make-repeated-condition await-trigger! prepare-await-trigger!
+ trigger-condition!)
(only (gnu gnunet utils bv-slice)
slice-bv slice-offset slice-length
slice-readable? bv-slice/read-write
@@ -56,7 +57,9 @@
(only (fibers)
sleep spawn-fiber)
(only (fibers conditions)
- make-condition signal-condition! wait)
+ make-condition signal-condition! wait-operation)
+ (only (fibers operations)
+ wrap-operation perform-operation choice-operation)
(srfi srfi-26)
(only (guile)
error define* identity
@@ -71,6 +74,8 @@
put-bytevector)
(only (ice-9 atomic)
make-atomic-box atomic-box-set! atomic-box-ref)
+ (only (ice-9 control)
+ let/ec)
(only (srfi srfi-1)
memv list-ref)
(srfi srfi-26))
@@ -233,17 +238,30 @@ an appropriate @code{&undefined-key-error} is raised."
"Create a message queue sending and receiving data over @var{port}.
This creates some fibers with @var{spawn} (@code{spawn-fiber} by default).
-As such, @var{port} must be non-blocking if @code{spawn-fiber} is used."
+As such, @var{port} must be non-blocking if @code{spawn-fiber} is used.
+All fibers will complete when the end-of-file has been encountered.
+
+XXX: likewise for both reading and writing? Likewise for connect/fibers?"
;; TODO: closing message queues
(define rcvar (make-repeated-condition))
(define (interrupt! mq)
(trigger-condition! rcvar))
- (define wait! (cut await-trigger! rcvar))
+ (define closed-condition (make-condition))
(define mq (make-message-queue handlers error-handler interrupt!))
(spawn (lambda ()
- (handle-input! mq port)))
+ (handle-input! mq port)
+ (signal-condition! closed-condition)))
(spawn (lambda ()
- (handle-output! mq port wait!)))
+ (let/ec escape
+ (define escape-when-closed-operation
+ (wrap-operation (wait-operation closed-condition)
+ escape))
+ (define (wait!)
+ (perform-operation
+ (choice-operation
+ (prepare-await-trigger! rcvar)
+ escape-when-closed-operation)))
+ (handle-output! mq port wait!))))
mq)
(define* (connect/fibers config service-name handlers error-handler
diff --git a/tests/mq-stream.scm b/tests/mq-stream.scm
index dc1e015..a4ca313 100644
--- a/tests/mq-stream.scm
+++ b/tests/mq-stream.scm
@@ -34,11 +34,13 @@
(srfi srfi-26)
(srfi srfi-43)
(rnrs io ports)
+ (ice-9 atomic)
(ice-9 binary-ports)
(ice-9 suspendable-ports)
(ice-9 control)
(ice-9 match)
- (ice-9 threads))
+ (ice-9 threads)
+ (tests utils))
(define (no-sender . _)
(error "no sender!"))
@@ -449,4 +451,62 @@
(wait done/1)
#t)))
+(define* (call-with-spawner proc . args)
+ (apply run-fibers
+ (lambda ()
+ (call-with-services
+ '()
+ (lambda (config spawn)
+ (proc spawn))))
+ args))
+
+(define (two-sockets)
+ (define sp (socketpair AF_UNIX SOCK_STREAM 0))
+ (fcntl (car sp) F_SETFL
+ (bitwise-ior (fcntl (car sp) F_GETFL) O_NONBLOCK))
+ (fcntl (cdr sp) F_SETFL
+ (bitwise-ior (fcntl (cdr sp) F_GETFL) O_NONBLOCK))
+ (values (car sp) (cdr sp)))
+
+(test-assert "input eof detected --> handle-input/output! stops
(port->message-queue)"
+ (call-with-spawner
+ (lambda (spawn)
+ (define-values (alpha beta) (two-sockets))
+ (define end-of-file (make-condition))
+ (define (error-handler . e)
+ (assert (equal? e '(input:regular-end-of-file)))
+ ;; only one end-of-file notification
+ (assert (signal-condition! end-of-file)))
+ (define mq/alpha
+ (port->message-queue alpha no-handlers error-handler
+ #:spawn spawn))
+ ;; Give the fibers started by 'port->message-queue' a chance to block.
+ (yield-many)
+ ;; Let 'beta' stop writing, such that 'alpha' receives an end-of-file.
+ ;; But keep the 'write' end of 'alpha' / 'read' end of 'beta' open to
+ ;; complicate matters.
+ (shutdown beta 1)
+ (wait end-of-file)
+ (define sent? (make-atomic-box #f))
+ ;; Attempt to write a message, even though the connection is
(half-)closed.
+ ;; It should not actually be sent.
+ (send-message! mq/alpha (bv-slice/read-write #vu8(0 4 0 0))
+ #:notify-sent!
+ (lambda ()
+ ;; strictly speaking, this does not mean the message was
+ ;; sent, but it's close enough for this test's purposes.
+ (atomic-box-set! sent? #t)))
+ ;; Give 'handle-output!' a chance to (faultively) sent the message.
+ (yield-many)
+ (sleep 0.1) ; the yield-many above is apparently insufficient
+ (assert (not (atomic-box-ref sent?)))
+ ;; If it didn't try to sent the message, that presumably means the
+ ;; 'handle-output!' fiber has completed.
+ #t)
+ ;; Wait until all runnable fibers complete
+ ;; -- TODO this does not seem to include blocking handle-input/output!.
+ #:drain? #t
+ ;; drain? is broken when parallelism is enabled, see <???>.
+ #:parallelism 1))
+
(test-end "mq-stream")
--
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.
- [gnunet-scheme] 208/324: tests/mq-stream: Don't assume setvbuf returns anything., (continued)
- [gnunet-scheme] 208/324: tests/mq-stream: Don't assume setvbuf returns anything., gnunet, 2021/09/21
- [gnunet-scheme] 212/324: mq/handler: Define a macro for constructing handlers., gnunet, 2021/09/21
- [gnunet-scheme] 220/324: mq-impl/stream: Extract code to be shared with connect/fibers., gnunet, 2021/09/21
- [gnunet-scheme] 226/324: doc: Generate PDF and HTML documentation, gnunet, 2021/09/21
- [gnunet-scheme] 227/324: mq: Remove TODOs about hypothetical &malformed-message., gnunet, 2021/09/21
- [gnunet-scheme] 229/324: guix: Import missing module., gnunet, 2021/09/21
- [gnunet-scheme] 194/324: mq-impl/stream: Eliminate atomic box., gnunet, 2021/09/21
- [gnunet-scheme] 191/324: doc: Document message queue error handling., gnunet, 2021/09/21
- [gnunet-scheme] 216/324: mq: Inject errors if no appropriate message handler exists., gnunet, 2021/09/21
- [gnunet-scheme] 190/324: Correct XXX and TODO on input:regular-end-of-file., gnunet, 2021/09/21
- [gnunet-scheme] 196/324: mq-impl/stream: Stop all fibers when EOF is reached (part 1).,
gnunet <=
- [gnunet-scheme] 201/324: tests/mq-stream: Make test more strict., gnunet, 2021/09/21
- [gnunet-scheme] 204/324: mq-impl/stream: Document implementation pitfall., gnunet, 2021/09/21
- [gnunet-scheme] 214/324: tests/mq: Use 'message-handler' macro., gnunet, 2021/09/21
- [gnunet-scheme] 217/324: tests/mq: Spam the log less., gnunet, 2021/09/21
- [gnunet-scheme] 233/324: doc: Document the message type database a little., gnunet, 2021/09/21
- [gnunet-scheme] 232/324: doc: Partially document handler procedures and interposers., gnunet, 2021/09/21
- [gnunet-scheme] 239/324: tests/network-size: Don't let the GC close port., gnunet, 2021/09/21
- [gnunet-scheme] 241/324: nse/client: Add a 'disconnected' callback., gnunet, 2021/09/21
- [gnunet-scheme] 195/324: mq-impl/stream: Reduce nesting., gnunet, 2021/09/21
- [gnunet-scheme] 200/324: mq-impl/stream: Make error injection less unobviously correct., gnunet, 2021/09/21