gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[gnunet-scheme] 169/324: mq-stream: Allow turning ports into message queues.


From: gnunet
Subject: [gnunet-scheme] 169/324: mq-stream: Allow turning ports into message queues.
Date: Tue, 21 Sep 2021 13:23:29 +0200

This is an automated email from the git hooks/post-receive script.

maxime-devos pushed a commit to branch master
in repository gnunet-scheme.

commit 379573af9041f68a47912a08d1b6e824aadb6f4f
Author: Maxime Devos <maximedevos@telenet.be>
AuthorDate: Fri Aug 13 11:51:25 2021 +0200

    mq-stream: Allow turning ports into message queues.
    
    * README.org (Message queues): Note existence of
      'port->message-queue'.
    * gnu/gnunet/mq-impl/stream.scm (port->message-queue): New procedure.
    * tests/mq-stream.scm
      ("port->message-queue, can send/receive between pairs"): New test.
---
 README.org                    |  4 +++-
 gnu/gnunet/mq-impl/stream.scm | 21 ++++++++++++++++++++-
 tests/mq-stream.scm           | 44 +++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 67 insertions(+), 2 deletions(-)

diff --git a/README.org b/README.org
index a50e590..0ab13ca 100644
--- a/README.org
+++ b/README.org
@@ -96,7 +96,9 @@
      the ambient authority appropriately.
    + gnu/gnunet/mq.scm: the message queue itself!
    + gnu/gnunet/mq-impl/stream.scm: generic implementation on top of
-     Guile's port abstraction.  Use 'connect-unix' to actually connect.
+     Guile's port abstraction.  Use 'connect-unix' to actually connect,
+     or 'port->message-queue' to turn an already existing port into a
+     message queue.
 
    + TODO actual queues?  Maybe we don't need them?
    + TODO filling the queues
diff --git a/gnu/gnunet/mq-impl/stream.scm b/gnu/gnunet/mq-impl/stream.scm
index 9762fa0..0686df7 100644
--- a/gnu/gnunet/mq-impl/stream.scm
+++ b/gnu/gnunet/mq-impl/stream.scm
@@ -24,7 +24,8 @@
 ;; sockets.
 
 (define-library (gnu gnunet mq-impl stream)
-  (export write-envelope! handle-input! handle-output! connect/fibers)
+  (export write-envelope! handle-input! handle-output!
+         port->message-queue connect/fibers)
   (import (only (gnu gnunet mq)
                make-one-by-one-sender inject-message! inject-error!
                make-message-queue)
@@ -225,6 +226,24 @@ an appropriate @code{&undefined-key-error} is raised."
                socket
                (retry))))
 
+    (define* (port->message-queue port handlers error-handler
+                                 #:key (spawn spawn-fiber))
+      "Create a message queue sending and receiving data over @var{port}.
+
+This creates some fibers with @var{spawn} (@code{spawn-fiber} by default).
+As such, @var{port} must be non-blocking if @code{spawn-fiber} is used."
+      ;; TODO: closing message queues (nothing below closes PORT or stops the fibers)
+      (define rcvar (make-repeated-condition)) ; triggered by 'interrupt!', awaited via 'wait!'
+      (define (interrupt! mq)                  ; the MQ argument itself is not used here
+       (trigger-condition! rcvar))
+      (define wait! (cut await-trigger! rcvar))
+      (define mq (make-message-queue handlers error-handler interrupt!))
+      (spawn (lambda ()                        ; first fiber: runs 'handle-input!' on PORT
+              (handle-input! mq port)))
+      (spawn (lambda ()                        ; second fiber: runs 'handle-output!', given 'wait!'
+              (handle-output! mq port wait!)))
+      mq)
+
     (define* (connect/fibers config service-name handlers error-handler
                             #:key (spawn spawn-fiber))
       "Create a message queue that will be connected in the background
diff --git a/tests/mq-stream.scm b/tests/mq-stream.scm
index 1fce3f5..ec3db00 100644
--- a/tests/mq-stream.scm
+++ b/tests/mq-stream.scm
@@ -26,6 +26,7 @@
             (fibers conditions)
             (fibers operations)
             (fibers)
+            ((rnrs arithmetic bitwise) #:select (bitwise-ior))
             (rnrs bytevectors)
             ((rnrs io ports) #:select (open-bytevector-input-port))
             ((rnrs base) #:select (assert))
@@ -402,4 +403,47 @@
      (wait connected?)
      (test-connection mq listening-sock))))
 
+(test-assert "port->message-queue, can send/receive between pairs"
+  (run-fibers
+   (lambda ()
+     ;; Create two message queues connected to each other
+     ;; over a socket pair.  Send '1' over the first message queue
+     ;; and expect to receive it from the second queue, and send '0'
+     ;; over the second message queue and expect to receive it from
+     ;; the first.
+     (define pair (socketpair AF_UNIX SOCK_STREAM 0))
+     ;; As 'fibers' is used instead of POSIX threads, set O_NONBLOCK.
+     (fcntl (car pair) F_SETFL
+           (bitwise-ior (fcntl (car pair) F_GETFL) O_NONBLOCK))
+     (fcntl (cdr pair) F_SETFL
+           (bitwise-ior (fcntl (cdr pair) F_GETFL) O_NONBLOCK))
+     (define received/0 #f) ; set to #t by the handler in 'handlers/0'
+     (define received/1 #f) ; set to #t by the handler in 'handlers/1'
+     (define done/0 (make-condition))
+     (define done/1 (make-condition))
+     (define handlers/0
+       (message-handlers
+       (simple-handler 0
+                       (lambda (slice)
+                         (assert (not received/0)) ; must run at most once
+                         (check-slice-equal slice #vu8(0 4 0 0))
+                         (set! received/0 #t)
+                         (signal-condition! done/0)))))
+     (define handlers/1
+       (message-handlers
+       (simple-handler 1
+                       (lambda (slice)
+                         (assert (not received/1)) ; must run at most once
+                         (check-slice-equal slice #vu8(0 4 0 1))
+                         (set! received/1 #t)
+                         (signal-condition! done/1)))))
+     (define error-handler no-error-handler)
+     (define mq/0 (port->message-queue (car pair) handlers/0 error-handler))
+     (define mq/1 (port->message-queue (cdr pair) handlers/1 error-handler))
+     (send-message! mq/0 (bv-slice/read-write #vu8(0 4 0 1))) ; checked by 'handlers/1'
+     (send-message! mq/1 (bv-slice/read-write #vu8(0 4 0 0))) ; checked by 'handlers/0'
+     (wait done/0) ; signalled once the handler in 'handlers/0' has run
+     (wait done/1) ; signalled once the handler in 'handlers/1' has run
+     #t)))
+
 (test-end "mq-stream")

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]