gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[gnunet-scheme] 202/324: mq-impl/stream: Allow the write fiber to stop even if blocking.


From: gnunet
Subject: [gnunet-scheme] 202/324: mq-impl/stream: Allow the write fiber to stop even if blocking.
Date: Tue, 21 Sep 2021 13:24:02 +0200

This is an automated email from the git hooks/post-receive script.

maxime-devos pushed a commit to branch master
in repository gnunet-scheme.

commit 27d839b48cd39527338f2cd85d8c9fa67755f076
Author: Maxime Devos <maximedevos@telenet.be>
AuthorDate: Fri Aug 27 20:23:05 2021 +0200

    mq-impl/stream: Allow the write fiber to stop even if blocking.
    
    * gnu/gnunet/mq-impl/stream.scm
      (port->message-queue): Interpose a 'current-write-waiter' that exits
      when 'closed-condition' is signalled.
    * tests/mq-stream.scm
      (call-with-spawner/wait): New procedure.
      ("writer blocking and closed for reading --> all fibers stop"): New test.
---
 gnu/gnunet/mq-impl/stream.scm | 25 +++++++++++++++--
 tests/mq-stream.scm           | 65 +++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 87 insertions(+), 3 deletions(-)

diff --git a/gnu/gnunet/mq-impl/stream.scm b/gnu/gnunet/mq-impl/stream.scm
index f91df70..14c1931 100644
--- a/gnu/gnunet/mq-impl/stream.scm
+++ b/gnu/gnunet/mq-impl/stream.scm
@@ -59,7 +59,8 @@
          (only (fibers)
                sleep spawn-fiber)
          (only (fibers io-wakeup)
-               wait-until-port-readable-operation)
+               wait-until-port-readable-operation
+               wait-until-port-writable-operation)
          (only (fibers conditions)
                make-condition signal-condition! wait-operation)
          (only (fibers operations)
@@ -68,7 +69,7 @@
          (only (srfi srfi-39)
                parameterize)
          (only (ice-9 suspendable-ports)
-               current-read-waiter)
+               current-read-waiter current-write-waiter)
          (only (guile)
                error define* identity define-values
                EACCES ENOENT ENOTDIR ELOOP ENAMETOOLONG EAGAIN ECONNREFUSED
@@ -296,15 +297,33 @@ XXX: Likewise for connect/fibers?"
                 (apply inject-error! mq key rest))))
       (spawn (lambda ()
               (let/ec escape
+                ;; operation for calling the escape continuation
+                ;; when the other fiber detected the connection is broken
                 (define escape-when-closed-operation
                   (wrap-operation (wait-operation closed-condition)
                                   escape))
+                ;; operation for waiting until the port is writable
+                ;; or the other fiber detected the connection is broken.
+                (define wait-writable-operation
+                  (choice-operation
+                   escape-when-closed-operation
+                   (wait-until-port-writable-operation port)))
                 (define (wait!)
                   (perform-operation
                    (choice-operation
                     (prepare-await-trigger! rcvar)
                     escape-when-closed-operation)))
-                (handle-output! mq port wait!))
+                (define (wait!/blocking)
+                  (perform-operation wait-writable-operation))
+                (define old-waiter (current-write-waiter))
+                (define (new-waiter p)
+                  (if (eq? p port)
+                      (wait!/blocking)
+                      ;; Maybe a backtrace is being printed,
+                      ;; 'system-async-mark' is used ...
+                      (old-waiter p)))
+                (parameterize ((current-write-waiter new-waiter))
+                  (handle-output! mq port wait!)))
               (when (signal-condition! closed-condition)
                 (inject-error! mq 'input:regular-end-of-file))))
       mq)
diff --git a/tests/mq-stream.scm b/tests/mq-stream.scm
index b013ddc..ce53f2e 100644
--- a/tests/mq-stream.scm
+++ b/tests/mq-stream.scm
@@ -460,6 +460,27 @@
              (proc spawn))))
         args))
 
+;; When done, wait for every fiber to complete.
+;; Somewhat racy, don't use outside tests.
+(define* (call-with-spawner/wait proc . args)
+  (define h (make-weak-key-hash-table)) ; condition -> nothing in particular
+  (apply call-with-spawner
+        (lambda (spawn/not-waiting)
+          (define (spawn thunk)
+            (define done-condition (make-condition))
+            (hashq-set! h done-condition #f)
+            (spawn/not-waiting
+             (lambda ()
+               (thunk)
+               (signal-condition! done-condition))))
+          (define-values return-values
+            (proc spawn))
+          ;; Make sure every fiber completes before returning.
+          ;; XXX hash-for-each imposes a continuation barrier
+          (for-each wait (hash-map->list (lambda (x y) x) h))
+          (apply values return-values))
+        args))
+
 (define (two-sockets)
   (define sp (socketpair AF_UNIX SOCK_STREAM 0))
   (fcntl (car sp) F_SETFL
@@ -556,4 +577,48 @@
    #:drain? #t
    #:parallelism 1))
 
+;; This detects the absence of the parametrisation of 'current-write-waiter'.
+(test-assert "writer blocking and closed for reading --> all fibers stop"
+  (call-with-spawner/wait
+   (lambda (spawn)
+     (define-values (alpha beta) (two-sockets))
+     ;; Fill the writing pipe, such that the writing fiber will block.
+     #;(fcntl alpha SO_SNDBUF 1) ; doesn't work on sockets on Linux ..
+     ;; Simply writing a byte isn't sufficient, as the kernel can
+     ;; impose a minimum buffer size.
+     (define old-waiter (current-write-waiter))
+     (let/ec ec
+       (parameterize ((current-write-waiter
+                      (lambda (port)
+                        (if (eq? port alpha)
+                            (ec)
+                            ;; maybe a backtrace
+                            (old-waiter port)))))
+        (define bv (make-bytevector 4096))
+        (let loop ()
+          (put-bytevector alpha bv)
+          (loop))))
+     (define closed-condition (make-condition))
+     (define (error-handler e)
+       (assert (eq? e 'input:regular-end-of-file))
+       (unless (signal-condition! closed-condition)
+        (error "already saw end of file")))
+     (define mq (port->message-queue alpha no-handlers error-handler
+                                    #:spawn spawn))
+     (send-message! mq (bv-slice/read-write #vu8(0 4 0 0)))
+     (define (notify-sent)
+       ;; if the mq-stream implementation implemented buffering by itself
+       ;; this would actually be possible.
+       (error "impossible, should be blocking by now!"))
+     (send-message! mq
+                   (bv-slice/read-write #vu8(0 4 0 0))
+                   #:notify-sent! notify-sent)
+     (pk 'send2)
+     ;; Give the write fiber a chance to block.
+     (yield-many)
+     (sleep 0.1)
+     (shutdown alpha 0)
+     #t)))
+;; ^ if this test blocks, that means not all fibers have stopped
+
 (test-end "mq-stream")

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]