gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[gnunet-scheme] 87/324: mq: define message queue module


From: gnunet
Subject: [gnunet-scheme] 87/324: mq: define message queue module
Date: Tue, 21 Sep 2021 13:22:07 +0200

This is an automated email from the git hooks/post-receive script.

maxime-devos pushed a commit to branch master
in repository gnunet-scheme.

commit a851800e1c471adc51b65f6298907fbc84e9073c
Author: Maxime Devos <maximedevos@telenet.be>
AuthorDate: Thu Mar 4 18:56:33 2021 +0100

    mq: define message queue module
    
    * gnu/gnunet/mq/message-io.scm: new module.
    * tests/message-io.scm: test it.
    * README.org (Modules): document message queues.
    * Makefile.am: compile new module and run its tests.
---
 Makefile.am                  |   4 +-
 README.org                   |   8 ++
 gnu/gnunet/mq/message-io.scm | 238 +++++++++++++++++++++++++++++++++++++++++++
 tests/message-io.scm         | 196 +++++++++++++++++++++++++++++++++++
 4 files changed, 445 insertions(+), 1 deletion(-)

diff --git a/Makefile.am b/Makefile.am
index 3e85312..bb6f9f3 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -41,6 +41,7 @@ modules = \
   gnu/gnunet/util/mq-enum.scm \
   gnu/gnunet/util/mq-handler.scm \
   gnu/gnunet/util/mq.scm \
+  gnu/gnunet/mq/message-io.scm \
   \
   gnu/gnunet/utils/bv-slice.scm \
   gnu/gnunet/utils/hat-let.scm \
@@ -88,7 +89,8 @@ SCM_LOG_DRIVER = \
 SCM_TESTS = \
   tests/envelope.scm \
   tests/message-handler.scm \
-  tests/update.scm
+  tests/update.scm \
+  tests/message-io.scm
 
 SCM_TESTS_ENVIRONMENT = \
   GUILE_AUTO_COMPILE=0 \
diff --git a/README.org b/README.org
index b6ec797..2b41933 100644
--- a/README.org
+++ b/README.org
@@ -52,6 +52,11 @@
     can be waited upon.
 
 ** Message queues
+   Message queues have three parts: the input queue, the output
+   queue and the transport.  These are, respectively, a read+close request
+   capability, a write+close request capability, and a capability for all
+   of the previous plus reacting to close requests and injecting errors.
+
    + gnu/gnunet/util/mq.scm: message priorities & preferences
 
      Preferences: is out-of-order allowed or not,
@@ -66,7 +71,10 @@
      the ambient authority appropriately.
 
      TODO rename to gnu/gnunet/mq/handler.scm
+   + gnu/gnunet/mq/message-io.scm: like soft ports, but using
+     fibers channels and for messages.
    + TODO actual queues?  Maybe we don't need them?
+   + TODO filling the queues
 ** Network structures
    Features:
 
diff --git a/gnu/gnunet/mq/message-io.scm b/gnu/gnunet/mq/message-io.scm
new file mode 100644
index 0000000..b19fb6b
--- /dev/null
+++ b/gnu/gnunet/mq/message-io.scm
@@ -0,0 +1,238 @@
+;; This file is part of scheme-GNUnet.
+;; Copyright (C) 2021 Maxime Devos
+;;
+;; scheme-GNUnet is free software: you can redistribute it and/or modify it
+;; under the terms of the GNU Affero General Public License as published
+;; by the Free Software Foundation, either version 3 of the License,
+;; or (at your option) any later version.
+;;
+;; scheme-GNUnet is distributed in the hope that it will be useful, but
+;; WITHOUT ANY WARRANTY; without even the implied warranty of
+;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+;; Affero General Public License for more details.
+;;
+;; You should have received a copy of the GNU Affero General Public License
+;; along with this program.  If not, see <http://www.gnu.org/licenses/>.
+;;
+;; SPDX-License-Identifier: AGPL-3.0-or-later
+
+;; @author Maxime Devos (scheme-GNUnet)
+;;
+;; @brief Generic interface for sending / receiving messages
+;; TODO perhaps some kind of buffering would be useful.
+;;
+;; What's the impact on performance from having to wait on a fiber
+;; when sending / receiving a message?  Maybe put the 'queue' in
+;; message queue when it makes sense.
+;;
+;; TODO integrate with message envelopes.  Maybe change the
+;; definition of envelopes.
+(define-library (gnu gnunet mq message-io)
+  (export <message-input> message-transport->input message-input?
+         read-message-operation
+         read-input-error-operation
+         close-input!
+         wait-for-input-closed-operation
+
+         <message-output> message-transport->output message-output?
+         send-message-operation
+         read-output-error-operation
+         wait-for-output-closed-operation
+         close-output!
+
+         <message-transport> make-message-transport message-transport?
+         wait-for-transport-close-operation
+         notice-input-error-operation
+         notice-output-error-operation
+         close-transport!)
+  (import (only (rnrs base)
+               begin let define lambda assert)
+         (only (fibers operations)
+               wrap-operation)
+         (only (fibers conditions)
+               make-condition signal-condition! wait-operation)
+         (only (fibers channels)
+               make-channel get-operation put-operation)
+         (only (ice-9 atomic)
+               make-atomic-box atomic-box-ref atomic-box-set!)
+         (only (rnrs records syntactic)
+               define-record-type)
+         (only (rnrs conditions)
+               define-condition-type
+               &violation
+               make-message-condition
+               make-who-condition))
+  (begin
+    
+    (define-record-type (<message-transport> make-message-transport message-transport?)
+      (fields (immutable close? transport-close-condition) ; condition: close requested
+             (immutable closed? transport-closed-condition) ; condition: transport closed
+             (immutable messages transport-messages) ; fibers channel
+             (immutable input-errors transport-input-errors) ; fibers channel
+             (immutable output-errors transport-output-errors) ; fibers channel
+             ;; TODO I don't think atomic boxes are strictly required here.
+             ;; atomic box of (#f or the input errors passed to close-transport!)
+             (immutable input-errors/close transport-input-errors/close)
+             ;; atomic box of (#f or the output errors passed to close-transport!)
+             (immutable output-errors/close transport-output-errors/close))
+      (protocol
+       (lambda (%make)
+        (lambda ()
+          "Return a fresh message transport.
+
+Messages will be sent from the output half-pipe to the input half-pipe.
+By default, closing the half-pipes will do nothing, and the half-pipes
+will remain marked as open.  Use @code{wait-for-transport-close-operation}
+and @code{close-transport!} to react to close requests.
+
+Errors can be sent with @code{notice-input-error-operation} and
+@code{notice-output-error-operation}.  Note that input and output
+errors are separated.
+
+No restrictions are placed upon the types of messages sent."
+          (%make (make-condition)       ; close?
+                 (make-condition)       ; closed?
+                 (make-channel)         ; messages
+                 (make-channel)         ; input-errors
+                 (make-channel)         ; output-errors
+                 (make-atomic-box #f)   ; input-errors/close: nothing yet
+                 (make-atomic-box #f))))) ; output-errors/close: nothing yet
+      (opaque #t)
+      (sealed #f))
+
+    (define-record-type (<message-input> message-transport->input message-input?)
+      (fields (immutable transport message-input-transport)) ; the wrapped transport
+      (protocol
+       (lambda (%make)
+        (lambda (transport)
+          "Return an input queue corresponding to the transport
+@var{transport}.  Currently, this is a fresh object, but that might
+change in the future."
+          (assert (message-transport? transport)) ; reject anything else
+          (%make transport))))
+      (opaque #t)
+      (sealed #f))
+
+    (define-record-type (<message-output> message-transport->output message-output?)
+      (fields (immutable transport message-output-transport)) ; the wrapped transport
+      (protocol
+       (lambda (%make)
+        (lambda (transport)
+          "Return an output queue corresponding to the transport
+@var{transport}.  Currently, this is a fresh object, but that might
+change in the future."
+          (assert (message-transport? transport)) ; reject anything else
+          (%make transport))))
+      (opaque #t)
+      (sealed #f))
+
+    (define (close-input! in)
+      "Request that the input queue @var{in} be closed (asynchronous).
+Use @code{wait-for-input-closed-operation} to wait until the queue
+has actually been closed.  The request is the same one that
+@code{close-output!} makes on the corresponding output queue."
+      (assert (message-input? in))
+      (let ((transport (message-input-transport in)))
+        (signal-condition! (transport-close-condition transport))))
+
+    (define (close-output! out)
+      "Request that the output queue @var{out} be closed (asynchronous).
+Use @code{wait-for-output-closed-operation} to wait until the queue
+has actually been closed.  The request is the same one that
+@code{close-input!} makes on the corresponding input queue."
+      (assert (message-output? out))
+      (let ((transport (message-output-transport out)))
+        (signal-condition! (transport-close-condition transport))))
+
+    (define (read-message-operation in)
+      "Return an operation for reading a message from the input queue @var{in}.
+The operation blocks until a message has been read, so it should probably
+be combined with @code{wait-for-input-closed-operation} and
+@code{read-input-error-operation}."
+      (assert (message-input? in))
+      (let ((transport (message-input-transport in)))
+        (get-operation (transport-messages transport))))
+
+    (define (send-message-operation out msg)
+      "Return an operation for sending the message @var{msg} into the output
+queue @var{out}.  The operation blocks until the message has been sent
+(though it may take some time before it ends up on the other side of the
+network, and some kind of output error could happen in-between), so it
+should probably be combined with @code{wait-for-output-closed-operation}
+and @code{read-output-error-operation}."
+      (assert (message-output? out))
+      (let ((transport (message-output-transport out)))
+        (put-operation (transport-messages transport) msg)))
+
+    (define (read-input-error-operation in)
+      "Return an operation for reading the next input error from the input queue @var{in}."
+      (assert (message-input? in))
+      (let ((transport (message-input-transport in)))
+        (get-operation (transport-input-errors transport))))
+
+    (define (read-output-error-operation out)
+      "Return an operation for reading the next output error from the output queue @var{out}."
+      (assert (message-output? out))
+      (let ((transport (message-output-transport out)))
+        (get-operation (transport-output-errors transport))))
+
+    (define (wait-for-transport-close-operation transport)
+      "Return an operation for awaiting a close request from either queue of @var{transport}."
+      (assert (message-transport? transport))
+      (let ((close-requested (transport-close-condition transport)))
+        (wait-operation close-requested)))
+
+    (define (close-transport! transport input-errors output-errors)
+      "Close the transport @var{transport}, marking its input and output
+queues as closed.  @var{input-errors} and @var{output-errors} are the
+closing errors that @code{wait-for-input-closed-operation} and
+@code{wait-for-output-closed-operation} will return, respectively.
+XXX double closes probably should be detected."
+      (assert (message-transport? transport))
+      (let ((input-box (transport-input-errors/close transport))
+            (output-box (transport-output-errors/close transport)))
+        (atomic-box-set! input-box input-errors)
+        (atomic-box-set! output-box output-errors)
+        (signal-condition! (transport-closed-condition transport))))
+
+    (define (notice-input-error-operation transport error)
+      "Return an operation by which @var{transport} reports the input error
+@var{error}.  It blocks while no fiber is waiting for an input error, so it
+should probably not be used after the transport has been closed."
+      (assert (message-transport? transport))
+      (let ((errors (transport-input-errors transport)))
+        (put-operation errors error)))
+
+    (define (notice-output-error-operation transport error)
+      "Return an operation by which @var{transport} reports the output error
+@var{error}.  It blocks while no fiber is waiting for an output error, so it
+should probably not be used after the transport has been closed."
+      (assert (message-transport? transport))
+      (let ((errors (transport-output-errors transport)))
+        (put-operation errors error)))
+
+    (define (wait-for-output-closed-operation out)
+      "Return an operation for waiting until the output queue @var{out}
+has been closed.  The corresponding input queue is closed at the same
+moment, though waiting on it presumably returns something different.
+The operation returns the output errors that happened during the closing,
+in a data structure chosen by the transport."
+      (assert (message-output? out))
+      (let ((transport (message-output-transport out)))
+        (let ((closed (transport-closed-condition transport))
+              (errors (transport-output-errors/close transport)))
+          (wrap-operation (wait-operation closed)
+                          (lambda () (atomic-box-ref errors))))))
+
+    (define (wait-for-input-closed-operation in)
+      "Return an operation for waiting until the input queue @var{in}
+has been closed.  The corresponding output queue is closed at the same
+moment, though waiting on it presumably returns something different.
+The operation returns the input errors that happened during the closing,
+in a data structure chosen by the transport."
+      (assert (message-input? in))
+      (let ((transport (message-input-transport in)))
+        (let ((closed (transport-closed-condition transport))
+              (errors (transport-input-errors/close transport)))
+          (wrap-operation (wait-operation closed)
+                          (lambda () (atomic-box-ref errors))))))))
diff --git a/tests/message-io.scm b/tests/message-io.scm
new file mode 100644
index 0000000..f283855
--- /dev/null
+++ b/tests/message-io.scm
@@ -0,0 +1,196 @@
+;; This file is part of scheme-GNUnet.
+;; Copyright (C) 2021 Maxime Devos
+;;
+;; scheme-GNUnet is free software: you can redistribute it and/or modify it
+;; under the terms of the GNU Affero General Public License as published
+;; by the Free Software Foundation, either version 3 of the License,
+;; or (at your option) any later version.
+;;
+;; scheme-GNUnet is distributed in the hope that it will be useful, but
+;; WITHOUT ANY WARRANTY; without even the implied warranty of
+;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+;; Affero General Public License for more details.
+;;
+;; You should have received a copy of the GNU Affero General Public License
+;; along with this program.  If not, see <http://www.gnu.org/licenses/>.
+;;
+;; SPDX-License-Identifier: AGPL-3.0-or-later
+
+(use-modules (gnu gnunet mq message-io)
+            (rnrs base)
+            (fibers)
+            ((fibers internal)
+             #:select (yield-current-fiber))
+            (fibers conditions)
+            (fibers operations))
+
+;; Test whether inputs, outputs and transports are disjoint.
+;;
+;; It would not be unreasonable if transport actually was a subtype
+;; of input and output --- transport implies input and output, after all.
+(define %transport (make-message-transport))
+
+(test-eq "message-transport? predicate" #t
+  ;; The transport itself must satisfy its own predicate.
+  (message-transport? %transport))
+(test-eq "message-input? predicate" #t
+  (let ((input (message-transport->input %transport)))
+    (message-input? input)))
+(test-eq "message-output? predicate" #t
+  (let ((output (message-transport->output %transport)))
+    (message-output? output)))
+
+(test-equal "message-transport? -> not message-input? / message-output?"
+  '(#f . #f)
+  (let ((transport %transport))
+    (cons (message-input? transport) (message-output? transport))))
+
+(test-equal "message-input? -> not message-output? / message-transport?"
+  '(#f . #f)
+  (let ((input (message-transport->input %transport)))
+    (cons (message-output? input)
+          (message-transport? input))))
+
+(test-equal "message-output? -> not message-input? / message-transport?"
+  '(#f . #f)
+  (let ((output (message-transport->output %transport)))
+    (cons (message-input? output)
+          (message-transport? output))))
+
+
+
+;; Test whether inputs and outputs are connected
+
+;; Non-preempting, in order to be able to detect
+;; blocking later.
+(define (run-fibers* thunk)
+  "Call THUNK inside @code{run-fibers} with @code{#:parallelism 1},
+@code{#:hz 0} (no preemption) and without installing suspendable ports."
+  (run-fibers thunk #:hz 0 #:parallelism 1
+              #:install-suspendable-ports? #f))
+
+(define-syntax-rule (with-fibers body body* ...) ; run the body via run-fibers*
+  (run-fibers* (lambda () body body* ...)))
+
+(define-syntax-rule (async body body* ...) ; run the body in a freshly spawned fiber
+  (spawn-fiber (lambda () body body* ...)))
+
+(test-eq "send-message-operation & read-message-operation"
+  'stuff
+  (let* ((transport (make-message-transport))
+         (in (message-transport->input transport))
+         (out (message-transport->output transport))
+         (send-op (send-message-operation out 'stuff))
+         (read-op (read-message-operation in)))
+    (with-fibers
+     (async (perform-operation send-op)) ; sender runs in its own fiber
+     (perform-operation read-op))))
+
+(define-syntax-rule (test-notice/i/o-error str transport->i/o notice-error read-error)
+  (test-eq str
+    'oops ; the error injected by the transport must arrive unchanged
+    (let* ((transport (make-message-transport))
+          (i/o (transport->i/o transport))
+          (notice-oops (notice-error transport 'oops))
+          (wait-for-oops (read-error i/o)))
+      (with-fibers
+       (async (perform-operation notice-oops)) ; transport side, in its own fiber
+       (perform-operation wait-for-oops)))))   ; queue side
+
+(test-notice/i/o-error "notice-input-error-operation & read-input-error-operation"
+                      message-transport->input
+                      notice-input-error-operation
+                      read-input-error-operation)
+
+(test-notice/i/o-error "notice-output-error-operation & read-output-error-operation"
+                      message-transport->output
+                      notice-output-error-operation
+                      read-output-error-operation)
+
+
+;; Test closing
+
+(define (make-detect-blocking-operation loops)
+  "Return an operation that completes once a newly-spawned
+fiber has yielded LOOPS times."
+  (assert (and (exact-integer? loops)
+              (>= loops 0)))
+  (let ((done (make-condition)))
+    (async
+     (let loop ((remaining loops))
+       (if (> remaining 0) ; strictly positive: yield exactly LOOPS times
+          (begin (assert (yield-current-fiber))
+                 (loop (- remaining 1)))
+          (signal-condition! done))))
+    (wait-operation done)))
+(define *loops* 16) ; should be enough
+
+(define-syntax-rule (test-i/o-open name transport->i/o wait-i/o-closed)
+  (test-eq name
+    'blocking ; nothing was closed, so only the blocking detector may fire
+    (let* ((transport (make-message-transport))
+           (queue (transport->i/o transport))
+           (closed-op (wait-i/o-closed queue)))
+      (with-fibers
+       (let ((blocked-op (make-detect-blocking-operation *loops*)))
+         (perform-operation
+          (choice-operation
+           (wrap-operation blocked-op (const 'blocking))
+           (wrap-operation closed-op (const 'closed)))))))))
+
+(test-i/o-open "transports start open (input)" message-transport->input
+  wait-for-input-closed-operation)
+
+(test-i/o-open "transports start open (output)" message-transport->output
+  wait-for-output-closed-operation)
+
+(test-i/o-open "transports start non-closing" identity
+  wait-for-transport-close-operation)
+
+(define (test-input-closed x)
+  "Return a list of the closing input errors of X, or () if waiting for them blocks."
+  (let ((closed (wait-for-input-closed-operation (message-transport->input x)))
+        (blocked (make-detect-blocking-operation *loops*)))
+    (perform-operation
+     (choice-operation
+      (wrap-operation closed (lambda _ _))
+      (wrap-operation blocked (lambda () '()))))))
+
+(define (test-output-closed x)
+  "Return a list of the closing output errors of X, or () if waiting for them blocks."
+  (let ((closed (wait-for-output-closed-operation (message-transport->output x)))
+        (blocked (make-detect-blocking-operation *loops*)))
+    (perform-operation
+     (choice-operation
+      (wrap-operation closed (lambda _ _))
+      (wrap-operation blocked (lambda () '()))))))
+
+(test-equal "input & output are closed by close-transport!"
+  '((input-errors) . (output-errors))
+  (let ((transport (make-message-transport)))
+    (with-fibers
+     (close-transport! transport 'input-errors 'output-errors)
+     (cons (test-input-closed transport) ; each side reports its own errors
+           (test-output-closed transport)))))
+
+(define-syntax-rule (test-i/o-close-request name transport->i/o close-i/o! test-i/o-closed)
+  (test-equal name
+    '(close-me)
+    (let ((transport (make-message-transport)))
+      (with-fibers
+       (close-i/o! (transport->i/o transport))
+       ;; The transport sees the request, but the queue is not closed yet.
+       (cons (perform-operation
+              (choice-operation
+               (wrap-operation (wait-for-transport-close-operation transport)
+                               (const 'close-me))
+               (wrap-operation (make-detect-blocking-operation *loops*)
+                               (const 'not-yet))))
+             (test-i/o-closed transport))))))
+
+(test-i/o-close-request
+    "only wait-for-transport-close succeeds after a close request (input)"
+  message-transport->input close-input! test-input-closed)
+(test-i/o-close-request
+    "only wait-for-transport-close succeeds after a close request (output)"
+  message-transport->output close-output! test-output-closed)

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]