gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[gnunet-scheme] 237/324: mq-impl/stream: Allow closing the queue on requ


From: gnunet
Subject: [gnunet-scheme] 237/324: mq-impl/stream: Allow closing the queue on request.
Date: Tue, 21 Sep 2021 13:24:37 +0200

This is an automated email from the git hooks/post-receive script.

maxime-devos pushed a commit to branch master
in repository gnunet-scheme.

commit fb8b1e5b3df15b246cd4404dcaa3bb5bf5e90634
Author: Maxime Devos <maximedevos@telenet.be>
AuthorDate: Mon Sep 6 21:53:56 2021 +0200

    mq-impl/stream: Allow closing the queue on request.
    
    * gnu/gnunet/mq.scm
      (make-message-queue): Accept optional 'closer' argument.
      (close-queue!): New procedure.
    * gnu/gnunet/mq-impl/stream.scm
      (prepare-port-message-queue): Remove fixed TODO.  Also return
      'close!'.
      (prepare-port-message-queue)[request-close-condition]: New variable.
      (prepare-port-message-queue)[start-reader!]{wait-op}: Wait on
      request-close-condition and escape.
      (prepare-port-message-queue)[close!]: New procedure.
      (port->message-queue): Accept 'close!' from
      'prepare-port-message-queue'.  Pass it to the message queue
      constructor.
      (connect/fibers): Accept 'close!' from
      'prepare-port-message-queue'.  Pass it to 'make-message-queue'.
      Note a TODO.
    * tests/mq-stream.scm
      ("fibers stop and port closed after close! (directly after creation)")
      ("fibers stop and port closed after close! (some times passes)"):
      New tests.
---
 gnu/gnunet/mq-impl/stream.scm | 30 ++++++++++++++++++++++--------
 gnu/gnunet/mq.scm             | 28 +++++++++++++++++++++-------
 tests/mq-stream.scm           | 22 ++++++++++++++++++++++
 3 files changed, 65 insertions(+), 15 deletions(-)

diff --git a/gnu/gnunet/mq-impl/stream.scm b/gnu/gnunet/mq-impl/stream.scm
index 05f6c5a..b122f14 100644
--- a/gnu/gnunet/mq-impl/stream.scm
+++ b/gnu/gnunet/mq-impl/stream.scm
@@ -261,7 +261,6 @@ an appropriate @code{&undefined-key-error} is raised."
 
     ;; See 'port->message-queue'.  Also used by connect/fibers.
     (define* (prepare-port-message-queue spawn)
-      ;; TODO: closing message queues
       (define rcvar (make-repeated-condition))
       (define (interrupt! mq)
        (trigger-condition! rcvar))
@@ -275,12 +274,21 @@ an appropriate @code{&undefined-key-error} is raised."
       ;; on closed-condition returns #f, as in that case, the other fiber has
       ;; already done all its I/O and won't need the port anymore.
       (define closed-condition (make-condition))
+      ;; 'request-close-condition' is used by 'close!' to stop the read fiber,
+      ;; such that it will signal 'closed-condition', then the 'write-fiber'
+      ;; will also stop and close the port.  Closing it from the write fiber
+      ;; would also be possible, but in this implementation, closing happens
+      ;; in the read fiber.
+      (define request-close-condition (make-condition))
       (define (start-reader! mq port)
        (define-values (key . rest)
          (let/ec escape
            (define wait-op
              (choice-operation
               (wait-until-port-readable-operation port)
+              (wrap-operation (wait-operation request-close-condition)
+                              (lambda ()
+                                (escape 'input:regular-end-of-file)))
               (wrap-operation (wait-operation closed-condition)
                               (lambda ()
                                 (escape 'input:regular-end-fof-file)))))
@@ -337,11 +345,14 @@ an appropriate @code{&undefined-key-error} is raised."
            (inject-error! mq 'input:regular-end-of-file)
            ;; TODO: close-port can block!
            (close-port port)))
+      (define (close!)
+       (signal-condition! request-close-condition))
       (values (lambda (mq port)
                (spawn (lambda () (start-reader! mq port)))
                (spawn (lambda () (start-writer! mq port))))
              ;; Pass this to make-message-queue as the 'sender'.
-             interrupt!))
+             interrupt!
+             close!))
 
     (define* (port->message-queue port handlers error-handler
                                  #:key (spawn spawn-fiber))
@@ -354,11 +365,14 @@ All fibers will complete when the end-of-file has been 
encountered.
 When the connection is broken, the error @code{input:regular-end-of-file}
 is injected.  A half-duplex port is treated as a broken connection.
 XXX: half-duplex connections cannot always be detected
-XXX: Likewise for connect/fibers?"
-      (define-values (start-fibers interrupt!)
+XXX: Likewise for connect/fibers?
+
+The port will be closed when the connection is broken or after
+@ode{close-queue!} is called."
+      (define-values (start-fibers interrupt! close!)
        (prepare-port-message-queue spawn))
       (define mq
-       (make-message-queue handlers error-handler interrupt!))
+       (make-message-queue handlers error-handler interrupt! close!))
       (start-fibers mq port)
       mq)
 
@@ -374,12 +388,12 @@ When the connection has been established, the error 
@code{connection:connected}
 (a symbol) is injected into the message queue.  When the connection has been
 closed by the server (e.g. because the server was stopped or is restarting)
 the error @code{input:regular-end-of-file} is injected into the message queue."
-      ;; TODO closing message queues
-      (define-values (start-fibers interrupt!)
+      (define-values (start-fibers interrupt! close!)
        (prepare-port-message-queue spawn))
       (define mq
-       (make-message-queue handlers error-handler interrupt!))
+       (make-message-queue handlers error-handler interrupt! close!))
       (spawn (lambda ()
+              ;; XXX interupt the connection forming when closing
               (define socket (connect-unix config service-name))
               (inject-error! mq 'connection:connected)
               (start-fibers mq socket)))
diff --git a/gnu/gnunet/mq.scm b/gnu/gnunet/mq.scm
index 2746020..e1be5d4 100644
--- a/gnu/gnunet/mq.scm
+++ b/gnu/gnunet/mq.scm
@@ -25,8 +25,6 @@
 ;; A message queue for GNUnet messages.
 ;; Messages are made of bytes. In particular,
 ;; messages must be prefixed by a /:message-header.
-;;
-;; TODO closing ports
 (define-library (gnu gnunet mq)
   (export <message-queue> make-message-queue message-queue?
          make-one-by-one-sender
@@ -34,6 +32,7 @@
          message-queue-length
          %message-queue-garbagitude
          try-send-again!
+         close-queue!
 
          &missing-header-error make-missing-header-error
          missing-header-error? missing-header-error-received-size
@@ -57,7 +56,7 @@
                /:message-header)
          (only (gnu gnunet netstruct syntactic)
                sizeof read%)
-         (only (guile) define* exact-integer?)
+         (only (guile) lambda* define* exact-integer?)
          (only (ice-9 weak-vector)
                weak-vector weak-vector-ref)
          (only (ice-9 atomic)
@@ -104,10 +103,14 @@
              ;; right now, though it probably should send them
              ;; soonish. It can be run at any time, with
              ;; @code{try-send-again!}.
-             (immutable sender message-queue-sender))
+             (immutable sender message-queue-sender)
+             ;; A thunk to ‘close’ the message queue, usually telling the
+             ;; remote peer that no additional data will be transmitted
+             ;; in either direction and stopping associated threads.
+             (immutable closer message-queue-closer))
       (protocol
        (lambda (%make)
-        (lambda (handlers error-handler sender)
+        (lambda* (handlers error-handler sender #:optional (closer values))
           "Make a message queue with message handlers @var{handlers}.
 
 The message handlers are expected to handle bytevector slices
@@ -120,13 +123,17 @@ Injected errors are passed to @var{error-handler}, a 
variadic procedure.
 A list of possible errors can be found in @file{README.org}.
 
 Messages are sent with @var{sender}. It can be created with
-@code{make-one-by-one-sender}."
+@code{make-one-by-one-sender}.  Optionally, a @var{closer} procedure can
+be passed.  Such a procedure is expected to be idempotent, see
+@code{close-queue!} for details."
           ;; Predicate does not exist yet ...
           #;(assert (message-handlers? handlers))
           #;(assert (message-handler? error-handler))
+          (assert (procedure? closer))
           (%make handlers error-handler
                  (make-atomic-box (cons (pfds:make-queue) 0))
-                 sender)))))
+                 sender
+                 closer)))))
 
     (define (make-one-by-one-sender proc)
       "Make a message sender, sending messages one-by-one with @var{proc}.
@@ -332,6 +339,13 @@ perhaps be sent."
 This is expected to be called from the message queue implementation."
       ((message-queue-sender mq) mq))
 
+    (define (close-queue! mq)
+      "Close the message queue @var{mq}.  The exact semantics are
+implementation-dependent.  Conventionally, this frees resources such as
+sockets and threads and is idempotent and non-blocking.  There is no
+guarantee that all messages in the queue will be sent before closing."
+      ((message-queue-closer mq)))
+
     (define (queue-filter ? queue)
       "Construct a queue, based on @var{queue}, restricted to elements
 satisfying the predicate @var{?}."
diff --git a/tests/mq-stream.scm b/tests/mq-stream.scm
index e16aa1b..f5a56e6 100644
--- a/tests/mq-stream.scm
+++ b/tests/mq-stream.scm
@@ -729,4 +729,26 @@ If an EPIPE system error is raised, return #f."
      (port-closed? alpha))
    #:parallelism 1)) ; to make the use of yield-many less fragile
 
+(test-assert "fibers stop and port closed after close! (directly after 
creation)"
+  (let^ ((<-- (alpha beta) (two-sockets)))
+       (call-with-spawner/wait
+        (lambda (spawn)
+          (define q (port->message-queue alpha no-handlers 
error-handler/regular
+                                         #:spawn spawn))
+          (close-queue! q))
+        #:parallelism 1)
+       (port-closed? alpha)))
+
+(test-assert "fibers stop and port closed after close! (some times passes)"
+  (let^ ((<-- (alpha beta) (two-sockets)))
+       (call-with-spawner/wait
+        (lambda (spawn)
+          (define q (port->message-queue alpha no-handlers 
error-handler/regular
+                                         #:spawn spawn))
+          (yield-many)
+          (sleep 0.01)
+          (close-queue! q))
+        #:parallelism 1)
+       (port-closed? alpha)))
+
 (test-end "mq-stream")

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]