[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[gnunet-scheme] 202/324: mq-impl/stream: Allow the write fiber to stop e
From: |
gnunet |
Subject: |
[gnunet-scheme] 202/324: mq-impl/stream: Allow the write fiber to stop even if blocking. |
Date: |
Tue, 21 Sep 2021 13:24:02 +0200 |
This is an automated email from the git hooks/post-receive script.
maxime-devos pushed a commit to branch master
in repository gnunet-scheme.
commit 27d839b48cd39527338f2cd85d8c9fa67755f076
Author: Maxime Devos <maximedevos@telenet.be>
AuthorDate: Fri Aug 27 20:23:05 2021 +0200
mq-impl/stream: Allow the write fiber to stop even if blocking.
* gnu/gnunet/mq-impl/stream.scm
(port->message-queue): Interpose a 'current-write-waiter' that exits
when 'closed-condition' is signalled.
* tests/mq-stream.scm
(call-with-spawner/wait): New procedure.
("writer blocking and closed for reading --> all fibers stop"): New test.
---
gnu/gnunet/mq-impl/stream.scm | 25 +++++++++++++++--
tests/mq-stream.scm | 65 +++++++++++++++++++++++++++++++++++++++++++
2 files changed, 87 insertions(+), 3 deletions(-)
diff --git a/gnu/gnunet/mq-impl/stream.scm b/gnu/gnunet/mq-impl/stream.scm
index f91df70..14c1931 100644
--- a/gnu/gnunet/mq-impl/stream.scm
+++ b/gnu/gnunet/mq-impl/stream.scm
@@ -59,7 +59,8 @@
(only (fibers)
sleep spawn-fiber)
(only (fibers io-wakeup)
- wait-until-port-readable-operation)
+ wait-until-port-readable-operation
+ wait-until-port-writable-operation)
(only (fibers conditions)
make-condition signal-condition! wait-operation)
(only (fibers operations)
@@ -68,7 +69,7 @@
(only (srfi srfi-39)
parameterize)
(only (ice-9 suspendable-ports)
- current-read-waiter)
+ current-read-waiter current-write-waiter)
(only (guile)
error define* identity define-values
EACCES ENOENT ENOTDIR ELOOP ENAMETOOLONG EAGAIN ECONNREFUSED
@@ -296,15 +297,33 @@ XXX: Likewise for connect/fibers?"
(apply inject-error! mq key rest))))
(spawn (lambda ()
(let/ec escape
+ ;; operation for calling the escape continuation when
+ ;; when the other fiber detected the connection is broken
(define escape-when-closed-operation
(wrap-operation (wait-operation closed-condition)
escape))
+ ;; operation for waiting until the port is writable
+ ;; or the other fiber detected the connection is broken.
+ (define wait-writable-operation
+ (choice-operation
+ escape-when-closed-operation
+ (wait-until-port-writable-operation port)))
(define (wait!)
(perform-operation
(choice-operation
(prepare-await-trigger! rcvar)
escape-when-closed-operation)))
- (handle-output! mq port wait!))
+ (define (wait!/blocking)
+ (perform-operation wait-writable-operation))
+ (define old-waiter (current-write-waiter))
+ (define (new-waiter p)
+ (if (eq? p port)
+ (wait!/blocking)
+ ;; Maybe a backtrace is being printed,
+ ;; 'system-async-mark' is used ...
+ (old-waiter p)))
+ (parameterize ((current-write-waiter new-waiter))
+ (handle-output! mq port wait!)))
(when (signal-condition! closed-condition)
(inject-error! mq 'input:regular-end-of-file))))
mq)
diff --git a/tests/mq-stream.scm b/tests/mq-stream.scm
index b013ddc..ce53f2e 100644
--- a/tests/mq-stream.scm
+++ b/tests/mq-stream.scm
@@ -460,6 +460,27 @@
(proc spawn))))
args))
+;; When done, wait for every fiber to complete.
+;; Somewhat racy, don't use outside tests.
+(define* (call-with-spawner/wait proc . args)
+ (define h (make-weak-key-hash-table)) ; condition -> nothing in particular
+ (apply call-with-spawner
+ (lambda (spawn/not-waiting)
+ (define (spawn thunk)
+ (define done-condition (make-condition))
+ (hashq-set! h done-condition #f)
+ (spawn/not-waiting
+ (lambda ()
+ (thunk)
+ (signal-condition! done-condition))))
+ (define-values return-values
+ (proc spawn))
+ ;; Make sure every fiber completes before returning.
+ ;; XXX hash-for-each imposes a continuation barrier
+ (for-each wait (hash-map->list (lambda (x y) x) h))
+ (apply values return-values))
+ args))
+
(define (two-sockets)
(define sp (socketpair AF_UNIX SOCK_STREAM 0))
(fcntl (car sp) F_SETFL
@@ -556,4 +577,48 @@
#:drain? #t
#:parallelism 1))
+;; This detects the absence of the parametrisation of 'current-write-waiter'.
+(test-assert "writer blocking and closed for reading --> all fibers stop"
+ (call-with-spawner/wait
+ (lambda (spawn)
+ (define-values (alpha beta) (two-sockets))
+ ;; Fill the writing pipe, such that the writing fiber will block.
+ #;(fcntl alpha SO_SNDBUF 1) ; doesn't work on sockets on Linux ..
+ ;; Simply writing a byte isn't sufficient, as the kernel can
+ ;; impose a minimum buffer size.
+ (define old-waiter (current-write-waiter))
+ (let/ec ec
+ (parameterize ((current-write-waiter
+ (lambda (port)
+ (if (eq? port alpha)
+ (ec)
+ ;; maybe a backtrace
+ (old-waiter port)))))
+ (define bv (make-bytevector 4096))
+ (let loop ()
+ (put-bytevector alpha bv)
+ (loop))))
+ (define closed-condition (make-condition))
+ (define (error-handler e)
+ (assert (eq? e 'input:regular-end-of-file))
+ (unless (signal-condition! closed-condition)
+ (error "already saw end of file")))
+ (define mq (port->message-queue alpha no-handlers error-handler
+ #:spawn spawn))
+ (send-message! mq (bv-slice/read-write #vu8(0 4 0 0)))
+ (define (notify-sent)
+ ;; if the mq-stream implementation implemented buffering by itself
+ ;; this would actually be possible.
+ (error "impossible, should be blocking by now!"))
+ (send-message! mq
+ (bv-slice/read-write #vu8(0 4 0 0))
+ #:notify-sent! notify-sent)
+ (pk 'send2)
+ ;; Give the write fiber a chance to block.
+ (yield-many)
+ (sleep 0.1)
+ (shutdown alpha 0)
+ #t)))
+;; ^ if this test blocks, that means not all fibers have stopped
+
(test-end "mq-stream")
--
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.
- [gnunet-scheme] 178/324: Makefile.am: Compile with more optimisations., (continued)
- [gnunet-scheme] 178/324: Makefile.am: Compile with more optimisations., gnunet, 2021/09/21
- [gnunet-scheme] 174/324: nse/struct: Add missing imports., gnunet, 2021/09/21
- [gnunet-scheme] 186/324: nse: Allow 'updated' to be absent., gnunet, 2021/09/21
- [gnunet-scheme] 166/324: guix: Use fixed version of guile., gnunet, 2021/09/21
- [gnunet-scheme] 176/324: tests/utils: New utilities for tests., gnunet, 2021/09/21
- [gnunet-scheme] 172/324: crypto/struct: Define /ecc-signature-purpose., gnunet, 2021/09/21
- [gnunet-scheme] 179/324: nse/struct: Document 'timestamp' field of estimates., gnunet, 2021/09/21
- [gnunet-scheme] 192/324: tests/mq-stream: Recognise the 'input:regular-end-of-file' error., gnunet, 2021/09/21
- [gnunet-scheme] 206/324: mq-impl/stream: Flush the output port regularily., gnunet, 2021/09/21
- [gnunet-scheme] 198/324: doc: Document dependencies and how to get the source code., gnunet, 2021/09/21
- [gnunet-scheme] 202/324: mq-impl/stream: Allow the write fiber to stop even if blocking.,
gnunet <=
- [gnunet-scheme] 205/324: tests/mq-stream: Unbreak SIGPIPE signal handler., gnunet, 2021/09/21
- [gnunet-scheme] 203/324: tests/mq-stream: Make tests less fragile., gnunet, 2021/09/21
- [gnunet-scheme] 193/324: mq-impl/stream: Eliminate condition variable., gnunet, 2021/09/21
- [gnunet-scheme] 207/324: hat-let: Allow (dotted) variable lists with <--., gnunet, 2021/09/21
- [gnunet-scheme] 211/324: doc: Correct typo (mesage -> message), gnunet, 2021/09/21
- [gnunet-scheme] 215/324: tests/mq-stream: Use 'message-handler' macro., gnunet, 2021/09/21
- [gnunet-scheme] 218/324: mq-impl/stream: Name the reader and writer thunks., gnunet, 2021/09/21
- [gnunet-scheme] 208/324: tests/mq-stream: Don't assume setvbuf returns anything., gnunet, 2021/09/21
- [gnunet-scheme] 212/324: mq/handler: Define a macro for constructing handlers., gnunet, 2021/09/21
- [gnunet-scheme] 220/324: mq-impl/stream: Extract code to be shared with connect/fibers., gnunet, 2021/09/21