gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[gnunet-scheme] 156/324: mq-impl/stream: Implement on top of ports.


From: gnunet
Subject: [gnunet-scheme] 156/324: mq-impl/stream: Implement on top of ports.
Date: Tue, 21 Sep 2021 13:23:16 +0200

This is an automated email from the git hooks/post-receive script.

maxime-devos pushed a commit to branch master
in repository gnunet-scheme.

commit 1980948c623556395e735d9efa178a2107795cf1
Author: Maxime Devos <maximedevos@telenet.be>
AuthorDate: Sat Jul 17 16:52:19 2021 +0200

    mq-impl/stream: Implement on top of ports.
    
    * gnu/gnunet/mq-impl/stream.scm
      (write-envelope!, handle-input!, handle-output!): New procedures.
    * tests/mq-stream.scm
      (no-sender, no-handlers, no-error-handler, check-slice-equal)
      (simple-handler, blocking-output-port): New
      procedures.
      ("messages + eof are injected in-order")
      ("overly small message is detected (--> stop)")
      ("premature eof is detected (--> stop)")
      ("envelopes are written (no blocking)")
      ("repeatable conditions can be used (blocking)"): New tests.
---
 README.org                    |   2 +
 gnu/gnunet/mq-impl/stream.scm | 132 ++++++++++++++++++++++++
 tests/mq-stream.scm           | 234 ++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 368 insertions(+)

diff --git a/README.org b/README.org
index 757b965..21bb600 100644
--- a/README.org
+++ b/README.org
@@ -86,6 +86,8 @@
      capabilities; the interposition can be used to adjust
      the ambient authority appropriately.
    + gnu/gnunet/mq.scm: the message queue itself!
+   + gnu/gnunet/mq-impl/stream.scm: generic implementation on top of
+     Guile's port abstraction.
 
    + TODO actual queues?  Maybe we don't need them?
    + TODO filling the queues
diff --git a/gnu/gnunet/mq-impl/stream.scm b/gnu/gnunet/mq-impl/stream.scm
new file mode 100644
index 0000000..dcc5706
--- /dev/null
+++ b/gnu/gnunet/mq-impl/stream.scm
@@ -0,0 +1,132 @@
+;; This file is part of scheme-GNUnet.
+;; Copyright (C) 2021 Maxime Devos
+;;
+;; scheme-GNUnet is free software: you can redistribute it and/or modify it
+;; under the terms of the GNU Affero General Public License as published
+;; by the Free Software Foundation, either version 3 of the License,
+;; or (at your option) any later version.
+;;
+;; scheme-GNUnet is distributed in the hope that it will be useful, but
+;; WITHOUT ANY WARRANTY; without even the implied warranty of
+;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+;; Affero General Public License for more details.
+;;
+;; You should have received a copy of the GNU Affero General Public License
+;; along with this program.  If not, see <http://www.gnu.org/licenses/>.
+;;
+;; SPDX-License-Identifier: AGPL3.0-or-later
+
+;; C source: src/util/client.c (not completely ported).
+;; The Scheme implementation is rather different from the C implementation
+;; though.
+;;
+;; This module allows communication between GNUnet services using stream
+;; sockets.
+
+(define-library (gnu gnunet mq-impl stream)
+  (export write-envelope! handle-input! handle-output!)
+  (import (only (gnu gnunet mq)
+               make-one-by-one-sender inject-message! inject-error!)
+         (only (gnu gnunet utils bv-slice)
+               slice-bv slice-offset slice-length
+               slice-readable? bv-slice/read-write
+               slice/read-only)
+         (only (gnu gnunet mq envelope)
+               attempt-irrevocable-sent!)
+         (only (gnu gnunet utils tokeniser)
+               make-tokeniser add-from-port!)
+         (only (gnu gnunet utils hat-let)
+               let^)
+         (only (rnrs base)
+               begin define let values quote
+               assert)
+         (only (guile)
+               error)
+         (only (rnrs io ports)
+               put-bytevector)
+         (srfi srfi-26))
+  (begin
+    (define (write-envelope! output envelope)
+      "Write the envelope @var{envelope} to the output port @var{output},
+unless it is cancelled.  @var{envelope} may not be already sent.  This
+can block and raise I/O errors, depending on the port @var{output} and
+(in Guile) the current write waiter.  As such, the caller might need to
+parameterise the current write waiter and install exception handlers."
+      (attempt-irrevocable-sent!
+       envelope
+       ((go message priority) ; the only clause that writes; PRIORITY is unused
+       (assert (slice-readable? message)) ; the slice is read from below
+       (put-bytevector output (slice-bv message)
+                       (slice-offset message) (slice-length message))
+       (values))
+       ((cancelled) (values)) ; nothing is written for a cancelled envelope
+       ((already-sent) (error "tried to send an envelope twice"))))
+
+    ;; TODO: maybe note that this procedure blocks?
+    (define (handle-input! mq input)
+      "Keep reading messages from the input port @var{input}.
+
+Feed each read message in-order to @var{mq} with @code{inject-message!}.
+This procedure might inject errors on its own as usual (e.g. when
+no appropriate message handler exists).
+
+If a message with an overly small message size in its header
+is encountered, inject the error @code{input:overly-small @var{type} @var{size}}
+into @var{mq}, where @var{type} is the message type as an integer
+(or @code{#f} if it could not be determined) and @var{size} is the
+message size in the header.
+
+When the first [1] end-of-file has been reached, inject the error
+@code{input:regular-end-of-file} into @var{mq}.  If the end-of-file
+happened while inside a (partial) message, inject
+@code{input:premature-end-of-file} instead.  In case of an I/O error,
+TODO.
+
+In these exceptional cases, the call to this procedure also returns
+after injecting the error. TODO closing message queues."
+      (let^ ((! tok (make-tokeniser))
+            (! (handle/message bv offset length)
+               (inject-message!
+                mq
+                ;; TODO: this allocates memory
+                (slice/read-only
+                 (bv-slice/read-write bv offset length))))
+            (! (return/overly-small type size)
+               (inject-error! mq 'input:overly-small type size)
+               (values))
+            (! (return/premature-eof)
+               (inject-error! mq 'input:premature-end-of-file)
+               (values))
+            (! (return/done-eof)
+               (inject-error! mq 'input:regular-end-of-file)
+               (values)))
+           (add-from-port! tok input handle/message return/overly-small
+                           return/done-eof return/premature-eof)))
+
+    (define (handle-output! mq output wait!)
+      "Keep sending message envelopes over the output port @var{output}.
+
+The messages to send are taken in-order from the message queue @var{mq}.
+In case of an I/O error, ???.  When the message queue is (temporarily)
+empty, the thunk @var{wait!} is called.  It should return when messages
+have been added to the queue.
+
+When using guile-fibers, @var{wait!} can be implemented with
+@code{await-trigger!} and by calling @code{trigger-condition!}
+from the ‘message sender’ of @var{mq}.
+
+TODO: closing, destroying @var{mq}, @var{output}."
+      (define (one-by-one-proc ev)
+       (write-envelope! output ev))
+      (define send-round
+       (cute (make-one-by-one-sender one-by-one-proc)
+             mq))
+      (let loop ()
+       ;; Calling 'wait!' before 'send-round' should be acceptable as well,
+       ;; but tests/mq-stream.scm currently assumes 'send-round' comes first.
+       (send-round)
+       (wait!)
+       (loop)))
+
+    ;; TODO connecting to TCP ports, Unix domain sockets ...?
+    ))
diff --git a/tests/mq-stream.scm b/tests/mq-stream.scm
new file mode 100644
index 0000000..fa70a1d
--- /dev/null
+++ b/tests/mq-stream.scm
@@ -0,0 +1,234 @@
+;; This file is part of scheme-GNUnet.
+;; Copyright (C) 2021 Maxime Devos
+;;
+;; scheme-GNUnet is free software: you can redistribute it and/or modify it
+;; under the terms of the GNU Affero General Public License as published
+;; by the Free Software Foundation, either version 3 of the License,
+;; or (at your option) any later version.
+;;
+;; scheme-GNUnet is distributed in the hope that it will be useful, but
+;; WITHOUT ANY WARRANTY; without even the implied warranty of
+;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+;; Affero General Public License for more details.
+;;
+;; You should have received a copy of the GNU Affero General Public License
+;; along with this program.  If not, see <http://www.gnu.org/licenses/>.
+;;
+;; SPDX-License-Identifier: AGPL3.0-or-later
+
+(use-modules (gnu gnunet mq-impl stream)
+            (gnu gnunet mq)
+            (gnu gnunet mq handler)
+            (gnu gnunet utils hat-let)
+            (gnu gnunet utils bv-slice)
+            (gnu gnunet concurrency repeated-condition)
+            (fibers conditions)
+            (fibers operations)
+            (fibers)
+            (rnrs bytevectors)
+            ((rnrs io ports) #:select (open-bytevector-input-port))
+            ((rnrs base) #:select (assert))
+            (srfi srfi-26)
+            (srfi srfi-43)
+            (rnrs io ports)
+            (ice-9 binary-ports)
+            (ice-9 suspendable-ports)
+            (ice-9 control))
+
+(define (no-sender . _)
+  (error "no sender!"))
+
+(define no-handlers (message-handlers))
+
+(define (no-error-handler . _)
+  (error "no error handler!"))
+
+(test-begin "mq-stream")
+
+(define (check-slice-equal slice bv)
+  (let^ ((!! (assert (= (slice-length slice)
+                       (bytevector-length bv))))
+        (! slice-copy (make-bytevector (slice-length slice)))
+        (! copy (bv-slice/read-write slice-copy))
+        (<-- () (slice-copy! slice copy))
+        (!! (bytevector=? slice-copy bv)))
+       (values)))
+
+;; Without interposition, and the verifier always
+;; returns #t.
+(define (simple-handler type handle)
+  (make-message-handler
+   type
+   (lambda (thunk) (thunk))
+   (const #t)
+   handle))
+
+(test-assert "messages + eof are injected in-order"
+  (let^ ((! input/bv #vu8(0 4 0 1 ; Message type 1, size 4
+                           0 5 0 2 1 ; Message type 2, size 5
+                           0 6 0 3 2 1)) ; Message type 3, size 6
+        (! input (open-bytevector-input-port input/bv))
+        (! received 0)
+        (! (make-handler type expected-received expected-bv)
+           (simple-handler
+            type
+            (lambda (slice)
+              (assert (equal? received expected-received))
+              (check-slice-equal slice expected-bv)
+              (set! received (+ 1 received)))))
+        (! handler/1 (make-handler 1 0 #vu8(0 4 0 1)))
+        (! handler/2 (make-handler 2 1 #vu8(0 5 0 2 1)))
+        (! handler/3 (make-handler 3 2 #vu8(0 6 0 3 2 1)))
+        (! handlers
+           (message-handlers handler/1 handler/2 handler/3))
+        (! (error-handler . arguments)
+           (assert (equal? received 3))
+           (assert (equal? arguments '(input:regular-end-of-file)))
+           (set! received 'end-of-file))
+        (! mq (make-message-queue handlers error-handler no-sender))
+        (<-- () (handle-input! mq input)))
+       ;; TODO: should the port be closed?
+       (assert (equal? received 'end-of-file))))
+
+(test-assert "overly small message is detected (--> stop)"
+  (let^ ((! input/bv #vu8(0 4 0 0 ; Message type 0, size 4
+                           0 3 9 ; Overly small message, size 3, type != 0
+                           0 4 0 1)) ; Message type 1, size 4
+        ;; The first message is well-formatted and should therefore
+        ;; be injected.  The second one isn't, so an appropriate error should
+        ;; be injected.  Then the message stream is broken, so the third
+        ;; message shouldn't be injected.
+        (! input (open-bytevector-input-port input/bv))
+        (! received 0)
+        (! handler/0
+           (simple-handler 0
+                           (lambda (slice)
+                             (assert (equal? received 0))
+                             (check-slice-equal slice #vu8(0 4 0 0))
+                             (set! received 1))))
+        (! handlers
+           (message-handlers handler/0))
+        (! (error-handler . arguments)
+           (assert (equal? received 1))
+           ;; Whether this malformed message even has a message type is dubious,
+           ;; but if it has one, it will be (* 256 9).
+           (assert (equal? arguments `(input:overly-small ,(* 256 9) 3)))
+           (set! received 'overly-small))
+        (! mq (make-message-queue handlers error-handler no-sender))
+        (<-- () (handle-input! mq input)))
+       (assert (equal? received 'overly-small))))
+
+(test-assert "premature eof is detected (--> stop)"
+  (let^ ((! input/bv #vu8(0 8 7 6 5 4))
+        (! input (open-bytevector-input-port input/bv))
+        (! received #f)
+        (! (error-handler . arguments)
+           (assert (eq? received #f))
+           (assert (equal? arguments '(input:premature-end-of-file)))
+           (set! received #t))
+        (! mq (make-message-queue no-handlers error-handler no-sender))
+        (<-- () (handle-input! mq input)))
+       (assert (equal? received #t))))
+
+(test-equal "envelopes are written (no blocking)"
+  ;; Three messages
+  #vu8(0 4 0 1
+        0 4 0 2
+        0 4 0 3)
+  (let^ ((! messages #(#vu8(0 4 0 1)
+                          #vu8(0 4 0 2)
+                          #vu8(0 4 0 3)))
+        (<-- (port get-bytevector) (open-bytevector-output-port))
+        (! mq (make-message-queue no-handlers no-error-handler
+                                  (lambda (_) (values))))
+        (! (insert-message index message)
+           (send-message! mq (slice/read-only (bv-slice/read-write message))))
+        (<-- ()
+             (begin
+               (vector-for-each insert-message messages)
+               (values)))
+        (<-- ()
+             ;; The implementation detail that 'send-round'
+             ;; is called before 'wait!' is assumed here.
+             (let/ec ec
+               (handle-output! mq port ec)
+               (error "unreachable"))))
+       (get-bytevector)))
+
+(define (blocking-output-port port . block-positions)
+  (define (close)
+    (close-port port))
+  (define (write! bv index length)
+    (define p (port-position port))
+    (if (or (null? block-positions)
+           (< (+ p length) (car block-positions)))
+       (begin (put-bytevector port bv index length) length)
+       (let ((short (- (car block-positions) p)))
+         (put-bytevector port bv index short)
+         ((current-write-waiter) port/blocking)
+         (set! block-positions (cdr block-positions))
+         short)))
+  (define port/blocking
+    (make-custom-binary-output-port "" write! #f #f close))
+  (setvbuf port/blocking 'none)
+  port/blocking)
+
+;; The ‘blocking’ is to make this test case more interesting.
+;; It does not currently have any effect, but it is expected
+;; that the implementation of handle-output! will be changed
+;; to react to blocking, for implementing message queue
+;; shutdown.
+
+(test-equal "repeatable conditions can be used (blocking)"
+  '(#vu8(0 4 0 1   0 4 0 2) . 4) ; 4: number of times writing blocks
+  (let^ ((! rcvar (make-repeated-condition))
+        (! stop? (make-condition))
+        (! stopped? (make-condition))
+        (! (interrupt! mq)
+           (trigger-condition! rcvar))
+        (! escape/output (make-parameter #f))
+        (<-- (out/internal get-bytevector)
+             (open-bytevector-output-port))
+        ;; block writing a few times
+        (! out (blocking-output-port out/internal 0 1 3 7))
+        (! (wait!)
+           (perform-operation
+            (apply choice-operation
+                   (prepare-await-trigger! rcvar)
+                   (if (>= 8 (port-position out/internal))
+                       (list (wrap-operation
+                              (wait-operation stop?)
+                              (lambda () ((escape/output)))))
+                       '()))))
+        (! mq (make-message-queue no-handlers no-error-handler interrupt!))
+        (! n/blocked 0)
+        (! message/1 #vu8(0 4 0 1))
+        (! message/2 #vu8(0 4 0 2)))
+       (run-fibers
+        (lambda ()
+          (spawn-fiber
+           (lambda ()
+             (let/ec ec
+               (parameterize ((escape/output ec)
+                              (current-write-waiter
+                               (lambda (port)
+                                 (cond ((eq? port out)
+                                        (set! n/blocked (+ n/blocked 1)))
+                                       ((file-port? port)
+                                        ;; XXX ‘Attempt to suspend fiber within
+                                        ;; continuation barrier’
+                                        #;((@@ (fibers) wait-for-writable) port)
+                                        (select '() (list port) '()))))))
+                 (handle-output! mq out wait!)))
+             (signal-condition! stopped?)))
+          (send-message! mq (bv-slice/read-write message/1))
+          (sleep 0.001)
+          (send-message! mq (bv-slice/read-write message/2))
+          (sleep 0.001)
+          (signal-condition! stop?)
+          (wait stopped?)
+          (cons (get-bytevector) n/blocked))
+        #:parallelism 1
+        #:hz 0)))
+
+(test-end "mq-stream")

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]