guix-commits
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[no subject]


From: Ludovic Courtès
Date: Thu, 13 Jul 2023 19:45:57 -0400 (EDT)

branch: master
commit de8586080e04677cbe34c58f34715757ac61eea3
Author: Ludovic Courtès <ludo@gnu.org>
AuthorDate: Fri Jul 14 00:29:55 2023 +0200

    remote-worker: Fiberize.
    
    This turns 'cuirass remote-worker' into a fiberized program instead of a
    multi-process program (previously 'cuirass remote-worker' would create
    one child process per actual "worker").
    
    * src/cuirass/remote.scm (send-log): Pass SOCK_CLOEXEC | SOCK_NONBLOCK
    to 'socket'.  Remove 'select' call.
    * src/cuirass/scripts/remote-worker.scm (spawn-worker-ping): Replace
    'call-with-new-thread' by 'spawn-fiber'.
    (start-worker): Replace 'primitive-fork' by 'spawn-fiber'.
    (worker-management-thunk): New procedure.
    (%worker-pids, add-to-worker-pids!): Remove.
    (signal-handler): Adjust accordingly.
    (cuirass-remote-worker): Define 'management-channel'.  Spawn
    a fiber running 'worker-management-thunk'.  Create workers by sending
    message to MANAGEMENT-CHANNEL.
---
 src/cuirass/remote.scm                |  38 +++----
 src/cuirass/scripts/remote-worker.scm | 204 +++++++++++++++++-----------------
 2 files changed, 120 insertions(+), 122 deletions(-)

diff --git a/src/cuirass/remote.scm b/src/cuirass/remote.scm
index 1926679..aa94927 100644
--- a/src/cuirass/remote.scm
+++ b/src/cuirass/remote.scm
@@ -39,7 +39,6 @@
   #:use-module (srfi srfi-26)
   #:use-module (ice-9 match)
   #:use-module (ice-9 rdelim)
-  #:use-module (ice-9 threads)
   #:use-module (ice-9 suspendable-ports)
   #:use-module (fibers)
   #:use-module (fibers scheduler)
@@ -337,30 +336,25 @@ PRIVATE-KEY to sign narinfos."
     (const #f)))
 
 (define* (send-log address port derivation log)
-  (let* ((sock (socket AF_INET SOCK_STREAM 0))
+  (let* ((sock (socket AF_INET
+                       (logior SOCK_STREAM SOCK_CLOEXEC SOCK_NONBLOCK) 0))
          (in-addr (inet-pton AF_INET address))
          (addr (make-socket-address AF_INET in-addr port)))
+    ;; TODO: Time out after a while.
     (connect sock addr)
-    ;; TODO: Fiberize together with 'remote-worker'.
-    (match (select (list sock) '() '() 10)
-      (((_) () ())
-       (match (read sock)
-         (('log-server ('version version ...))
-          (let ((header `(log
-                          (version 0)
-                          (derivation ,derivation))))
-            (write header sock)
-            (swallow-zlib-error
-             (call-with-gzip-output-port sock
-               (lambda (sock-compressed)
-                 (dump-port log sock-compressed))))
-            (close-port sock)))
-         (x
-          (log-error "invalid handshake ~s." x)
-          (close-port sock)
-          #f)))
-      ((() () ())                                 ;timeout
-       (log-error "timeout while sending log")
+    (match (read sock)
+      (('log-server ('version version ...))
+       (let ((header `(log
+                       (version 0)
+                       (derivation ,derivation))))
+         (write header sock)
+         (swallow-zlib-error
+          (call-with-gzip-output-port sock
+            (lambda (sock-compressed)
+              (dump-port log sock-compressed))))
+         (close-port sock)))
+      (x
+       (log-error "invalid handshake ~s." x)
        (close-port sock)
        #f))))
 
diff --git a/src/cuirass/scripts/remote-worker.scm b/src/cuirass/scripts/remote-worker.scm
index db4260f..70445ae 100644
--- a/src/cuirass/scripts/remote-worker.scm
+++ b/src/cuirass/scripts/remote-worker.scm
@@ -18,6 +18,8 @@
 ;;; along with GNU Guix.  If not, see <http://www.gnu.org/licenses/>.
 
 (define-module (cuirass scripts remote-worker)
+  #:use-module (fibers)
+  #:use-module (fibers channels)
   #:use-module (cuirass base)
   #:use-module (cuirass logging)
   #:use-module (cuirass remote)
@@ -55,7 +57,7 @@
   #:use-module (srfi srfi-37)
   #:use-module (ice-9 atomic)
   #:use-module (ice-9 match)
-  #:use-module (ice-9 threads)
+  #:use-module ((ice-9 threads) #:select (current-processor-count))
   #:export (cuirass-remote-worker))
 
 ;; Indicate if the process has to be stopped.
@@ -278,7 +280,7 @@ command.  REPLY is a procedure that can be used to reply to this server."
     (send-message socket
                   (worker-ping (worker->sexp worker))))
 
-  (call-with-new-thread
+  (spawn-fiber
    (lambda ()
      (let* ((socket (zmq-dealer-socket))
             (address (server-address server))
@@ -349,49 +351,62 @@ and executing them.  The worker can reply on the same socket."
           (address worker-address)
           (publish-url url))))))
 
-  (match (primitive-fork)
-    (0
-     (dynamic-wind
-       (const #t)
-       (lambda ()
-         (set-thread-name (worker-name wrk))
-         (let* ((socket (zmq-dealer-socket))
-                (address (server-address serv))
-                (port (server-port serv))
-                (endpoint (zmq-backend-endpoint address port)))
-           (zmq-connect socket endpoint)
-           (log-info (G_ "worker ~a (PID ~a) connected to ~a")
-                     (worker-name wrk) (getpid) endpoint)
-           (let* ((srv-info (read-server-info socket))
-                  (server (server-info->server srv-info serv))
-                  (worker (server-info->worker srv-info wrk)))
-             (log-info (G_ "server publish URL: ~a; server log port: ~a")
-                       (server-publish-url server)
-                       (server-log-port server))
-             (ready socket worker)
-             (spawn-worker-ping worker server)
-             (let loop ()
-               (if (low-disk-space?)
-                   (log-info (G_ "warning: low disk space, doing nothing"))
-                   (begin
-                     (log-info (G_ "~a: request work.") (worker-name wrk))
-                     (request-work socket worker)
-                     (match (receive-message socket)
-                       ((? unspecified?)          ;server reconnect
-                        (log-info (G_ "~a: received a bootstrap message.")
-                                  (worker-name wrk)))
-                       (command
-                        (log-debug (G_ "~a: received command: ~s")
-                                   (worker-name wrk) command)
-                        (run-command command server
-                                     #:reply (reply socket)
-                                     #:worker worker)))))
-
-               (sleep (%request-period))
-               (loop)))))
-       (lambda ()
-         (primitive-exit 1))))
-    (pid pid)))
+  (spawn-fiber
+   (lambda ()
+     (let* ((socket (zmq-dealer-socket))
+            (address (server-address serv))
+            (port (server-port serv))
+            (endpoint (zmq-backend-endpoint address port)))
+       (zmq-connect socket endpoint)
+       (log-info (G_ "worker ~a (PID ~a) connected to ~a")
+                 (worker-name wrk) (getpid) endpoint)
+       (let* ((srv-info (read-server-info socket))
+              (server (server-info->server srv-info serv))
+              (worker (server-info->worker srv-info wrk)))
+         (log-info (G_ "server publish URL: ~a; server log port: ~a")
+                   (server-publish-url server)
+                   (server-log-port server))
+         (ready socket worker)
+         (spawn-worker-ping worker server)
+         (let loop ()
+           (if (low-disk-space?)
+               (log-info (G_ "warning: low disk space, doing nothing"))
+               (begin
+                 (log-info (G_ "~a: request work.") (worker-name wrk))
+                 (request-work socket worker)
+                 (match (receive-message socket)
+                   ((? unspecified?)              ;server reconnect
+                    (log-info (G_ "~a: received a bootstrap message.")
+                              (worker-name wrk)))
+                   (command
+                    (log-debug (G_ "~a: received command: ~s")
+                               (worker-name wrk) command)
+                    (run-command command server
+                                 #:reply (reply socket)
+                                 #:worker worker)))))
+
+           (sleep (%request-period))
+           (loop)))))))
+
+(define (worker-management-thunk channel systems)
+  "Return a thunk that reads from CHANNEL requests to start new workers for
+SYSTEMS."
+  (lambda ()
+    (let loop ()
+      (match (get-message channel)
+        (`(start-workers ,count ,server ,local-address)
+         (log-info "starting ~a workers for server at ~a"
+                   count (server-address server))
+         (let spawn ((i 0))
+           (when (< i count)
+             (start-worker (worker (name (generate-worker-name))
+                                   (address local-address)
+                                   (machine (gethostname))
+                                   (publish-url (local-publish-url local-address))
+                                   (systems systems))
+                           server)
+             (spawn (+ i 1))))))
+      (loop))))
 
 
 ;;;
@@ -402,31 +417,20 @@ and executing them.  The worker can reply on the same socket."
 (define %publish-pid
   (make-atomic-box #f))
 
-(define %worker-pids
-  (make-atomic-box '()))
-
-(define (add-to-worker-pids! pid)
-  (let ((pids (atomic-box-ref %worker-pids)))
-    (atomic-box-set! %worker-pids (cons pid pids))))
-
 (define (signal-handler)
   "Catch SIGINT to stop the Avahi event loop and the publish process before
 exiting."
   (sigaction SIGINT
     (lambda (signum)
-      (let ((publish-pid (atomic-box-ref %publish-pid))
-            (worker-pids (atomic-box-ref %worker-pids)))
-        (atomic-box-set! %stop-process? #t)
-
-        (for-each (lambda (pid)
-                    (when pid
-                      (log-info (G_ "terminating worker sub-process ~a")
-                                pid)
-                      (kill pid SIGKILL)
-                      (waitpid pid)))
-                  (cons publish-pid worker-pids))
+      (let ((pid (atomic-box-ref %publish-pid)))
+        (when pid
+          (log-info (G_ "terminating worker sub-process ~a")
+                    pid)
+          (kill pid SIGKILL)
+          (waitpid pid))
 
-        (exit 1)))))
+        (atomic-box-set! %stop-process? #t)
+        (primitive-exit 1)))))
 
 (define (cuirass-remote-worker args)
   (signal-handler)
@@ -464,27 +468,8 @@ exiting."
                          #:public-key public-key
                          #:private-key private-key))
 
-        (if server-address
-            (begin
-              (log-info (N_ "creating ~a worker for build server at ~a"
-                            "creating ~a workers for build server at ~a"
-                            workers)
-                        workers server-address)
-              (for-each
-               (lambda (n)
-                 (let* ((worker (worker
-                                 (name (generate-worker-name))
-                                 (machine (gethostname))
-                                 (systems systems)))
-                        (addr (string-split server-address #\:))
-                        (server (match addr
-                                  ((address port)
-                                   (server
-                                    (address address)
-                                    (port (string->number port)))))))
-                   (add-to-worker-pids!
-                    (start-worker worker server))))
-               (iota workers)))
+        (let ((management-channel (make-channel)))
+          (unless server-address
             (avahi-browse-service-thread
              (lambda (action service)
                (log-info (N_ "discovered build server at ~a, creating ~a worker"
@@ -494,24 +479,43 @@ exiting."
                          workers)
                (case action
                  ((new-service)
-                  (for-each
-                   (lambda (n)
-                     (let* ((address (avahi-service-local-address service))
-                            (publish-url (local-publish-url address)))
-                       (add-to-worker-pids!
-                        (start-worker (worker
-                                       (name (generate-worker-name))
-                                       (address address)
-                                       (machine (gethostname))
-                                       (publish-url publish-url)
-                                       (systems systems))
-                                      (avahi-service->server service)))))
-                   (iota workers))
+                  (put-message management-channel
+                               `(start-workers ,workers
+                                               ,(avahi-service->server service)
+                                               ,(avahi-service-local-address
+                                                 service)))
                   (atomic-box-set! %stop-process? #t))))
              #:ignore-local? #f
              #:types (list remote-server-service-type)
              #:stop-loop? (lambda ()
                             (atomic-box-ref %stop-process?))))
 
-        (while #t
-          (sleep 1))))))
+          (run-fibers
+           (lambda ()
+             ;; Spawn the fiber that'll actually create workers as it receives
+             ;; requests on MANAGEMENT-CHANNEL.
+             (spawn-fiber (worker-management-thunk management-channel systems))
+
+             (when server-address
+               (log-info (N_ "creating ~a worker for build server at ~a"
+                             "creating ~a workers for build server at ~a"
+                             workers)
+                         workers server-address)
+               (let* ((addr (string-split server-address #\:))
+                      (server (match addr
+                                ((address port)
+                                 (server
+                                  (address address)
+                                  (port (string->number port)))))))
+                 (put-message management-channel
+                              `(start-workers ,workers
+                                              ,server
+                                              ,(gethostname)))))
+
+             ;; XXX: Somehow #:drain? #t is not enough.
+             (while #t
+               (sleep 1800)
+               (log-info "worker's alive")))
+           #:hz 0
+           #:parallelism (min (current-processor-count) 4)
+           #:drain? #t))))))



reply via email to

[Prev in Thread] Current Thread [Next in Thread]