gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[gnunet-scheme] 197/324: mq-impl/stream: Stop all fibers when EOF is rea


From: gnunet
Subject: [gnunet-scheme] 197/324: mq-impl/stream: Stop all fibers when EOF is reached (part 2).
Date: Tue, 21 Sep 2021 13:23:57 +0200

This is an automated email from the git hooks/post-receive script.

maxime-devos pushed a commit to branch master
in repository gnunet-scheme.

commit 032d7fba309363249e861cc3378c308733aeca0b
Author: Maxime Devos <maximedevos@telenet.be>
AuthorDate: Thu Aug 26 13:57:02 2021 +0200

    mq-impl/stream: Stop all fibers when EOF is reached (part 2).
    
    * gnu/gnunet/mq-impl/stream.scm
      (handle-input!): Allow returning errors instead of injecting them.
      Update documentation.
      (handle-output!): Detect EPIPE.
      (port->message-queue)[closed-condition]: Document what it is used
      for.
      (port->message-queue): Exit when the write fiber is done and the
      read fiber is blocking.  Only inject errors if the other fiber
      hasn't done so already.
    * tests/mq-stream.scm
      ("closed for writing --> handle-input!
      stops (port->message-queue)"): New test.
---
 gnu/gnunet/mq-impl/stream.scm | 91 ++++++++++++++++++++++++++++++++-----------
 tests/mq-stream.scm           | 45 +++++++++++++++++++++
 2 files changed, 113 insertions(+), 23 deletions(-)

diff --git a/gnu/gnunet/mq-impl/stream.scm b/gnu/gnunet/mq-impl/stream.scm
index 5120bbb..a144f96 100644
--- a/gnu/gnunet/mq-impl/stream.scm
+++ b/gnu/gnunet/mq-impl/stream.scm
@@ -47,24 +47,32 @@
          (only (gnu gnunet util time)
                standard-back-off time-unit:second)
          (only (rnrs base)
-               begin define let values quote
+               begin define let values quote apply
                assert = or and eq?
                list if lambda car /)
          (only (rnrs arithmetic bitwise)
                bitwise-ior)
          (only (rnrs exceptions)
                guard)
+         (only (rnrs control)
+               when)
          (only (fibers)
                sleep spawn-fiber)
+         (only (fibers io-wakeup)
+               wait-until-port-readable-operation)
          (only (fibers conditions)
                make-condition signal-condition! wait-operation)
          (only (fibers operations)
                wrap-operation perform-operation choice-operation)
          (srfi srfi-26)
+         (only (srfi srfi-39)
+               parameterize)
+         (only (ice-9 suspendable-ports)
+               current-read-waiter)
          (only (guile)
-               error define* identity
+               error define* identity define-values
                EACCES ENOENT ENOTDIR ELOOP ENAMETOOLONG EAGAIN ECONNREFUSED
-               EPROTOTYPE
+               EPROTOTYPE EPIPE
                PF_UNIX SOCK_STREAM F_GETFD F_SETFD F_GETFL F_SETFL FD_CLOEXEC
                O_NONBLOCK AF_UNIX
                socket connect fcntl
@@ -99,7 +107,12 @@ parameterise the current write waiter and install exception 
handlers."
        ((already-sent) (error "tried to send an envelope twice"))))
 
     ;; TODO: maybe note that this procedure blocks?
-    (define (handle-input! mq input)
+    ;; TODO: rework callers and documentation
+    ;; to remove 'return' argument
+    (define* (handle-input! mq input #:key
+                           (return (lambda error
+                                     (apply inject-error! mq error)
+                                     (values))))
       "Keep reading message from the input port @var{input}.
 
 Feed each read message in-order to @var{mq} with @code{inject-message!}.
@@ -112,11 +125,12 @@ into @var{mq}, where @var{type} is the message type as an 
integer
 (or @code{#f} if it could not be determined) and @var{size} is the
 message size in the header.
 
-When the first [1] end-of-file has been reached, inject the error
+When the end-of-file has been reached, inject the error
 @code{input:regular-end-of-file} into @var{mq}.  If the end-of-file
 happened while inside a (partial) message, inject
-@code{input:premature-end-of-file} instead.  In case of an I/O error,
-TODO.
+@code{input:premature-end-of-file} instead.
+
+In case of an I/O error, TODO.
 
 In these exceptional cases, the call to this procedure also returns
 after injecting the error. TODO closing message queues."
@@ -128,14 +142,11 @@ after injecting the error. TODO closing message queues."
                 (slice/read-only
                  (bv-slice/read-write bv offset length))))
             (! (return/overly-small type size)
-               (inject-error! mq 'input:overly-small type size)
-               (values))
+               (return 'input:overly-small type size))
             (! (return/premature-eof)
-               (inject-error! mq 'input:premature-end-of-file)
-               (values))
+               (return 'input:premature-end-of-file))
             (! (return/done-eof)
-               (inject-error! mq 'input:regular-end-of-file)
-               (values)))
+               (return 'input:regular-end-of-file)))
            (add-from-port! tok input handle/message return/overly-small
                            return/done-eof return/premature-eof)))
 
@@ -151,18 +162,27 @@ When using guile-fibers, @var{wait!} can be implemented 
with
 @code{await-trigger!} and by calling @code{trigger-condition!}
 from the ‘message sender’ of @var{mq}.
 
+When the port @var{output} has been closed for writing, this procedure
+returns.  This is detected with the @code{EPIPE} error, so don't block
+@code{SIGPIPE} signals.
+
+TODO: detect it has been closed even when not actually writing,
+with EPOLLERR -- needs fibers support.
 TODO: closing, destroying @var{mq}, @var{output}."
       (define (one-by-one-proc ev)
        (write-envelope! output ev))
       (define send-round
        (cute (make-one-by-one-sender one-by-one-proc)
              mq))
-      (let loop ()
-       ;; Doing 'wait!' or 'send-round' the other way around
-       ;; should be acceptable as well.
-       (send-round)
-       (wait!)
-       (loop)))
+      (guard (c ((and (eq? 'system-error (exception-kind c))
+                     (= EPIPE (car (list-ref (exception-args c) 3))))
+                (values)))
+       (let loop ()
+         ;; Doing 'wait!' or 'send-round' the other way around
+         ;; should be acceptable as well.
+         (send-round)
+         (wait!)
+         (loop))))
 
     ;; See, e.g., the Linux man page path_resolution(7).
     (define %path-resolution-errors
@@ -241,16 +261,39 @@ This creates some fibers with @var{spawn} 
(@code{spawn-fiber} by default).
 As such, @var{port} must be non-blocking if @code{spawn-fiber} is used.
 All fibers will complete when the end-of-file has been encountered.
 
-XXX: likewise for both reading and writing? Likewise for connect/fibers?"
+When the connection is broken, the error @code{input:regular-end-of-file}
+is injected.  A half-duplex port is treated as a broken connection.
+XXX: half-duplex connections cannot always be detected
+XXX: Likewise for connect/fibers?"
       ;; TODO: closing message queues
       (define rcvar (make-repeated-condition))
       (define (interrupt! mq)
        (trigger-condition! rcvar))
+      ;; 'closed-condition' is used to coordinate the termination of the
+      ;; two fibers.  When one fiber detects an EOF condition (or half-duplex),
+      ;; it informs the other fiber by signalling the condition and injects
+      ;; an appropriate error, unless the other fiber will do it already.
       (define closed-condition (make-condition))
       (define mq (make-message-queue handlers error-handler interrupt!))
       (spawn (lambda ()
-              (handle-input! mq port)
-              (signal-condition! closed-condition)))
+              (define-values (key . rest)
+                (let/ec escape
+                  (define wait-op
+                    (choice-operation
+                     (wait-until-port-readable-operation port)
+                     (wrap-operation (wait-operation closed-condition)
+                                     (lambda ()
+                                       (escape 'input:regular-end-fof-file)))))
+                  (define (new-waiter . _)
+                    (perform-operation wait-op))
+                  ;; XXX: if (define-values error ...) is written and
+                  ;; 'handle-input!' raises an error (resulting in a 
backtrace),
+                  ;; a segfault can
+                  ;; happen: 
<https://debbugs.gnu.org/cgi/bugreport.cgi?bug=50153>.
+                  (parameterize ((current-read-waiter new-waiter))
+                    (handle-input! mq port #:return values))))
+              (when (signal-condition! closed-condition)
+                (apply inject-error! mq key rest))))
       (spawn (lambda ()
               (let/ec escape
                 (define escape-when-closed-operation
@@ -261,7 +304,9 @@ XXX: likewise for both reading and writing? Likewise for 
connect/fibers?"
                    (choice-operation
                     (prepare-await-trigger! rcvar)
                     escape-when-closed-operation)))
-                (handle-output! mq port wait!))))
+                (handle-output! mq port wait!)
+                (when (signal-condition! closed-condition)
+                  (inject-error! mq 'input:regular-end-of-file)))))
       mq)
 
     (define* (connect/fibers config service-name handlers error-handler
diff --git a/tests/mq-stream.scm b/tests/mq-stream.scm
index a4ca313..8ad905f 100644
--- a/tests/mq-stream.scm
+++ b/tests/mq-stream.scm
@@ -509,4 +509,49 @@
    ;; drain? is broken when parallelism is enabled, see <???>.
    #:parallelism 1))
 
+(test-assert "closed for writing --> handle-input! stops (port->message-queue)"
+  (call-with-spawner
+   (lambda (spawn)
+     (define-values (alpha beta) (two-sockets))
+     (define received? (make-atomic-box #f))
+     (define end-of-file (make-condition))
+     (define (receive! slice)
+       (assert (not (atomic-box-ref received?)))
+       (atomic-box-set! received? #t)
+       (error "shouldn't be received"))
+     (define (error-handler . e)
+       (pk 'e e)
+       (assert (equal? e '(input:regular-end-of-file)))
+       ;; only one end-of-file notification
+       (assert (signal-condition! end-of-file)))
+     (define mq/alpha
+       (port->message-queue alpha
+                           (message-handlers
+                            (simple-handler 0 receive!))
+                           error-handler
+                           #:spawn spawn))
+     ;; Give the new fibers a chance to block.
+     (yield-many)
+     ;; Let 'beta' stop reading, such that 'alpha' is closed for writing.
+     ;; But keep the 'read' end of 'alpha' open to complicate matters.
+     (shutdown beta 0)
+     ;; TODO: fibers doesn't have an option for waiting for EPOLLRDHUP
+     ;; or EPOLLERR, so the code cannot immediately detect that 'alpha'
+     ;; cannot be written to anymore.  Instead, 'handle-output!' will
+     ;; detect the unwritability when it tries to write something.
+     (send-message! mq/alpha (bv-slice/read-write #vu8(0 4 0 9)))
+     ;; The end-of-file error should be injected, even though the socket
+     ;; is half-duplex and only the write end is closed, because message
+     ;; queues do not have a notion of half-duplex connections.
+     (pk 'waiting)
+     (wait end-of-file)
+     ;; Attempt to read a message (after buffering a message), even though
+     ;; the connection is half-closed.
+     (put-bytevector beta #vu8(0 4 0 0))
+     ;; As the 'handle-input!' fiber should have exited already, 'receive!'
+     ;; shouldn't be called.
+     (yield-many)
+     (sleep 0.1) ; might not be necessary anymore
+     #t)))
+
 (test-end "mq-stream")

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]