gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[gnunet-scheme] 196/324: mq-impl/stream: Stop all fibers when EOF is rea


From: gnunet
Subject: [gnunet-scheme] 196/324: mq-impl/stream: Stop all fibers when EOF is reached (part 1).
Date: Tue, 21 Sep 2021 13:23:56 +0200

This is an automated email from the git hooks/post-receive script.

maxime-devos pushed a commit to branch master
in repository gnunet-scheme.

commit 264a4d7ad10931974c9597daf486996387b14882
Author: Maxime Devos <maximedevos@telenet.be>
AuthorDate: Sat Aug 21 15:14:13 2021 +0200

    mq-impl/stream: Stop all fibers when EOF is reached (part 1).
    
    * gnu/gnunet/mq-impl/stream.scm
      (port->message-queue): Stop the 'handle-output!' fiber when
      'handle-input!' completes.
    * tests/mq-stream.scm
      (call-with-spawner, two-sockets): New procedures.
      ("input eof detected --> handle/input/output!
      stops (port->message-queue)"): New test.
---
 gnu/gnunet/mq-impl/stream.scm | 30 ++++++++++++++++-----
 tests/mq-stream.scm           | 62 ++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 85 insertions(+), 7 deletions(-)

diff --git a/gnu/gnunet/mq-impl/stream.scm b/gnu/gnunet/mq-impl/stream.scm
index 8f88d1f..5120bbb 100644
--- a/gnu/gnunet/mq-impl/stream.scm
+++ b/gnu/gnunet/mq-impl/stream.scm
@@ -30,7 +30,8 @@
                make-one-by-one-sender inject-message! inject-error!
                make-message-queue)
          (only (gnu gnunet concurrency repeated-condition)
-               make-repeated-condition await-trigger! trigger-condition!)
+               make-repeated-condition await-trigger! prepare-await-trigger!
+               trigger-condition!)
          (only (gnu gnunet utils bv-slice)
                slice-bv slice-offset slice-length
                slice-readable? bv-slice/read-write
@@ -56,7 +57,9 @@
          (only (fibers)
                sleep spawn-fiber)
          (only (fibers conditions)
-               make-condition signal-condition! wait)
+               make-condition signal-condition! wait-operation)
+         (only (fibers operations)
+               wrap-operation perform-operation choice-operation)
          (srfi srfi-26)
          (only (guile)
                error define* identity
@@ -71,6 +74,8 @@
                put-bytevector)
          (only (ice-9 atomic)
                make-atomic-box atomic-box-set! atomic-box-ref)
+         (only (ice-9 control)
+               let/ec)
          (only (srfi srfi-1)
                memv list-ref)
          (srfi srfi-26))
@@ -233,17 +238,30 @@ an appropriate @code{&undefined-key-error} is raised."
       "Create a message queue sending and receiving data over @var{port}.
 
 This creates some fibers with @var{spawn} (@code{spawn-fiber} by default).
-As such, @var{port} must be non-blocking if @code{spawn-fiber} is used."
+As such, @var{port} must be non-blocking if @code{spawn-fiber} is used.
+All fibers will complete when the end-of-file has been encountered.
+
+XXX: likewise for both reading and writing? Likewise for connect/fibers?"
       ;; TODO: closing message queues
       (define rcvar (make-repeated-condition))
       (define (interrupt! mq)
        (trigger-condition! rcvar))
-      (define wait! (cut await-trigger! rcvar))
+      (define closed-condition (make-condition))
       (define mq (make-message-queue handlers error-handler interrupt!))
       (spawn (lambda ()
-              (handle-input! mq port)))
+              (handle-input! mq port)
+              (signal-condition! closed-condition)))
       (spawn (lambda ()
-              (handle-output! mq port wait!)))
+              (let/ec escape
+                (define escape-when-closed-operation
+                  (wrap-operation (wait-operation closed-condition)
+                                  escape))
+                (define (wait!)
+                  (perform-operation
+                   (choice-operation
+                    (prepare-await-trigger! rcvar)
+                    escape-when-closed-operation)))
+                (handle-output! mq port wait!))))
       mq)
 
     (define* (connect/fibers config service-name handlers error-handler
diff --git a/tests/mq-stream.scm b/tests/mq-stream.scm
index dc1e015..a4ca313 100644
--- a/tests/mq-stream.scm
+++ b/tests/mq-stream.scm
@@ -34,11 +34,13 @@
             (srfi srfi-26)
             (srfi srfi-43)
             (rnrs io ports)
+            (ice-9 atomic)
             (ice-9 binary-ports)
             (ice-9 suspendable-ports)
             (ice-9 control)
             (ice-9 match)
-            (ice-9 threads))
+            (ice-9 threads)
+            (tests utils))
 
 (define (no-sender . _)
   (error "no sender!"))
@@ -449,4 +451,62 @@
      (wait done/1)
      #t)))
 
+(define* (call-with-spawner proc . args)
+  (apply run-fibers
+        (lambda ()
+          (call-with-services
+           '()
+           (lambda (config spawn)
+             (proc spawn))))
+        args))
+
+(define (two-sockets)
+  (define sp (socketpair AF_UNIX SOCK_STREAM 0))
+  (fcntl (car sp) F_SETFL
+        (bitwise-ior (fcntl (car sp) F_GETFL) O_NONBLOCK))
+  (fcntl (cdr sp) F_SETFL
+        (bitwise-ior (fcntl (cdr sp) F_GETFL) O_NONBLOCK))
+  (values (car sp) (cdr sp)))
+
+(test-assert "input eof detected --> handle-input/output! stops 
(port->message-queue)"
+  (call-with-spawner
+   (lambda (spawn)
+     (define-values (alpha beta) (two-sockets))
+     (define end-of-file (make-condition))
+     (define (error-handler . e)
+       (assert (equal? e '(input:regular-end-of-file)))
+       ;; only one end-of-file notification
+       (assert (signal-condition! end-of-file)))
+     (define mq/alpha
+       (port->message-queue alpha no-handlers error-handler
+                           #:spawn spawn))
+     ;; Give the fibers started by 'port->message-queue' a chance to block.
+     (yield-many)
+     ;; Let 'beta' stop writing, such that 'alpha' receives an end-of-file.
+     ;; But keep the 'write' end of 'alpha' / 'read' end of 'beta' open to
+     ;; complicate matters.
+     (shutdown beta 1)
+     (wait end-of-file)
+     (define sent? (make-atomic-box #f))
+     ;; Attempt to write a message, even though the connection is 
(half-)closed.
+     ;; It should not actually be sent.
+     (send-message! mq/alpha (bv-slice/read-write #vu8(0 4 0 0))
+                   #:notify-sent!
+                   (lambda ()
+                     ;; strictly speaking, this does not mean the message was
+                     ;; sent, but it's close enough for this test's purposes.
+                     (atomic-box-set! sent? #t)))
+     ;; Give 'handle-output!' a chance to (faultively) sent the message.
+     (yield-many)
+     (sleep 0.1) ; the yield-many above is apparently insufficient
+     (assert (not (atomic-box-ref sent?)))
+     ;; If it didn't try to sent the message, that presumably means the
+     ;; 'handle-output!' fiber has completed.
+     #t)
+   ;; Wait until all runnable fibers complete
+   ;; -- TODO this does not seem to include blocking handle-input/output!.
+   #:drain? #t
+   ;; drain? is broken when parallelism is enabled, see <???>.
+   #:parallelism 1))
+
 (test-end "mq-stream")

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]