[gnunet-scheme] 197/324: mq-impl/stream: Stop all fibers when EOF is reached (part 2).
From: gnunet
Subject: [gnunet-scheme] 197/324: mq-impl/stream: Stop all fibers when EOF is reached (part 2).
Date: Tue, 21 Sep 2021 13:23:57 +0200
This is an automated email from the git hooks/post-receive script.
maxime-devos pushed a commit to branch master
in repository gnunet-scheme.
commit 032d7fba309363249e861cc3378c308733aeca0b
Author: Maxime Devos <maximedevos@telenet.be>
AuthorDate: Thu Aug 26 13:57:02 2021 +0200
mq-impl/stream: Stop all fibers when EOF is reached (part 2).
* gnu/gnunet/mq-impl/stream.scm
(handle-input!): Allow returning errors instead of injecting them.
Update documentation.
(handle-output!): Detect EPIPE.
(port->message-queue)[closed-condition]: Document what it is used
for.
(port->message-queue): Exit when the write fiber is done and the
read fiber is blocking. Only inject errors if the other fiber
hasn't done so already.
* tests/mq-stream.scm
("closed for writing --> handle-input!
stops (port->message-queue)"): New test.
---
 gnu/gnunet/mq-impl/stream.scm | 91 ++++++++++++++++++++++++++++++++-----------
 tests/mq-stream.scm           | 45 +++++++++++++++++++++
 2 files changed, 113 insertions(+), 23 deletions(-)
diff --git a/gnu/gnunet/mq-impl/stream.scm b/gnu/gnunet/mq-impl/stream.scm
index 5120bbb..a144f96 100644
--- a/gnu/gnunet/mq-impl/stream.scm
+++ b/gnu/gnunet/mq-impl/stream.scm
@@ -47,24 +47,32 @@
(only (gnu gnunet util time)
standard-back-off time-unit:second)
(only (rnrs base)
- begin define let values quote
+ begin define let values quote apply
assert = or and eq?
list if lambda car /)
(only (rnrs arithmetic bitwise)
bitwise-ior)
(only (rnrs exceptions)
guard)
+ (only (rnrs control)
+ when)
(only (fibers)
sleep spawn-fiber)
+ (only (fibers io-wakeup)
+ wait-until-port-readable-operation)
(only (fibers conditions)
make-condition signal-condition! wait-operation)
(only (fibers operations)
wrap-operation perform-operation choice-operation)
(srfi srfi-26)
+ (only (srfi srfi-39)
+ parameterize)
+ (only (ice-9 suspendable-ports)
+ current-read-waiter)
(only (guile)
- error define* identity
+ error define* identity define-values
EACCES ENOENT ENOTDIR ELOOP ENAMETOOLONG EAGAIN ECONNREFUSED
- EPROTOTYPE
+ EPROTOTYPE EPIPE
PF_UNIX SOCK_STREAM F_GETFD F_SETFD F_GETFL F_SETFL FD_CLOEXEC
O_NONBLOCK AF_UNIX
socket connect fcntl
@@ -99,7 +107,12 @@ parameterise the current write waiter and install exception handlers."
((already-sent) (error "tried to send an envelope twice"))))
;; TODO: maybe note that this procedure blocks?
- (define (handle-input! mq input)
+ ;; TODO: rework callers and documentation
+ ;; to remove 'return' argument
+ (define* (handle-input! mq input #:key
+ (return (lambda error
+ (apply inject-error! mq error)
+ (values))))
"Keep reading message from the input port @var{input}.
Feed each read message in-order to @var{mq} with @code{inject-message!}.
@@ -112,11 +125,12 @@ into @var{mq}, where @var{type} is the message type as an integer
(or @code{#f} if it could not be determined) and @var{size} is the
message size in the header.
-When the first [1] end-of-file has been reached, inject the error
+When the end-of-file has been reached, inject the error
@code{input:regular-end-of-file} into @var{mq}. If the end-of-file
happened while inside a (partial) message, inject
-@code{input:premature-end-of-file} instead. In case of an I/O error,
-TODO.
+@code{input:premature-end-of-file} instead.
+
+In case of an I/O error, TODO.
In these exceptional cases, the call to this procedure also returns
after injecting the error. TODO closing message queues."
@@ -128,14 +142,11 @@ after injecting the error. TODO closing message queues."
(slice/read-only
(bv-slice/read-write bv offset length))))
(! (return/overly-small type size)
- (inject-error! mq 'input:overly-small type size)
- (values))
+ (return 'input:overly-small type size))
(! (return/premature-eof)
- (inject-error! mq 'input:premature-end-of-file)
- (values))
+ (return 'input:premature-end-of-file))
(! (return/done-eof)
- (inject-error! mq 'input:regular-end-of-file)
- (values)))
+ (return 'input:regular-end-of-file)))
(add-from-port! tok input handle/message return/overly-small
return/done-eof return/premature-eof)))
@@ -151,18 +162,27 @@ When using guile-fibers, @var{wait!} can be implemented with
@code{await-trigger!} and by calling @code{trigger-condition!}
from the ‘message sender’ of @var{mq}.
+When the port @var{output} has been closed for writing, this procedure
+returns. This is detected with the @code{EPIPE} error, so don't block
+@code{SIGPIPE} signals.
+
+TODO: detect it has been closed even when not actually writing,
+with EPOLLERR -- needs fibers support.
TODO: closing, destroying @var{mq}, @var{output}."
(define (one-by-one-proc ev)
(write-envelope! output ev))
(define send-round
(cute (make-one-by-one-sender one-by-one-proc)
mq))
- (let loop ()
- ;; Doing 'wait!' or 'send-round' the other way around
- ;; should be acceptable as well.
- (send-round)
- (wait!)
- (loop)))
+ (guard (c ((and (eq? 'system-error (exception-kind c))
+ (= EPIPE (car (list-ref (exception-args c) 3))))
+ (values)))
+ (let loop ()
+ ;; Doing 'wait!' or 'send-round' the other way around
+ ;; should be acceptable as well.
+ (send-round)
+ (wait!)
+ (loop))))
;; See, e.g., the Linux man page path_resolution(7).
(define %path-resolution-errors
@@ -241,16 +261,39 @@ This creates some fibers with @var{spawn} (@code{spawn-fiber} by default).
As such, @var{port} must be non-blocking if @code{spawn-fiber} is used.
All fibers will complete when the end-of-file has been encountered.
-XXX: likewise for both reading and writing? Likewise for connect/fibers?"
+When the connection is broken, the error @code{input:regular-end-of-file}
+is injected. A half-duplex port is treated as a broken connection.
+XXX: half-duplex connections cannot always be detected
+XXX: Likewise for connect/fibers?"
;; TODO: closing message queues
(define rcvar (make-repeated-condition))
(define (interrupt! mq)
(trigger-condition! rcvar))
+ ;; 'closed-condition' is used to coordinate the termination of the
+ ;; two fibers. When one fiber detects an EOF condition (or half-duplex),
+ ;; it informs the other fiber by signalling the condition and injects
+ ;; an appropriate error, unless the other fiber will do it already.
(define closed-condition (make-condition))
(define mq (make-message-queue handlers error-handler interrupt!))
(spawn (lambda ()
- (handle-input! mq port)
- (signal-condition! closed-condition)))
+ (define-values (key . rest)
+ (let/ec escape
+ (define wait-op
+ (choice-operation
+ (wait-until-port-readable-operation port)
+ (wrap-operation (wait-operation closed-condition)
+ (lambda ()
+ (escape 'input:regular-end-of-file)))))
+ (define (new-waiter . _)
+ (perform-operation wait-op))
+ ;; XXX: if (define-values error ...) is written and
+ ;; 'handle-input!' raises an error (resulting in a backtrace),
+ ;; a segfault can
+ ;; happen: <https://debbugs.gnu.org/cgi/bugreport.cgi?bug=50153>.
+ (parameterize ((current-read-waiter new-waiter))
+ (handle-input! mq port #:return values))))
+ (when (signal-condition! closed-condition)
+ (apply inject-error! mq key rest))))
(spawn (lambda ()
(let/ec escape
(define escape-when-closed-operation
@@ -261,7 +304,9 @@ XXX: likewise for both reading and writing? Likewise for connect/fibers?"
(choice-operation
(prepare-await-trigger! rcvar)
escape-when-closed-operation)))
- (handle-output! mq port wait!))))
+ (handle-output! mq port wait!)
+ (when (signal-condition! closed-condition)
+ (inject-error! mq 'input:regular-end-of-file)))))
mq)
(define* (connect/fibers config service-name handlers error-handler
diff --git a/tests/mq-stream.scm b/tests/mq-stream.scm
index a4ca313..8ad905f 100644
--- a/tests/mq-stream.scm
+++ b/tests/mq-stream.scm
@@ -509,4 +509,49 @@
;; drain? is broken when parallelism is enabled, see <???>.
#:parallelism 1))
+(test-assert "closed for writing --> handle-input! stops (port->message-queue)"
+ (call-with-spawner
+ (lambda (spawn)
+ (define-values (alpha beta) (two-sockets))
+ (define received? (make-atomic-box #f))
+ (define end-of-file (make-condition))
+ (define (receive! slice)
+ (assert (not (atomic-box-ref received?)))
+ (atomic-box-set! received? #t)
+ (error "shouldn't be received"))
+ (define (error-handler . e)
+ (pk 'e e)
+ (assert (equal? e '(input:regular-end-of-file)))
+ ;; only one end-of-file notification
+ (assert (signal-condition! end-of-file)))
+ (define mq/alpha
+ (port->message-queue alpha
+ (message-handlers
+ (simple-handler 0 receive!))
+ error-handler
+ #:spawn spawn))
+ ;; Give the new fibers a chance to block.
+ (yield-many)
+ ;; Let 'beta' stop reading, such that 'alpha' is closed for writing.
+ ;; But keep the 'read' end of 'alpha' open to complicate matters.
+ (shutdown beta 0)
+ ;; TODO: fibers doesn't have an option for waiting for EPOLLRDHUP
+ ;; or EPOLLERR, so the code cannot immediately detect that 'alpha'
+ ;; cannot be written to anymore. Instead, 'handle-output!' will
+ ;; detect the unwritability when it tries to write something.
+ (send-message! mq/alpha (bv-slice/read-write #vu8(0 4 0 9)))
+ ;; The end-of-file error should be injected, even though the socket
+ ;; is half-duplex and only the write end is closed, because message
+ ;; queues do not have a notion of half-duplex connections.
+ (pk 'waiting)
+ (wait end-of-file)
+ ;; Attempt to read a message (after buffering a message), even though
+ ;; the connection is half-closed.
+ (put-bytevector beta #vu8(0 4 0 0))
+ ;; As the 'handle-input!' fiber should have exited already, 'receive!'
+ ;; shouldn't be called.
+ (yield-many)
+ (sleep 0.1) ; might not be necessary anymore
+ #t)))
+
(test-end "mq-stream")
--
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.