guix-commits
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[no subject]


From: Ludovic Courtès
Date: Sun, 9 Jul 2023 18:02:45 -0400 (EDT)

branch: master
commit 7079250d1bed692bb5b1128e3b3e7c04400749dc
Author: Ludovic Courtès <ludo@gnu.org>
AuthorDate: Sun Jul 9 23:52:45 2023 +0200

    remote-server: Fiberize.
    
    This turns 'cuirass remote-server' into a fiberized program, as opposed
    to a heavy-handed multi-threaded program.
    
    * src/cuirass/remote.scm (receive-logs): Rewrite in Fibers style,
    assuming non-blocking I/O calls and using 'spawn-fiber' instead of
    'call-with-new-thread'.
    (zmq-socket->port): New procedure.
    (receive-message)[wait]: New procedure.
    Call it upfront.
    * src/cuirass/notification.scm (start-notification-thread): Rename to...
    (spawn-notification-fiber): ... this.  Use 'spawn-fiber' instead of
    'call-with-new-thread'.  Use (@ (fibers) sleep) instead of 'sleep'.
    * src/cuirass/scripts/remote-server.scm (ensure-non-blocking-store-connection):
    New procedure.
    (add-to-store): Use it.
    (start-fetch-worker): Use 'spawn-fiber' instead of
    'call-with-new-thread'.  Remove 'set-thread-name' call.
    (start-periodic-updates-thread): Rename to...
    (spawn-periodic-updates-fiber): ... this.  Use 'spawn-fiber' instead of
    'call-with-new-thread'.  Remove 'set-thread-name' call.
    (zmq-start-proxy): Remove 'socket-ready?', '%loop-timeout', and
    'poll-items'.  Use 'spawn-fiber' for the worker list update.  Remove use
    of 'zmq-poll*' in the main loop.
    (terminate-helper-processes): New procedure.
    (signal-handler): Use it instead of inline code.  Call 'primitive-exit'
    rather than 'exit'.
    (cuirass-remote-server): Wrap body in 'run-fibers'.
    * tests/database.scm ("database")["db-init"]: Remove
    'start-notification-thread' call.
    ["mail notification", "mail notification, broken job"]: Add call to
    'spawn-notification-fiber'.
---
 src/cuirass/notification.scm          |  13 +--
 src/cuirass/remote.scm                | 104 +++++++++++++++--------
 src/cuirass/scripts/remote-server.scm | 155 ++++++++++++++++++----------------
 tests/database.scm                    |   3 +-
 4 files changed, 165 insertions(+), 110 deletions(-)

diff --git a/src/cuirass/notification.scm b/src/cuirass/notification.scm
index f3077fb..5249cb9 100644
--- a/src/cuirass/notification.scm
+++ b/src/cuirass/notification.scm
@@ -1,5 +1,6 @@
 ;;; notification.scm -- Send build notifications.
 ;;; Copyright © 2021 Mathieu Othacehe <othacehe@gnu.org>
+;;; Copyright © 2023 Ludovic Courtès <ludo@gnu.org>
 ;;;
 ;;; This file is part of Cuirass.
 ;;;
@@ -27,6 +28,7 @@
   #:use-module (guix records)
   #:use-module (ice-9 match)
   #:use-module (ice-9 threads)
+  #:autoload   (fibers) (spawn-fiber)
   #:export (email
             email?
             email-from
@@ -39,7 +41,7 @@
             notification->sexp
             sexp->notification
 
-            start-notification-thread))
+            spawn-notification-fiber))
 
 
 ;;;
@@ -152,19 +154,20 @@ the detailed information about this build here: ~a."
         (log-error "Failed to send the mastodon notification: ~a."
                    args)))))
 
-(define (start-notification-thread)
+(define (spawn-notification-fiber)
   "Start a thread sending build notifications."
-  (call-with-new-thread
+  (spawn-fiber
    (lambda ()
-     (set-thread-name "notification")
      (let loop ()
        (match (db-pop-notification)
          ((notif . build)
+          (log-debug "notification ~s for build ~s"
+                     notif build)
           (cond
            ((email? notif)
             (send-email* notif build))
            ((mastodon? notif)
             (send-mastodon build))))
          (#f #f))
-       (sleep 1)
+       ((@ (fibers) sleep) 1)
        (loop)))))
diff --git a/src/cuirass/remote.scm b/src/cuirass/remote.scm
index e73392f..6fedca7 100644
--- a/src/cuirass/remote.scm
+++ b/src/cuirass/remote.scm
@@ -40,6 +40,9 @@
   #:use-module (ice-9 match)
   #:use-module (ice-9 rdelim)
   #:use-module (ice-9 threads)
+  #:use-module (ice-9 suspendable-ports)
+  #:use-module (fibers)
+  #:use-module (fibers scheduler)
   #:export (worker
             worker?
             worker-name
@@ -278,6 +281,7 @@ PRIVATE-KEY to sign narinfos."
     (match (false-if-exception (read port))
       (('log ('version 0)
              ('derivation derivation))
+       (log-debug (G_ "reading build log for ~a") derivation)
        (let ((file (log-path cache derivation)))
          (call-with-output-file file
            (lambda (output)
@@ -286,43 +290,44 @@ PRIVATE-KEY to sign narinfos."
        (log-error "invalid log received.")
        #f)))
 
-  (define (wait-for-client port proc)
-    (let ((sock (socket AF_INET SOCK_STREAM 0)))
+  (define (wait-for-client port)
+    (let ((sock (socket AF_INET
+                        (logior SOCK_STREAM SOCK_NONBLOCK SOCK_CLOEXEC)
+                        0)))
       (setsockopt sock SOL_SOCKET SO_REUSEADDR 1)
       (bind sock AF_INET INADDR_ANY port)
       (listen sock 1024)
-      (while #t
-        (match (select (list sock) '() '() 60)
-          (((_) () ())
-           (match (accept sock)
-             ((client . address)
-              (catch 'system-error
-                (lambda ()
-                  (write '(log-server (version 0)) client)
-                  (force-output client)
-                  (proc client))
-                (lambda args
-                  (let ((errno (system-error-errno args)))
-                    (when (memv errno (list EPIPE ECONNRESET ECONNABORTED))
-                      (log-error "~a when replying to ~a."
-                                 (strerror errno) (fileno client)))))))))
-          ((() () ())
-           #f)))))
-
-  (define (client-handler client)
-    (call-with-new-thread
-     (lambda ()
-       (set-thread-name
-        (string-append "log-server-"
-                       (number->string (port->fdes client))))
-       (and=> client read-log)
-       (when client
-         (close-port client)))))
-
-  (call-with-new-thread
+      (log-info (G_ "listening for build logs on port ~a") port)
+      (let loop ()
+        (match (accept sock (logior SOCK_NONBLOCK SOCK_CLOEXEC))
+          ((client . address)
+           (spawn-fiber
+            (lambda ()
+              (handle-client client address)))))
+        (loop))))
+
+  (define (handle-client client address)
+    (catch 'system-error
+      (lambda ()
+        (log-debug "preparing to receive build log from ~a"
+                   (inet-ntop (sockaddr:fam address)
+                              (sockaddr:addr address)))
+        (write '(log-server (version 0)) client)
+        (force-output client)
+        (read-log client)
+        (close-port client))
+      (lambda args
+        (close-port client)
+        (let ((errno (system-error-errno args)))
+          (when (memv errno (list EPIPE ECONNRESET ECONNABORTED))
+            (log-error "~a when replying to ~a."
+                       (strerror errno)
+                       (inet-ntop (sockaddr:fam address)
+                                  (sockaddr:addr address))))))))
+
+  (spawn-fiber
    (lambda ()
-     (set-thread-name "log-server")
-     (wait-for-client port client-handler))))
+     (wait-for-client port))))
 
 (define-syntax-rule (swallow-zlib-error exp ...)
   "Swallow 'zlib-error' exceptions raised by EXP..."
@@ -336,6 +341,7 @@ PRIVATE-KEY to sign narinfos."
          (in-addr (inet-pton AF_INET address))
          (addr (make-socket-address AF_INET in-addr port)))
     (connect sock addr)
+    ;; TODO: Fiberize together with 'remote-worker'.
     (match (select (list sock) '() '() 10)
       (((_) () ())
        (match (read sock)
@@ -400,6 +406,24 @@ the message."
                                        (cons recipient payload)
                                        payload))))
 
+(define zmq-socket->port
+  (let ((table (make-weak-key-hash-table)))
+    (lambda (socket)
+      "Return a port wrapping SOCKET, a file descriptor."
+      (let ((fd (zmq-get-socket-option socket ZMQ_FD)))
+        (match (hashq-ref table socket)
+          (#f
+           (let ((port (fdopen fd "r+0")))
+             (set-port-revealed! port 1)          ;let zmq close it
+             (hashq-set! table socket port)
+             port))
+          (port
+           (if (= fd (fileno port))               ;better be safe
+               port
+               (begin
+                 (hashq-remove! table socket)
+                 (zmq-socket->port socket)))))))))
+
 (define* (receive-message socket #:key router?)
   "Read an sexp from SOCKET, a ZMQ socket, and return it.  Return the
 unspecified value when reading a message without payload.
@@ -407,6 +431,20 @@ unspecified value when reading a message without payload.
 When ROUTER? is true, assume messages received start with a routing
 prefix (the identity of the peer, as a bytevector), and return three values:
 the payload, the peer's identity (a bytevector), and the peer address."
+  (define (wait)
+    ;; Events are edge-triggered so before waiting, check whether there are
+    ;; messages available.  See the discussion at
+    ;; <https://lists.zeromq.org/pipermail/zeromq-dev/2016-May/030349.html>.
+    (when (zero? (logand ZMQ_POLLIN
+                         (zmq-get-socket-option socket ZMQ_EVENTS)))
+      ((current-read-waiter) (zmq-socket->port socket))
+      (when (zero? (zmq-get-socket-option socket ZMQ_EVENTS))
+        ;; Per <http://api.zeromq.org/master:zmq-getsockopt>, "applications
+        ;; should simply ignore this case and restart their polling
+        ;; operation/event loop."
+        (wait))))
+
+  (wait)
   (if router?
       (match (zmq-message-receive* socket)
         ((sender (= zmq-message-size 0) data)
diff --git a/src/cuirass/scripts/remote-server.scm b/src/cuirass/scripts/remote-server.scm
index fe2b64c..ca008fc 100644
--- a/src/cuirass/scripts/remote-server.scm
+++ b/src/cuirass/scripts/remote-server.scm
@@ -42,6 +42,7 @@
                 #:select (current-build-output-port
                           ensure-path
                           store-protocol-error?
+                          store-connection-socket
                           with-store))
   #:use-module (guix ui)
   #:use-module (guix utils)
@@ -65,6 +66,7 @@
   #:use-module (ice-9 rdelim)
   #:use-module (ice-9 regex)
   #:use-module (ice-9 threads)
+  #:use-module (fibers)
   #:export (cuirass-remote-server))
 
 ;; Indicate if the process has to be stopped.
@@ -337,12 +339,24 @@ be used to reply to the worker."
      (lambda (tmp-file port)
        (url-fetch* narinfo-url tmp-file)))))
 
+(define (ensure-non-blocking-store-connection store)
+  "Mark the file descriptor that backs STORE, a <store-connection>, as
+O_NONBLOCK."
+  (match (store-connection-socket store)
+    ((? file-port? port)
+     (let* ((fd (fileno port))
+            (flags (fcntl fd F_GETFL)))
+       (when (zero? (logand flags O_NONBLOCK))
+         (fcntl fd F_SETFL (logior O_NONBLOCK flags)))))
+    (_ #f)))
+
 (define (add-to-store drv outputs url)
   "Add the OUTPUTS that are available from the substitute server at URL to the
 store.  Register GC roots for the matching DRV and trigger a substitute baking
 at URL."
   (parameterize ((current-build-output-port (%make-void-port "w")))
     (with-store store
+      (ensure-non-blocking-store-connection store)
       (set-build-options* store (list url))
       (for-each
        (lambda (output)
@@ -404,14 +418,13 @@ directory."
      (db-update-build-status! drv (build-status failed)))))
 
 (define (start-fetch-worker name)
-  "Start a fetch worker thread with the given NAME.  This worker takes care of
-downloading build outputs.  It communicates with the remote server using a ZMQ
-socket."
-  (call-with-new-thread
+  "Start a fetch worker fiber, which takes care of downloading build outputs.
+It communicates with the remote worker using a ZMQ socket."
+  (spawn-fiber
    (lambda ()
      (use-modules (cuirass parameters)) ;XXX: Needed for mu-debug variable.
-     (set-thread-name name)
      (let ((socket (zmq-fetch-worker-socket)))
+       (log-debug "starting fetch worker ~s" name)
        (let loop ()
          (run-fetch (receive-message socket))
          (atomic-box-fetch-and-dec! %fetch-queue-size)
@@ -422,18 +435,17 @@ socket."
 ;;; Periodic updates.
 ;;;
 
-(define (start-periodic-updates-thread)
-  "Start a thread running periodic update queries."
-  (call-with-new-thread
+(define (spawn-periodic-updates-fiber)
+  "Start a fiber running periodic update queries."
+  (spawn-fiber
    (lambda ()
-     (set-thread-name "periodic-updates")
      (let loop ()
        (let ((resumable (db-update-resumable-builds!))
              (failed (db-update-failed-builds!)))
          (log-info "period update: ~a resumable, ~a failed builds."
-                      resumable failed)
+                   resumable failed)
          (log-info "period update: ~a items in the fetch queue."
-                      (atomic-box-ref %fetch-queue-size)))
+                   (atomic-box-ref %fetch-queue-size)))
        (sleep 30)
        (loop)))))
 
@@ -453,21 +465,10 @@ all network interfaces."
 (define (zmq-start-proxy backend-port)
   "This procedure starts a proxy between client connections from the IPC
 frontend to the workers connected through the TCP backend."
-  (define (socket-ready? items socket)
-    (find (lambda (item)
-            (eq? (poll-item-socket item) socket))
-          items))
-
-  ;; The poll loop below must not be blocked.  Print a warning message if a
-  ;; loop iteration takes more than %LOOP-TIMEOUT seconds to complete.
-  (define %loop-timeout 5)
-
   (let* ((build-socket
           (zmq-create-socket %zmq-context ZMQ_ROUTER))
          (fetch-socket
-          (zmq-create-socket %zmq-context ZMQ_PUSH))
-         (poll-items (list
-                      (poll-item build-socket ZMQ_POLLIN))))
+          (zmq-create-socket %zmq-context ZMQ_PUSH)))
 
     ;; Send bootstrap messages on worker connection to wake up the workers
     ;; that were hanging waiting for request-work responses.
@@ -476,28 +477,29 @@ frontend to the workers connected through the TCP backend."
     (zmq-bind-socket build-socket (zmq-backend-endpoint backend-port))
     (zmq-bind-socket fetch-socket (zmq-fetch-workers-endpoint))
 
+    (spawn-fiber
+     (lambda ()
+       (let loop ()
+         (sleep (quotient (%worker-timeout) 2))
+         (log-debug (G_ "updating list of live workers"))
+         (db-remove-unresponsive-workers (%worker-timeout))
+         (loop))))
+
     ;; Do not use the built-in zmq-proxy as we want to edit the envelope of
     ;; frontend messages before forwarding them to the backend.
     (let loop ()
-      (let* ((items (zmq-poll* poll-items 1000))
-             (start-time (current-time)))
-        (when (socket-ready? items build-socket)
-          (let* ((command sender sender-address
-                          (receive-message build-socket #:router? #t))
-                 (reply-worker (lambda (message)
-                                 (send-message build-socket message
-                                               #:recipient sender))))
-            (if (need-fetching? command)
-                (begin
-                  (atomic-box-fetch-and-inc! %fetch-queue-size)
-                  (send-message fetch-socket command))
-                (read-worker-exp command
-                                 #:peer-address sender-address
-                                 #:reply-worker reply-worker))))
-        (db-remove-unresponsive-workers (%worker-timeout))
-        (let ((delta (- (current-time) start-time)))
-          (when (> delta %loop-timeout)
-            (log-warning "Poll loop busy during ~a seconds." delta)))
+      (let* ((command sender sender-address
+                      (receive-message build-socket #:router? #t))
+             (reply-worker (lambda (message)
+                             (send-message build-socket message
+                                           #:recipient sender))))
+        (if (need-fetching? command)
+            (begin
+              (atomic-box-fetch-and-inc! %fetch-queue-size)
+              (send-message fetch-socket command))
+            (read-worker-exp command
+                             #:peer-address sender-address
+                             #:reply-worker reply-worker))
         (loop)))))
 
 
@@ -513,24 +515,25 @@ frontend to the workers connected through the TCP backend."
 (define %avahi-thread
   (make-atomic-box #f))
 
+(define (terminate-helper-processes)
+  (let ((publish-pid (atomic-box-ref %publish-pid))
+        (avahi-thread (atomic-box-ref %avahi-thread)))
+    (atomic-box-set! %stop-process? #t)
+
+    (when publish-pid
+      (kill publish-pid SIGHUP)
+      (waitpid publish-pid))
+
+    (when avahi-thread
+      (join-thread avahi-thread))))
+
 (define (signal-handler)
   "Catch SIGINT to stop the Avahi event loop and the publish process before
 exiting."
   (sigaction SIGINT
     (lambda (signum)
-      (let ((publish-pid (atomic-box-ref %publish-pid))
-            (avahi-thread (atomic-box-ref %avahi-thread)))
-        (atomic-box-set! %stop-process? #t)
-
-        (and publish-pid
-             (begin
-               (kill publish-pid SIGHUP)
-               (waitpid publish-pid)))
-
-        (and avahi-thread
-             (join-thread avahi-thread))
-
-        (exit 1)))))
+      (terminate-helper-processes)
+      (primitive-exit 1))))
 
 (define (gather-user-privileges user)
   "switch to the identity of user, a user name."
@@ -632,19 +635,29 @@ exiting."
           #:txt `(,(string-append "log-port="
                                   (number->string log-port))
                   ,@(if publish-port
-                      (list (string-append "publish-port="
-                                           (number->string publish-port)))
-                      '()))))
-
-        (receive-logs log-port (%cache-directory))
-
-        (with-database
-            (start-notification-thread)
-            (start-periodic-updates-thread)
-            (for-each (lambda (number)
-                        (start-fetch-worker
-                         (string-append "fetch-worker-"
-                                        (number->string number))))
-                      (iota (%fetch-workers)))
-
-            (zmq-start-proxy backend-port))))))
+                        (list (string-append "publish-port="
+                                             (number->string publish-port)))
+                        '()))))
+
+        (run-fibers
+         (lambda ()
+           (with-database
+             (receive-logs log-port (%cache-directory))
+             (spawn-notification-fiber)
+             (spawn-periodic-updates-fiber)
+             (for-each (lambda (number)
+                         (start-fetch-worker
+                          (string-append "fetch-worker-"
+                                         (number->string number))))
+                       (iota (%fetch-workers)))
+
+             (catch 'zmq-error
+               (lambda ()
+                 (zmq-start-proxy backend-port))
+               (lambda (key errno message . _)
+                 (log-error (G_ "failed to start worker/database proxy: ~a")
+                            message)
+                 (terminate-helper-processes)
+                 (primitive-exit 1)))))
+         #:hz 0
+         #:parallelism 1)))))
diff --git a/tests/database.scm b/tests/database.scm
index f8c9a38..69c6053 100644
--- a/tests/database.scm
+++ b/tests/database.scm
@@ -125,7 +125,6 @@
   (test-assert "db-init"
     (begin
       (test-init-db!)
-      (start-notification-thread)
       #t))
 
   (test-equal "db-add-or-update-specification"
@@ -666,6 +665,7 @@ timestamp, checkouttime, evaltime) VALUES ('guix', 0, 0, 0, 0);")
 
   (test-assert "mail notification"
     (with-fibers
+      (spawn-notification-fiber)
       (retry
        (lambda ()
          (and (file-exists? tmp-mail)
@@ -684,6 +684,7 @@ timestamp, checkouttime, evaltime) VALUES ('guix', 0, 0, 0, 0);")
 
   (test-assert "mail notification, broken job"
     (with-fibers
+      (spawn-notification-fiber)
       (retry
        (lambda ()
          (and (file-exists? tmp-mail)



reply via email to

[Prev in Thread] Current Thread [Next in Thread]