gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[gnunet-scheme] 165/324: mq-impl/stream: Implement connecting to unix sockets.


From: gnunet
Subject: [gnunet-scheme] 165/324: mq-impl/stream: Implement connecting to unix sockets.
Date: Tue, 21 Sep 2021 13:23:25 +0200

This is an automated email from the git hooks/post-receive script.

maxime-devos pushed a commit to branch master
in repository gnunet-scheme.

commit cb3cd67b31a03d2acb40c7d64528865a69f46ab8
Author: Maxime Devos <maximedevos@telenet.be>
AuthorDate: Thu Aug 12 16:04:44 2021 +0200

    mq-impl/stream: Implement connecting to unix sockets.
    
    * README.org
      (Message queues): Note existence of 'connect-unix'.
      (List of errors): Mention 'connection:connected'.
    * gnu/gnunet/mq-impl/stream.scm
      (%path-resolution-errors): New variable.
      (connect-unix): New procedure.
      (connect/fibers): New procedure.
    * tests/mq-stream.scm
      (call-with-temporary-directory)
      (make-config)
      (call-with-socket-location)
      (connect/test)
      (alist->hash-table)
      (test-connection)
      (yield-many): New procedures.
      ("connect-unix, can connect when socket is already listening")
      ("connect-unix, will connect when socket is listening")
      ("connect-unix, will connect when socket is bound (and listening)")
      ("connect-unix, will connect even if there's an old socket lying
      around")
      ("connect-unix, will connect even if previous socket is different
      type")
      ("connect-unix, will connect even if permissions are temporarily
      wrong"): New tests.
---
 README.org                    |  10 ++-
 gnu/gnunet/mq-impl/stream.scm | 138 +++++++++++++++++++++++++++++++--
 tests/mq-stream.scm           | 173 +++++++++++++++++++++++++++++++++++++++++-
 3 files changed, 313 insertions(+), 8 deletions(-)

diff --git a/README.org b/README.org
index a78b48a..2991d4c 100644
--- a/README.org
+++ b/README.org
@@ -96,7 +96,7 @@
      the ambient authority appropriately.
    + gnu/gnunet/mq.scm: the message queue itself!
    + gnu/gnunet/mq-impl/stream.scm: generic implementation on top of
-     Guile's port abstraction.
+     Guile's port abstraction.  Use 'connect-unix' to actually connect.
 
    + TODO actual queues?  Maybe we don't need them?
    + TODO filling the queues
@@ -122,6 +122,14 @@
      The message size in the header was smaller than the minimal
      message size.
 
+   Connection errors:
+   + connection:connected
+
+     The message queue has been connected,
+     so sent messages shouldn't simply be buffered.
+     Injected by (gnu gnunet mq-impl stream).
+   + TODO connection:lost
+
    Input errors (decoding)
    + TODO verification failed, unknown message type
 ** Configuration                                                  :test:good:
diff --git a/gnu/gnunet/mq-impl/stream.scm b/gnu/gnunet/mq-impl/stream.scm
index dcc5706..9762fa0 100644
--- a/gnu/gnunet/mq-impl/stream.scm
+++ b/gnu/gnunet/mq-impl/stream.scm
@@ -24,9 +24,12 @@
 ;; sockets.
 
 (define-library (gnu gnunet mq-impl stream)
-  (export write-envelope! handle-input! handle-output!)
+  (export write-envelope! handle-input! handle-output! connect/fibers)
   (import (only (gnu gnunet mq)
-               make-one-by-one-sender inject-message! inject-error!)
+               make-one-by-one-sender inject-message! inject-error!
+               make-message-queue)
+         (only (gnu gnunet concurrency repeated-condition)
+               make-repeated-condition await-trigger! trigger-condition!)
          (only (gnu gnunet utils bv-slice)
                slice-bv slice-offset slice-length
                slice-readable? bv-slice/read-write
@@ -35,15 +38,40 @@
                attempt-irrevocable-sent!)
          (only (gnu gnunet utils tokeniser)
                make-tokeniser add-from-port!)
+         (only (gnu gnunet config db)
+               read-value)
          (only (gnu gnunet utils hat-let)
                let^)
+         (only (gnu gnunet util time)
+               standard-back-off time-unit:second)
          (only (rnrs base)
                begin define let values quote
-               assert)
+               assert = or and eq?
+               list if lambda car /)
+         (only (rnrs arithmetic bitwise)
+               bitwise-ior)
+         (only (rnrs exceptions)
+               guard)
+         (only (fibers)
+               sleep spawn-fiber)
+         (only (fibers conditions)
+               make-condition signal-condition! wait)
+         (srfi srfi-26)
          (only (guile)
-               error)
+               error define* identity
+               EACCES ENOENT ENOTDIR ELOOP ENAMETOOLONG EAGAIN ECONNREFUSED
+               EPROTOTYPE
+               PF_UNIX SOCK_STREAM F_GETFD F_SETFD F_GETFL F_SETFL FD_CLOEXEC
+               O_NONBLOCK AF_UNIX
+               socket connect fcntl
+               make-socket-address
+               exception-args exception-kind)
          (only (rnrs io ports)
                put-bytevector)
+         (only (ice-9 atomic)
+               make-atomic-box atomic-box-set! atomic-box-ref)
+         (only (srfi srfi-1)
+               memv list-ref)
          (srfi srfi-26))
   (begin
     (define (write-envelope! output envelope)
@@ -128,5 +156,103 @@ TODO: closing, destroying @var{mq}, @var{output}."
        (wait!)
        (loop)))
 
-    ;; TODO connecting to TCP ports, Unix domain sockets ...?
-    ))
+    ;; See, e.g., the Linux man page path_resolution(7).
+    (define %path-resolution-errors
+      (list EACCES ENOENT ENOTDIR ELOOP ENAMETOOLONG))
+
+    (define (connect-unix config service-name)
+      "Try connecting to the server using UNIX domain sockets.
+
+On success, the socket is returned.  If the server has bound
+the socket but is not yet listening, wait a little and retry.
+If the socket file does not yet exist, wait until it does exist
+and retry.  It is assumed the file name of the socket is set
+in the configuration @var{config}.  If it is not set there,
+an appropriate @code{&undefined-key-error} is raised."
+      ;; TODO: use a mechanism like 'inotify' instead of sleeping
+      ;; when the socket file does not exist.
+      (let^ ((! unix-path
+               (read-value identity config service-name "UNIXPATH"))
+            (! address (make-socket-address PF_UNIX unix-path))
+            ;; TODO: maybe catch ENOMEM, ENOBUFS, EMFILE ...
+            (! socket (socket PF_UNIX SOCK_STREAM 0))
+            (_ (fcntl socket F_SETFD
+                      (bitwise-ior (fcntl socket F_GETFD) FD_CLOEXEC)))
+            (_ (fcntl socket F_SETFL
+                      (bitwise-ior (fcntl socket F_GETFL) O_NONBLOCK)))
+            ;; Grrr why can't we just use 'select' on socket to wait
+            ;; for the connection to complete, like with Internet sockets?
+            (/o/ retry (timeout (standard-back-off 0)))
+            (! (retry)
+               (sleep (/ timeout time-unit:second))
+               (retry (standard-back-off timeout)))
+            ;; 'system-error-errno' returns #f if 'condition'
+            ;; is not a 'system-error'.
+            (! (eagain? errno)
+               (= EAGAIN errno))
+            (! (connection-refused? errno)
+               (= ECONNREFUSED errno))
+            (! (path-resolution-error? errno)
+               (memv errno %path-resolution-errors))
+            (! (wrong-type-socket-error? errno)
+               (= EPROTOTYPE errno))
+            (! (retry-errno? errno)
+               ;; On Linux, EAGAIN can happen if the receive queue
+               ;; of the listening socket is full.  I.e., the listening
+               ;; socket is being connected to more frequently than the
+               ;; corresponding process can keep up with.  This situation
+               ;; should resolve itself automatically.
+               (or (eagain? errno)
+                   ;; This can happen if the listening socket is
+                   ;; not actually listening yet.  Give the
+                   ;; corresponding process a little more time.
+                   (connection-refused? errno)
+                   ;; See "connect-unix, will connect even if previous socket
+                   ;; is different type" test case.
+                   (wrong-type-socket-error? errno)
+                   ;; Give the process implementing the service some
+                   ;; time to set up directory structures, set the
+                   ;; permissions appropriately ...
+                   (path-resolution-error? errno)))
+            (! ok? (guard (c ((and (eq? (exception-kind c) 'system-error)
+                                   (retry-errno?
+                                    (car (list-ref (exception-args c) 3))))
+                              #f))
+                     (connect socket AF_UNIX unix-path))))
+           ;; Guile returns #f if SOCKET is non-blocking
+           ;; and the connection cannot be made immediately.
+           (if ok?
+               socket
+               (retry))))
+
+    (define* (connect/fibers config service-name handlers error-handler
+                            #:key (spawn spawn-fiber))
+      "Create a message queue that will be connected in the background
+to a GNUnet service @var{service-name}.  The message queue can be used
+before the message queue is connected; any sent messages will be buffered
+until they can be sent.  Some fibers may be created with @var{spawn}
+(@code{spawn-fiber} by default).
+
+When the connection has been established, the error @code{connection:connected}
+(a symbol) is injected into the message queue."
+      ;; TODO closing message queues
+      (define socket/box (make-atomic-box #f))
+      (define rcvar (make-repeated-condition))
+      (define (interrupt! mq)
+       (trigger-condition! rcvar))
+      (define wait! (cut await-trigger! rcvar))
+      (define socket-connected? (make-condition))
+      (let ((mq (make-message-queue handlers error-handler interrupt!)))
+       (spawn (lambda ()
+                (wait socket-connected?)
+                (handle-input! mq (atomic-box-ref socket/box))))
+       (spawn (lambda ()
+                (wait socket-connected?)
+                (handle-output! mq (atomic-box-ref socket/box) wait!)))
+       (spawn (lambda ()
+                (atomic-box-set! socket/box
+                                 (connect-unix config service-name))
+                (inject-error! mq 'connection:connected)
+                (signal-condition! socket-connected?)))
+       mq))))
+
diff --git a/tests/mq-stream.scm b/tests/mq-stream.scm
index fa70a1d..1fce3f5 100644
--- a/tests/mq-stream.scm
+++ b/tests/mq-stream.scm
@@ -21,6 +21,7 @@
             (gnu gnunet mq handler)
             (gnu gnunet utils hat-let)
             (gnu gnunet utils bv-slice)
+            (gnu gnunet config db)
             (gnu gnunet concurrency repeated-condition)
             (fibers conditions)
             (fibers operations)
@@ -28,12 +29,15 @@
             (rnrs bytevectors)
             ((rnrs io ports) #:select (open-bytevector-input-port))
             ((rnrs base) #:select (assert))
+            (rnrs hashtables)
             (srfi srfi-26)
             (srfi srfi-43)
             (rnrs io ports)
             (ice-9 binary-ports)
             (ice-9 suspendable-ports)
-            (ice-9 control))
+            (ice-9 control)
+            (ice-9 match)
+            (ice-9 threads))
 
 (define (no-sender . _)
   (error "no sender!"))
@@ -231,4 +235,171 @@
         #:parallelism 1
         #:hz 0)))
 
+(define (call-with-temporary-directory proc)
+  (let ((file (mkdtemp (in-vicinity (or (getenv "TMPDIR") "/tmp")
+                                   "test-XXXXXX"))))
+    (with-exception-handler
+       (lambda (e)
+         (system* "rm" "-r" file)
+         (raise-exception e))
+      (lambda ()
+       (call-with-values
+           (lambda () (proc file))
+         (lambda the-values
+           (system* "rm" "-r" file)
+           (apply values the-values)))))))
+
+(define (make-config where)
+  (hash->configuration
+   (alist->hash-table
+    `((("service" . "UNIXPATH") . ,where)))))
+
+(define (call-with-socket-location proc)
+  (call-with-temporary-directory
+   (lambda (dir)
+     (define where (in-vicinity dir "sock.et"))
+     (define config  (make-config where))
+     (proc where config))))
+
+(define (connect/test config connected?)
+  (define (error-handler . error)
+    (match error
+      ;; XXX correct documentation: multiple end-of-files
+      ;; are perfectly possible.
+      (('input:regular-end-of-file) (values))
+      (('connection:connected) (signal-condition! connected?))))
+  (connect/fibers config "service" no-handlers error-handler
+                 #:spawn call-with-new-thread))
+
+(define (alist->hash-table alist)
+  (define h (make-hashtable (lambda (key) 0) equal?))
+  (define (insert! key+value)
+    (hashtable-set! h (car key+value) (cdr key+value)))
+  (for-each insert! alist)
+  h)
+
+(define (test-connection mq server-sock)
+  (send-message! mq (bv-slice/read-write #vu8(0 4 0 0)))
+  (let ((client (car (accept server-sock))))
+    (assert (equal? #vu8(0 4 0 0) (get-bytevector-n client 4)))
+    #t))
+
+(define (yield-many)
+  ;; Give the new threads some time to run before binding the socket.
+  ;; This allowed a bug in the use of 'connect' to be detected.
+  (let loop ((n (* 8 (+ 1 (length (all-threads))))))
+    (when (> n 0)
+      (yield)
+      (loop (- n 1)))))
+
+(test-assert "connect-unix, can connect when socket is already listening"
+  (call-with-socket-location
+   (lambda (where config)
+     (define listening-sock (socket PF_UNIX SOCK_STREAM 0))
+     (define connected? (make-condition))
+     (bind listening-sock AF_UNIX where)
+     (listen listening-sock 1)
+     (define mq (connect/test config connected?))
+     (wait connected?)
+     (test-connection mq listening-sock))))
+
+;; Consider the case where a service starts, has bound its socket
+;; but is not yet listening, and a client connects.
+(test-assert "connect-unix, will connect when socket is listening"
+  (call-with-socket-location
+   (lambda (where config)
+     (define listening-sock (socket PF_UNIX SOCK_STREAM 0))
+     (define connected? (make-condition))
+     (bind listening-sock AF_UNIX where)
+     (define mq (connect/test config connected?))
+     (yield-many)
+     (listen listening-sock 1)
+     (wait connected?)
+     (test-connection mq listening-sock))))
+
+;; Consider the case where a client starts before a service.
+(test-assert "connect-unix, will connect when socket is bound (and listening)"
+  (call-with-socket-location
+   (lambda (where config)
+     (define listening-sock (socket PF_UNIX SOCK_STREAM 0))
+     (define connected? (make-condition))
+     (define mq (connect/test config connected?))
+     (yield-many)
+     (bind listening-sock AF_UNIX where)
+     (listen listening-sock 1)
+     (wait connected?)
+     (test-connection mq listening-sock))))
+
+;; Consider the case where a service starts and stops,
+;; a client connects and the service restarts.
+(test-assert
+    "connect-unix, will connect even if there's an old socket lying around"
+  (call-with-socket-location
+   (lambda (where config)
+     (let ((old-sock (socket PF_UNIX SOCK_STREAM 0)))
+       (bind old-sock AF_UNIX where)
+       (close-port old-sock))
+     (define connected? (make-condition))
+     (define mq (connect/test config connected?))
+     (yield-many)
+     (define listening-sock (socket PF_UNIX SOCK_STREAM 0))
+     (yield-many)
+     ;; Delete the old socket, otherwise the 'bind' below results in ‘address already in use’
+     (delete-file where)
+     (yield-many)
+     (bind listening-sock AF_UNIX where)
+     (yield-many)
+     (listen listening-sock 1)
+     (wait connected?)
+     (test-connection mq listening-sock))))
+
+;; Consider the case where GNUnet version N uses stream sockets,
+;; GNUnet version M uses datagram sockets, the system initially
+;; uses GNUnet version N, a client for version M is started
+;; (initially failing to connect to the server), then the system
+;; switches to GNUnet version M.
+(test-assert
+    "connect-unix, will connect even if previous socket is different type"
+  (call-with-socket-location
+   (lambda (where config)
+     (define old-sock (socket PF_UNIX SOCK_DGRAM 0))
+     (bind old-sock AF_UNIX where)
+     ;; Datagram sockets don't support 'listen', so don't
+     ;; call 'listen' with 'old-sock'.
+     (define connected? (make-condition))
+     (define mq (connect/test config connected?))
+     (yield-many)
+     (close-port old-sock)
+     (delete-file where)
+     (define new-sock (socket PF_UNIX SOCK_STREAM 0))
+     (bind new-sock AF_UNIX where)
+     (listen new-sock 1)
+     (wait connected?)
+     (test-connection mq new-sock))))
+
+;; Consider a system that creates directories and the socket
+;; with world-unreadable, world-unexecutable permissions at
+;; first and makes the permissions more permissive later.
+(test-assert
+    "connect-unix, will connect even if permissions are temporarily wrong"
+  (call-with-temporary-directory
+   (lambda (tmpdir)
+     ;; Permissions on sockets can be unreliable on some systems,
+     ;; so modify the permissions of a directory instead.
+     (define subdir (in-vicinity tmpdir "dir"))
+     (mkdir subdir)
+     (define where (in-vicinity subdir "sock.et"))
+     (define listening-sock (socket PF_UNIX SOCK_STREAM 0))
+     (bind listening-sock AF_UNIX where)
+     (listen listening-sock 1)
+     (chmod subdir #o000) ; unreadable
+     (define connected? (make-condition))
+     (define mq (connect/test (make-config where) connected?))
+     (yield-many)
+     ;; make it readable again
+     ;; (and writable such that 'tmpdir' can be deleted).
+     (chmod subdir #o700)
+     (wait connected?)
+     (test-connection mq listening-sock))))
+
 (test-end "mq-stream")

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]