[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[gnunet-scheme] 165/324: mq-impl/stream: Implement connecting to unix so
From: |
gnunet |
Subject: |
[gnunet-scheme] 165/324: mq-impl/stream: Implement connecting to unix sockets. |
Date: |
Tue, 21 Sep 2021 13:23:25 +0200 |
This is an automated email from the git hooks/post-receive script.
maxime-devos pushed a commit to branch master
in repository gnunet-scheme.
commit cb3cd67b31a03d2acb40c7d64528865a69f46ab8
Author: Maxime Devos <maximedevos@telenet.be>
AuthorDate: Thu Aug 12 16:04:44 2021 +0200
mq-impl/stream: Implement connecting to unix sockets.
* README.org
(Message queues): Note existence of 'connect-unix'.
(List of errors): Mention 'connection:connected'.
* gnu/gnunet/mq-impl/stream.scm
(%path-resolution-errors): New variable.
(connect-unix): New procedure.
(connect/fibers): New procedure.
* tests/mq-stream.scm
(call-with-temporary-directory)
(make-config)
(call-with-socket-location)
(connect/test)
(alist->hash-table)
(test-connection)
(yield-many): New procedures.
("connect-unix, can connect when socket is already listening")
("connect-unix, will connect when socket is listening")
("connect-unix, will connect when socket is bound (and listening)")
("connect-unix, will connect even if there's an old socket lying
around")
("connect-unix, will connect even if previous socket is different
type")
("connect-unix, will connect even if permissions are temporarily
wrong"): New tests.
---
README.org | 10 ++-
gnu/gnunet/mq-impl/stream.scm | 138 +++++++++++++++++++++++++++++++--
tests/mq-stream.scm | 173 +++++++++++++++++++++++++++++++++++++++++-
3 files changed, 313 insertions(+), 8 deletions(-)
diff --git a/README.org b/README.org
index a78b48a..2991d4c 100644
--- a/README.org
+++ b/README.org
@@ -96,7 +96,7 @@
the ambient authority appropriately.
+ gnu/gnunet/mq.scm: the message queue itself!
+ gnu/gnunet/mq-impl/stream.scm: generic implementation on top of
- Guile's port abstraction.
+ Guile's port abstraction. Use 'connect-unix' to actually connect.
+ TODO actual queues? Maybe we don't need them?
+ TODO filling the queues
@@ -122,6 +122,14 @@
The message size in the header was smaller than the minimal
message size.
+ Connection errors:
+ + connection:connected
+
+ The message queue has been connected,
+ so sent messages shouldn't simply be buffered.
+ Injected by (gnu gnunet mq-impl stream).
+ + TODO connection:lost
+
Input errors (decoding)
+ TODO verification failed, unknown message type
** Configuration :test:good:
diff --git a/gnu/gnunet/mq-impl/stream.scm b/gnu/gnunet/mq-impl/stream.scm
index dcc5706..9762fa0 100644
--- a/gnu/gnunet/mq-impl/stream.scm
+++ b/gnu/gnunet/mq-impl/stream.scm
@@ -24,9 +24,12 @@
;; sockets.
(define-library (gnu gnunet mq-impl stream)
- (export write-envelope! handle-input! handle-output!)
+ (export write-envelope! handle-input! handle-output! connect/fibers)
(import (only (gnu gnunet mq)
- make-one-by-one-sender inject-message! inject-error!)
+ make-one-by-one-sender inject-message! inject-error!
+ make-message-queue)
+ (only (gnu gnunet concurrency repeated-condition)
+ make-repeated-condition await-trigger! trigger-condition!)
(only (gnu gnunet utils bv-slice)
slice-bv slice-offset slice-length
slice-readable? bv-slice/read-write
@@ -35,15 +38,40 @@
attempt-irrevocable-sent!)
(only (gnu gnunet utils tokeniser)
make-tokeniser add-from-port!)
+ (only (gnu gnunet config db)
+ read-value)
(only (gnu gnunet utils hat-let)
let^)
+ (only (gnu gnunet util time)
+ standard-back-off time-unit:second)
(only (rnrs base)
begin define let values quote
- assert)
+ assert = or and eq?
+ list if lambda car /)
+ (only (rnrs arithmetic bitwise)
+ bitwise-ior)
+ (only (rnrs exceptions)
+ guard)
+ (only (fibers)
+ sleep spawn-fiber)
+ (only (fibers conditions)
+ make-condition signal-condition! wait)
+ (srfi srfi-26)
(only (guile)
- error)
+ error define* identity
+ EACCES ENOENT ENOTDIR ELOOP ENAMETOOLONG EAGAIN ECONNREFUSED
+ EPROTOTYPE
+ PF_UNIX SOCK_STREAM F_GETFD F_SETFD F_GETFL F_SETFL FD_CLOEXEC
+ O_NONBLOCK AF_UNIX
+ socket connect fcntl
+ make-socket-address
+ exception-args exception-kind)
(only (rnrs io ports)
put-bytevector)
+ (only (ice-9 atomic)
+ make-atomic-box atomic-box-set! atomic-box-ref)
+ (only (srfi srfi-1)
+ memv list-ref)
(srfi srfi-26))
(begin
(define (write-envelope! output envelope)
@@ -128,5 +156,103 @@ TODO: closing, destroying @var{mq}, @var{output}."
(wait!)
(loop)))
- ;; TODO connecting to TCP ports, Unix domain sockets ...?
- ))
+ ;; See, e.g., the Linux man page path_resolution(7).
+ (define %path-resolution-errors
+ (list EACCES ENOENT ENOTDIR ELOOP ENAMETOOLONG))
+
+ (define (connect-unix config service-name)
+ "Try connecting to the server using UNIX domain sockets.
+
+On success, the socket is returned. If the server has bound
+the socket but is not yet listening, wait a little and retry.
+If the socket file does not yet exist, wait until it does exist
+and retry. It is assumed the file name of the socket is set
+in the configuration @var{config}. If it is not set there,
+an appropriate @code{&undefined-key-error} is raised."
+ ;; TODO: use a mechanism like 'inotify' instead of sleeping
+ ;; when the socket file does not exist.
+ (let^ ((! unix-path
+ (read-value identity config service-name "UNIXPATH"))
+ (! address (make-socket-address PF_UNIX unix-path))
+ ;; TODO: maybe catch ENOMEM, ENOBUFS, EMFILE ...
+ (! socket (socket PF_UNIX SOCK_STREAM 0))
+ (_ (fcntl socket F_SETFD
+ (bitwise-ior (fcntl socket F_GETFD) FD_CLOEXEC)))
+ (_ (fcntl socket F_SETFL
+ (bitwise-ior (fcntl socket F_GETFL) O_NONBLOCK)))
+ ;; Grrr why can't we just use 'select' on socket to wait
+ ;; for the connection to complete, like with Internet sockets?
+ (/o/ retry (timeout (standard-back-off 0)))
+ (! (retry)
+ (sleep (/ timeout time-unit:second))
+ (retry (standard-back-off timeout)))
+ ;; 'system-error-errno' returns #f if 'condition'
+ ;; is not a 'system-error'.
+ (! (eagain? errno)
+ (= EAGAIN errno))
+ (! (connection-refused? errno)
+ (= ECONNREFUSED errno))
+ (! (path-resolution-error? errno)
+ (memv errno %path-resolution-errors))
+ (! (wrong-type-socket-error? errno)
+ (= EPROTOTYPE errno))
+ (! (retry-errno? errno)
+ ;; On Linux, EAGAIN can happen if the receive queue
+ ;; of the listening socket is full. I.e., the listening
+ ;; is being connected to more frequently than the corresponding
+ ;; proces can keep up with. This situation should resolve
+ ;; itself automatically.
+ (or (eagain? errno)
+ ;; This can happen if the listening socket is
+ ;; not actually listening yet. Give the
+ ;; corresponding process a little more time.
+ (connection-refused? errno)
+ ;; See "connect-unix, will connect even if previous socket
+ ;; is different type" test case.
+ (wrong-type-socket-error? errno)
+ ;; Give the process implementing the process some
+ ;; time to set up directory structures, set the
+ ;; permissions appropriately ...
+ (path-resolution-error? errno)))
+ (! ok? (guard (c ((and (eq? (exception-kind c) 'system-error)
+ (retry-errno?
+ (car (list-ref (exception-args c) 3))))
+ #f))
+ (connect socket AF_UNIX unix-path))))
+ ;; Guile returns #f if SOCKET is non-blocking
+ ;; and the connection cannot be made immediately.
+ (if ok?
+ socket
+ (retry))))
+
+ (define* (connect/fibers config service-name handlers error-handler
+ #:key (spawn spawn-fiber))
+ "Create a message queue that will be connected in the background
+to a GNUnet service @var{service-name}. The message queue can be used
+before the message queue is connected; any send messages will be buffered
+until they can be sent. Some fibers may be created with @var{spawn}
+(@code{spawn-fiber} by default).
+
+When the connection has been established, the error @code{connection:connected}
+(a symbol) is injected into the message queue."
+ ;; TODO closing message queues
+ (define socket/box (make-atomic-box #f))
+ (define rcvar (make-repeated-condition))
+ (define (interrupt! mq)
+ (trigger-condition! rcvar))
+ (define wait! (cut await-trigger! rcvar))
+ (define socket-connected? (make-condition))
+ (let ((mq (make-message-queue handlers error-handler interrupt!)))
+ (spawn (lambda ()
+ (wait socket-connected?)
+ (handle-input! mq (atomic-box-ref socket/box))))
+ (spawn (lambda ()
+ (wait socket-connected?)
+ (handle-output! mq (atomic-box-ref socket/box) wait!)))
+ (spawn (lambda ()
+ (atomic-box-set! socket/box
+ (connect-unix config service-name))
+ (inject-error! mq 'connection:connected)
+ (signal-condition! socket-connected?)))
+ mq))))
+
diff --git a/tests/mq-stream.scm b/tests/mq-stream.scm
index fa70a1d..1fce3f5 100644
--- a/tests/mq-stream.scm
+++ b/tests/mq-stream.scm
@@ -21,6 +21,7 @@
(gnu gnunet mq handler)
(gnu gnunet utils hat-let)
(gnu gnunet utils bv-slice)
+ (gnu gnunet config db)
(gnu gnunet concurrency repeated-condition)
(fibers conditions)
(fibers operations)
@@ -28,12 +29,15 @@
(rnrs bytevectors)
((rnrs io ports) #:select (open-bytevector-input-port))
((rnrs base) #:select (assert))
+ (rnrs hashtables)
(srfi srfi-26)
(srfi srfi-43)
(rnrs io ports)
(ice-9 binary-ports)
(ice-9 suspendable-ports)
- (ice-9 control))
+ (ice-9 control)
+ (ice-9 match)
+ (ice-9 threads))
(define (no-sender . _)
(error "no sender!"))
@@ -231,4 +235,171 @@
#:parallelism 1
#:hz 0)))
+(define (call-with-temporary-directory proc)
+ (let ((file (mkdtemp (in-vicinity (or (getenv "TMPDIR") "/tmp")
+ "test-XXXXXX"))))
+ (with-exception-handler
+ (lambda (e)
+ (system* "rm" "-r" file)
+ (raise-exception e))
+ (lambda ()
+ (call-with-values
+ (lambda () (proc file))
+ (lambda the-values
+ (system* "rm" "-r" file)
+ (apply values the-values)))))))
+
+(define (make-config where)
+ (hash->configuration
+ (alist->hash-table
+ `((("service" . "UNIXPATH") . ,where)))))
+
+(define (call-with-socket-location proc)
+ (call-with-temporary-directory
+ (lambda (dir)
+ (define where (in-vicinity dir "sock.et"))
+ (define config (make-config where))
+ (proc where config))))
+
+(define (connect/test config connected?)
+ (define (error-handler . error)
+ (match error
+ ;; XXX correct documentation: multiple end-of-files
+ ;; are perfectly possible.
+ (('input:regular-end-of-file) (values))
+ (('connection:connected) (signal-condition! connected?))))
+ (connect/fibers config "service" no-handlers error-handler
+ #:spawn call-with-new-thread))
+
+(define (alist->hash-table alist)
+ (define h (make-hashtable (lambda (key) 0) equal?))
+ (define (insert! key+value)
+ (hashtable-set! h (car key+value) (cdr key+value)))
+ (for-each insert! alist)
+ h)
+
+(define (test-connection mq server-sock)
+ (send-message! mq (bv-slice/read-write #vu8(0 4 0 0)))
+ (let ((client (car (accept server-sock))))
+ (assert (equal? #vu8(0 4 0 0) (get-bytevector-n client 4)))
+ #t))
+
+(define (yield-many)
+ ;; Give the new threads some time to run before binding the socket.
+ ;; This allowed a bug in the use of 'connect' to be detected.
+ (let loop ((n (* 8 (+ 1 (length (all-threads))))))
+ (when (> n 0)
+ (yield)
+ (loop (- n 1)))))
+
+(test-assert "connect-unix, can connect when socket is already listening"
+ (call-with-socket-location
+ (lambda (where config)
+ (define listening-sock (socket PF_UNIX SOCK_STREAM 0))
+ (define connected? (make-condition))
+ (bind listening-sock AF_UNIX where)
+ (listen listening-sock 1)
+ (define mq (connect/test config connected?))
+ (wait connected?)
+ (test-connection mq listening-sock))))
+
+;; Consider the case where a service starts, has bound its socket
+;; but is not yet listening, and a client connects.
+(test-assert "connect-unix, will connect when socket is listening"
+ (call-with-socket-location
+ (lambda (where config)
+ (define listening-sock (socket PF_UNIX SOCK_STREAM 0))
+ (define connected? (make-condition))
+ (bind listening-sock AF_UNIX where)
+ (define mq (connect/test config connected?))
+ (yield-many)
+ (listen listening-sock 1)
+ (wait connected?)
+ (test-connection mq listening-sock))))
+
+;; Consider the case where a client starts before a service.
+(test-assert "connect-unix, will connect when socket is bound (and listening)"
+ (call-with-socket-location
+ (lambda (where config)
+ (define listening-sock (socket PF_UNIX SOCK_STREAM 0))
+ (define connected? (make-condition))
+ (define mq (connect/test config connected?))
+ (yield-many)
+ (bind listening-sock AF_UNIX where)
+ (listen listening-sock 1)
+ (wait connected?)
+ (test-connection mq listening-sock))))
+
+;; Consider the case where a service starts and stops,
+;; a client connects and the service restarts.
+(test-assert
+ "connect-unix, will connect even if there's an old socket lying around"
+ (call-with-socket-location
+ (lambda (where config)
+ (let ((old-sock (socket PF_UNIX SOCK_STREAM 0)))
+ (bind old-sock AF_UNIX where)
+ (close-port old-sock))
+ (define connected? (make-condition))
+ (define mq (connect/test config connected?))
+ (yield-many)
+ (define listening-sock (socket PF_UNIX SOCK_STREAM 0))
+ (yield-many)
+ ;; Delete the old socket, otherwise the 'bind' below results in ‘address
alreay in use’
+ (delete-file where)
+ (yield-many)
+ (bind listening-sock AF_UNIX where)
+ (yield-many)
+ (listen listening-sock 1)
+ (wait connected?)
+ (test-connection mq listening-sock))))
+
+;; Consider the case where GNUnet version N uses stream sockets,
+;; GNUnet version M uses datagram sockets, the system initially
+;; uses GNUnet version N, a client for version M is started
+;; (initially failing to connect to the server), then the system
+;; switches to GNUnet version M.
+(test-assert
+ "connect-unix, will connect even if previous socket is different type"
+ (call-with-socket-location
+ (lambda (where config)
+ (define old-sock (socket PF_UNIX SOCK_DGRAM 0))
+ (bind old-sock AF_UNIX where)
+ ;; Datagram sockets don't support 'listen', so don't
+ ;; call 'listen' with 'old-sock'.
+ (define connected? (make-condition))
+ (define mq (connect/test config connected?))
+ (yield-many)
+ (close-port old-sock)
+ (delete-file where)
+ (define new-sock (socket PF_UNIX SOCK_STREAM 0))
+ (bind new-sock AF_UNIX where)
+ (listen new-sock 1)
+ (wait connected?)
+ (test-connection mq new-sock))))
+
+;; Consider a system that creates directories and the socket
+;; with world-unreadable, world-unexecutable permissions at
+;; first and makes the permissions more permissive later.
+(test-assert
+ "connect-unix, will connect even if permissions are temporarily wrong"
+ (call-with-temporary-directory
+ (lambda (tmpdir)
+ ;; Permissions on sockets can be unreliable on some systems,
+ ;; so modify the permissions of a directory instead.
+ (define subdir (in-vicinity tmpdir "dir"))
+ (mkdir subdir)
+ (define where (in-vicinity subdir "sock.et"))
+ (define listening-sock (socket PF_UNIX SOCK_STREAM 0))
+ (bind listening-sock AF_UNIX where)
+ (listen listening-sock 1)
+ (chmod subdir #o000) ; unreadable
+ (define connected? (make-condition))
+ (define mq (connect/test (make-config where) connected?))
+ (yield-many)
+ ;; make it readable again
+ ;; (and writable such that 'tmpdir' can be deleted).
+ (chmod subdir #o700)
+ (wait connected?)
+ (test-connection mq listening-sock))))
+
(test-end "mq-stream")
--
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.
- [gnunet-scheme] 163/324: util/time: Add time units and implement bounded exponential back-off., (continued)
- [gnunet-scheme] 163/324: util/time: Add time units and implement bounded exponential back-off., gnunet, 2021/09/21
- [gnunet-scheme] 169/324: mq-stream: Allow turning ports into message queues., gnunet, 2021/09/21
- [gnunet-scheme] 177/324: nse/client: Implement connecting to the NSE service., gnunet, 2021/09/21
- [gnunet-scheme] 181/324: ROADMAP: Start a TODO list for version 0.1., gnunet, 2021/09/21
- [gnunet-scheme] 183/324: tests/utils: Use a better hash function., gnunet, 2021/09/21
- [gnunet-scheme] 182/324: doc: Document the asynchronuousity of connecting., gnunet, 2021/09/21
- [gnunet-scheme] 189/324: ROADMAP: Mark ‘Document NSE’ as done, gnunet, 2021/09/21
- [gnunet-scheme] 188/324: nse/client: Document the optionality of callbacks., gnunet, 2021/09/21
- [gnunet-scheme] 187/324: doc: Document the ‘network size estimation’ API., gnunet, 2021/09/21
- [gnunet-scheme] 197/324: mq-impl/stream: Stop all fibers when EOF is reached (part 2)., gnunet, 2021/09/21
- [gnunet-scheme] 165/324: mq-impl/stream: Implement connecting to unix sockets.,
gnunet <=
- [gnunet-scheme] 171/324: util/struct: Define /time-absolute., gnunet, 2021/09/21
- [gnunet-scheme] 168/324: README: Remove paragraph about avoiding callbacks., gnunet, 2021/09/21
- [gnunet-scheme] 175/324: mq/handler: Handle the case where no handler exists., gnunet, 2021/09/21
- [gnunet-scheme] 178/324: Makefile.am: Compile with more optimisations., gnunet, 2021/09/21
- [gnunet-scheme] 174/324: nse/struct: Add missing imports., gnunet, 2021/09/21
- [gnunet-scheme] 186/324: nse: Allow 'updated' to be absent., gnunet, 2021/09/21
- [gnunet-scheme] 166/324: guix: Use fixed version of guile., gnunet, 2021/09/21
- [gnunet-scheme] 176/324: tests/utils: New utilities for tests., gnunet, 2021/09/21
- [gnunet-scheme] 172/324: crypto/struct: Define /ecc-signature-purpose., gnunet, 2021/09/21
- [gnunet-scheme] 179/324: nse/struct: Document 'timestamp' field of estimates., gnunet, 2021/09/21