gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[gnunet-scheme] 238/324: mq-impl/stream: Allow closing queues made with


From: gnunet
Subject: [gnunet-scheme] 238/324: mq-impl/stream: Allow closing queues made with connect-fibers.
Date: Tue, 21 Sep 2021 13:24:38 +0200

This is an automated email from the git hooks/post-receive script.

maxime-devos pushed a commit to branch master
in repository gnunet-scheme.

commit d52234e8113c638acef979933ae4e4ed3e9abdcb
Author: Maxime Devos <maximedevos@telenet.be>
AuthorDate: Tue Sep 7 21:00:00 2021 +0200

    mq-impl/stream: Allow closing queues made with connect-fibers.
    
    * gnu/gnunet/mq-impl/stream.scm
      (connect/fibers): Document new behaviour.
      (connect/fibers)[interrupt-condition]: New variable.
      (connect/fibers)[close*!]: New procedure.
      (connect/fibers)[mq]: Use new procedure.
      (connect/fibers)[allow-interrupt-operation]: New variable.
      (connect/fibers)[sleep*]: New procedure.
      (connect/fibers): Pass new 'sleep*' procedure to 'connect-unix'.
      (connect/fibers): Inject 'connection:interrupted' if appropriate
      instead of 'connection:connected'.
      (connect-unix): Add 'sleep' argument.
    * doc/scheme-gnunet.tm (Disconnecting): New section.
    * tests/mq-stream.scm
      ("can close while still connecting (--> interrupted)")
      ("can close after being connected (--> regular-end-of-file)"):
      New tests.
    * README.org (List of errors): Note existence of 'connectin:interrupted'.
---
 README.org                    |  4 +++
 doc/scheme-gnunet.tm          | 25 ++++++++++++++---
 gnu/gnunet/mq-impl/stream.scm | 39 +++++++++++++++++++++------
 tests/mq-stream.scm           | 62 +++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 119 insertions(+), 11 deletions(-)

diff --git a/README.org b/README.org
index 13e68a1..dbdb3ec 100644
--- a/README.org
+++ b/README.org
@@ -128,6 +128,10 @@
      The message queue has been connected,
      so sent messages shouldn't simply be buffered.
      Injected by (gnu gnunet mq-impl stream).
+   + connection:interrupted
+
+     The message queue has been closed before the connection could
+     be established.  Injected by (gnu gnunet mq-impl stream).
 
    Input errors (decoding)
    + logic:no-handler type . rest
diff --git a/doc/scheme-gnunet.tm b/doc/scheme-gnunet.tm
index b2dbd56..67fcba7 100644
--- a/doc/scheme-gnunet.tm
+++ b/doc/scheme-gnunet.tm
@@ -431,6 +431,11 @@
     The connection to the server has been established.
   </explain>
 
+  <\explain>
+    <scm|connection:interrupted>
+  </explain|The message queue has been closed before the connection to the
+  server could be established.>
+
   <\explain>
     <scm|input:regular-end-of-file>
   <|explain>
@@ -519,14 +524,28 @@
   except the messages that are eventually cancelled.
 
   The errors <scm|logic:no-handler> and <scm|logic:ill-formed> are not fatal:
-  later messages can still be read and handled.
+  later messages can still be read and handled.<space|1em>If
+  <scm|connection:interrupted> is injected, no other errors are ever
+  injected, whether in the past or in the future.<space|1em>This error can
+  only be injected once.
 
   <todo|I/O errors>
 
-  <todo|disconnecting>
-
   <todo|envelopes>
 
+  <subsection|Disconnecting>
+
+  A message queue can be closed with the <scm|close-queue!> procedure from
+  <scm|(gnu gnunet mq)>.<space|1em>In the default message queue
+  implementation, this asynchronuously closes the port and stops associated
+  fibers.<space|1em>Closing ports when they won't be used anymore is
+  important for limiting resource consumption, especially for servers that
+  can have many connections.<space|1em>Closing message queues is an
+  idempotent operation: closing a message queue twice is the same as closing
+  it once.<space|1em> If a message queue is closed before a connection could
+  be formed, <scm|connection:interrupted> is injected instead of
+  <scm|connection:connected> and <scm|connection:regular-end-of-file>.
+
   <section|Estimation of the size of the network>
 
   <index|network size estimation>GNUnet has a service that roughly estimates
diff --git a/gnu/gnunet/mq-impl/stream.scm b/gnu/gnunet/mq-impl/stream.scm
index b122f14..72cd59c 100644
--- a/gnu/gnunet/mq-impl/stream.scm
+++ b/gnu/gnunet/mq-impl/stream.scm
@@ -57,7 +57,9 @@
          (only (rnrs control)
                when)
          (only (fibers)
-               sleep spawn-fiber)
+               spawn-fiber)
+         (only (fibers timers)
+               sleep-operation)
          (only (fibers io-wakeup)
                wait-until-port-readable-operation
                wait-until-port-writable-operation)
@@ -194,7 +196,7 @@ TODO: closing, destroying @var{mq}, @var{output}."
     (define %path-resolution-errors
       (list EACCES ENOENT ENOTDIR ELOOP ENAMETOOLONG))
 
-    (define (connect-unix config service-name)
+    (define (connect-unix config service-name sleep)
       "Try connecting to the server using UNIX domain sockets.
 
 On success, the socket is returned.  If the server has bound
@@ -387,15 +389,36 @@ until they can be sent.  Some fibers may be created with 
@var{spawn}
 When the connection has been established, the error @code{connection:connected}
 (a symbol) is injected into the message queue.  When the connection has been
 closed by the server (e.g. because the server was stopped or is restarting)
-the error @code{input:regular-end-of-file} is injected into the message queue."
+the error @code{input:regular-end-of-file} is injected into the message queue.
+
+If the message queue is closed while establishing the connection, the error
+@code{connection:interrupted} (a symbol) is injected and
+@code{connection:connected} is not injected."
       (define-values (start-fibers interrupt! close!)
        (prepare-port-message-queue spawn))
+      (define interrupt-condition (make-condition))
+      (define (close*!)
+       (signal-condition! interrupt-condition)
+       (close!))
       (define mq
-       (make-message-queue handlers error-handler interrupt! close!))
+       (make-message-queue handlers error-handler interrupt! close*!))
       (spawn (lambda ()
-              ;; XXX interupt the connection forming when closing
-              (define socket (connect-unix config service-name))
-              (inject-error! mq 'connection:connected)
-              (start-fibers mq socket)))
+              ;; Use 'sleep*' to allow 'close*!' to stop the connection
+              ;; forming.
+              (define socket (let/ec escape
+                               (define allow-interrupt-operation
+                                 (wrap-operation
+                                  (wait-operation interrupt-condition)
+                                  (lambda () (escape #f))))
+                               (define (sleep* time)
+                                 (perform-operation
+                                  (choice-operation
+                                   (sleep-operation time)
+                                   allow-interrupt-operation)))
+                               (connect-unix config service-name sleep*)))
+              (if socket
+                  (begin (inject-error! mq 'connection:connected)
+                         (start-fibers mq socket))
+                  (inject-error! mq 'connection:interrupted))))
       mq)))
 
diff --git a/tests/mq-stream.scm b/tests/mq-stream.scm
index f5a56e6..6b1850a 100644
--- a/tests/mq-stream.scm
+++ b/tests/mq-stream.scm
@@ -751,4 +751,66 @@ If an EPIPE system error is raised, return #f."
         #:parallelism 1)
        (port-closed? alpha)))
 
+(test-assert "can close while still connecting (--> interrupted)"
+  (call-with-socket-location
+   (lambda (where config)
+     (call-with-spawner/wait
+      (lambda (spawn)
+       (define interrupted? #f)
+       (define cond (make-condition))
+       (define (error-handler . e)
+         (match e
+           ('(connection:interrupted)
+            (begin
+              (pk 'interrupted)
+              (assert (not interrupted?))
+              (set! interrupted? #t)
+              (signal-condition! cond)))
+           (_ (error "what ~a" e))))
+       (define mq (connect/fibers config "service" no-handlers error-handler
+                                  #:spawn spawn))
+       (close-queue! mq)
+       (wait cond)
+       #t)))))
+
+(test-assert "can close after being connected (--> regular-end-of-file)"
+  (call-with-socket-location
+   (lambda (where config)
+     (call-with-spawner/wait
+      (lambda (spawn)
+       (define connected? #f)
+       (define connected-condition (make-condition))
+       (define disconnected? #f)
+       (define disconnected-condition (make-condition))
+       (define (error-handler . e)
+         (match e
+           ('(connection:connected)
+            (pk 'connected)
+            (assert (not connected?))
+            (set! connected? #t)
+            (signal-condition! connected-condition))
+           ('(input:regular-end-of-file)
+            (assert connected?)
+            (assert (not disconnected?))
+            (set! disconnected? #t)
+            (signal-condition! disconnected-condition))
+           (_ (error "what ~a" e))))
+       (define mq (connect/fibers config "service" no-handlers error-handler
+                                  #:spawn spawn))
+       (spawn
+        (lambda ()
+          (define listening-sock (socket PF_UNIX SOCK_STREAM 0))
+          (bind listening-sock AF_UNIX where)
+          (listen listening-sock 1)
+          ;; Make it non-blocking (because guile-fibers is used)
+          (fcntl listening-sock F_SETFL
+                 (bitwise-ior (fcntl listening-sock F_GETFL) O_NONBLOCK))
+          ;; Not actually interested in the return value
+          (accept listening-sock)))
+       (wait connected-condition)
+       (assert (not disconnected?))
+       (close-queue! mq)
+       (wait disconnected-condition)
+       #t)))))
+
 (test-end "mq-stream")

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]