[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[no subject]
From: Ludovic Courtès
Date: Thu, 13 Jul 2023 19:45:57 -0400 (EDT)
branch: master
commit de8586080e04677cbe34c58f34715757ac61eea3
Author: Ludovic Courtès <ludo@gnu.org>
AuthorDate: Fri Jul 14 00:29:55 2023 +0200
remote-worker: Fiberize.
This turns 'cuirass remote-worker' into a fiberized program instead of a
multi-process program (previously 'cuirass remote-worker' would create
one child process per actual "worker").
* src/cuirass/remote.scm (send-log): Pass SOCK_CLOEXEC | SOCK_NONBLOCK
to 'socket'. Remove 'select' call.
* src/cuirass/scripts/remote-worker.scm (spawn-worker-ping): Replace
'call-with-new-thread' by 'spawn-fiber'.
(start-worker): Replace 'primitive-fork' by 'spawn-fiber'.
(worker-management-thunk): New procedure.
(%worker-pids, add-to-worker-pids!): Remove.
(signal-handler): Adjust accordingly.
(cuirass-remote-worker): Define 'management-channel'. Spawn
a fiber running 'worker-management-thunk'. Create workers by sending
message to MANAGEMENT-CHANNEL.
---
src/cuirass/remote.scm | 38 +++----
src/cuirass/scripts/remote-worker.scm | 204 +++++++++++++++++-----------------
2 files changed, 120 insertions(+), 122 deletions(-)
diff --git a/src/cuirass/remote.scm b/src/cuirass/remote.scm
index 1926679..aa94927 100644
--- a/src/cuirass/remote.scm
+++ b/src/cuirass/remote.scm
@@ -39,7 +39,6 @@
#:use-module (srfi srfi-26)
#:use-module (ice-9 match)
#:use-module (ice-9 rdelim)
- #:use-module (ice-9 threads)
#:use-module (ice-9 suspendable-ports)
#:use-module (fibers)
#:use-module (fibers scheduler)
@@ -337,30 +336,25 @@ PRIVATE-KEY to sign narinfos."
(const #f)))
(define* (send-log address port derivation log)
- (let* ((sock (socket AF_INET SOCK_STREAM 0))
+ (let* ((sock (socket AF_INET
+ (logior SOCK_STREAM SOCK_CLOEXEC SOCK_NONBLOCK) 0))
(in-addr (inet-pton AF_INET address))
(addr (make-socket-address AF_INET in-addr port)))
+ ;; TODO: Time out after a while.
(connect sock addr)
- ;; TODO: Fiberize together with 'remote-worker'.
- (match (select (list sock) '() '() 10)
- (((_) () ())
- (match (read sock)
- (('log-server ('version version ...))
- (let ((header `(log
- (version 0)
- (derivation ,derivation))))
- (write header sock)
- (swallow-zlib-error
- (call-with-gzip-output-port sock
- (lambda (sock-compressed)
- (dump-port log sock-compressed))))
- (close-port sock)))
- (x
- (log-error "invalid handshake ~s." x)
- (close-port sock)
- #f)))
- ((() () ()) ;timeout
- (log-error "timeout while sending log")
+ (match (read sock)
+ (('log-server ('version version ...))
+ (let ((header `(log
+ (version 0)
+ (derivation ,derivation))))
+ (write header sock)
+ (swallow-zlib-error
+ (call-with-gzip-output-port sock
+ (lambda (sock-compressed)
+ (dump-port log sock-compressed))))
+ (close-port sock)))
+ (x
+ (log-error "invalid handshake ~s." x)
(close-port sock)
#f))))
diff --git a/src/cuirass/scripts/remote-worker.scm b/src/cuirass/scripts/remote-worker.scm
index db4260f..70445ae 100644
--- a/src/cuirass/scripts/remote-worker.scm
+++ b/src/cuirass/scripts/remote-worker.scm
@@ -18,6 +18,8 @@
;;; along with GNU Guix. If not, see <http://www.gnu.org/licenses/>.
(define-module (cuirass scripts remote-worker)
+ #:use-module (fibers)
+ #:use-module (fibers channels)
#:use-module (cuirass base)
#:use-module (cuirass logging)
#:use-module (cuirass remote)
@@ -55,7 +57,7 @@
#:use-module (srfi srfi-37)
#:use-module (ice-9 atomic)
#:use-module (ice-9 match)
- #:use-module (ice-9 threads)
+ #:use-module ((ice-9 threads) #:select (current-processor-count))
#:export (cuirass-remote-worker))
;; Indicate if the process has to be stopped.
@@ -278,7 +280,7 @@ command. REPLY is a procedure that can be used to reply to this server."
(send-message socket
(worker-ping (worker->sexp worker))))
- (call-with-new-thread
+ (spawn-fiber
(lambda ()
(let* ((socket (zmq-dealer-socket))
(address (server-address server))
@@ -349,49 +351,62 @@ and executing them. The worker can reply on the same socket."
(address worker-address)
(publish-url url))))))
- (match (primitive-fork)
- (0
- (dynamic-wind
- (const #t)
- (lambda ()
- (set-thread-name (worker-name wrk))
- (let* ((socket (zmq-dealer-socket))
- (address (server-address serv))
- (port (server-port serv))
- (endpoint (zmq-backend-endpoint address port)))
- (zmq-connect socket endpoint)
- (log-info (G_ "worker ~a (PID ~a) connected to ~a")
- (worker-name wrk) (getpid) endpoint)
- (let* ((srv-info (read-server-info socket))
- (server (server-info->server srv-info serv))
- (worker (server-info->worker srv-info wrk)))
- (log-info (G_ "server publish URL: ~a; server log port: ~a")
- (server-publish-url server)
- (server-log-port server))
- (ready socket worker)
- (spawn-worker-ping worker server)
- (let loop ()
- (if (low-disk-space?)
- (log-info (G_ "warning: low disk space, doing nothing"))
- (begin
- (log-info (G_ "~a: request work.") (worker-name wrk))
- (request-work socket worker)
- (match (receive-message socket)
- ((? unspecified?) ;server reconnect
- (log-info (G_ "~a: received a bootstrap message.")
- (worker-name wrk)))
- (command
- (log-debug (G_ "~a: received command: ~s")
- (worker-name wrk) command)
- (run-command command server
- #:reply (reply socket)
- #:worker worker)))))
-
- (sleep (%request-period))
- (loop)))))
- (lambda ()
- (primitive-exit 1))))
- (pid pid)))
+ (spawn-fiber
+ (lambda ()
+ (let* ((socket (zmq-dealer-socket))
+ (address (server-address serv))
+ (port (server-port serv))
+ (endpoint (zmq-backend-endpoint address port)))
+ (zmq-connect socket endpoint)
+ (log-info (G_ "worker ~a (PID ~a) connected to ~a")
+ (worker-name wrk) (getpid) endpoint)
+ (let* ((srv-info (read-server-info socket))
+ (server (server-info->server srv-info serv))
+ (worker (server-info->worker srv-info wrk)))
+ (log-info (G_ "server publish URL: ~a; server log port: ~a")
+ (server-publish-url server)
+ (server-log-port server))
+ (ready socket worker)
+ (spawn-worker-ping worker server)
+ (let loop ()
+ (if (low-disk-space?)
+ (log-info (G_ "warning: low disk space, doing nothing"))
+ (begin
+ (log-info (G_ "~a: request work.") (worker-name wrk))
+ (request-work socket worker)
+ (match (receive-message socket)
+ ((? unspecified?) ;server reconnect
+ (log-info (G_ "~a: received a bootstrap message.")
+ (worker-name wrk)))
+ (command
+ (log-debug (G_ "~a: received command: ~s")
+ (worker-name wrk) command)
+ (run-command command server
+ #:reply (reply socket)
+ #:worker worker)))))
+
+ (sleep (%request-period))
+ (loop)))))))
+
+(define (worker-management-thunk channel systems)
+ "Return a thunk that reads from CHANNEL requests to start new workers for
+SYSTEMS."
+ (lambda ()
+ (let loop ()
+ (match (get-message channel)
+ (`(start-workers ,count ,server ,local-address)
+ (log-info "starting ~a workers for server at ~a"
+ count (server-address server))
+ (let spawn ((i 0))
+ (when (< i count)
+ (start-worker (worker (name (generate-worker-name))
+ (address local-address)
+ (machine (gethostname))
+ (publish-url (local-publish-url local-address))
+ (systems systems))
+ server)
+ (spawn (+ i 1))))))
+ (loop))))
;;;
@@ -402,31 +417,20 @@ and executing them. The worker can reply on the same socket."
(define %publish-pid
(make-atomic-box #f))
-(define %worker-pids
- (make-atomic-box '()))
-
-(define (add-to-worker-pids! pid)
- (let ((pids (atomic-box-ref %worker-pids)))
- (atomic-box-set! %worker-pids (cons pid pids))))
-
(define (signal-handler)
"Catch SIGINT to stop the Avahi event loop and the publish process before
exiting."
(sigaction SIGINT
(lambda (signum)
- (let ((publish-pid (atomic-box-ref %publish-pid))
- (worker-pids (atomic-box-ref %worker-pids)))
- (atomic-box-set! %stop-process? #t)
-
- (for-each (lambda (pid)
- (when pid
- (log-info (G_ "terminating worker sub-process ~a")
- pid)
- (kill pid SIGKILL)
- (waitpid pid)))
- (cons publish-pid worker-pids))
+ (let ((pid (atomic-box-ref %publish-pid)))
+ (when pid
+ (log-info (G_ "terminating worker sub-process ~a")
+ pid)
+ (kill pid SIGKILL)
+ (waitpid pid))
- (exit 1)))))
+ (atomic-box-set! %stop-process? #t)
+ (primitive-exit 1)))))
(define (cuirass-remote-worker args)
(signal-handler)
@@ -464,27 +468,8 @@ exiting."
#:public-key public-key
#:private-key private-key))
- (if server-address
- (begin
- (log-info (N_ "creating ~a worker for build server at ~a"
- "creating ~a workers for build server at ~a"
- workers)
- workers server-address)
- (for-each
- (lambda (n)
- (let* ((worker (worker
- (name (generate-worker-name))
- (machine (gethostname))
- (systems systems)))
- (addr (string-split server-address #\:))
- (server (match addr
- ((address port)
- (server
- (address address)
- (port (string->number port)))))))
- (add-to-worker-pids!
- (start-worker worker server))))
- (iota workers)))
+ (let ((management-channel (make-channel)))
+ (unless server-address
(avahi-browse-service-thread
(lambda (action service)
(log-info (N_ "discovered build server at ~a, creating ~a worker"
@@ -494,24 +479,43 @@ exiting."
workers)
(case action
((new-service)
- (for-each
- (lambda (n)
- (let* ((address (avahi-service-local-address service))
- (publish-url (local-publish-url address)))
- (add-to-worker-pids!
- (start-worker (worker
- (name (generate-worker-name))
- (address address)
- (machine (gethostname))
- (publish-url publish-url)
- (systems systems))
- (avahi-service->server service)))))
- (iota workers))
+ (put-message management-channel
+ `(start-workers ,workers
+ ,(avahi-service->server service)
+ ,(avahi-service-local-address
+ service)))
(atomic-box-set! %stop-process? #t))))
#:ignore-local? #f
#:types (list remote-server-service-type)
#:stop-loop? (lambda ()
(atomic-box-ref %stop-process?))))
- (while #t
- (sleep 1))))))
+ (run-fibers
+ (lambda ()
+ ;; Spawn the fiber that'll actually create workers as it receives
+ ;; requests on MANAGEMENT-CHANNEL.
+ (spawn-fiber (worker-management-thunk management-channel systems))
+
+ (when server-address
+ (log-info (N_ "creating ~a worker for build server at ~a"
+ "creating ~a workers for build server at ~a"
+ workers)
+ workers server-address)
+ (let* ((addr (string-split server-address #\:))
+ (server (match addr
+ ((address port)
+ (server
+ (address address)
+ (port (string->number port)))))))
+ (put-message management-channel
+ `(start-workers ,workers
+ ,server
+ ,(gethostname)))))
+
+ ;; XXX: Somehow #:drain? #t is not enough.
+ (while #t
+ (sleep 1800)
+ (log-info "worker's alive")))
+ #:hz 0
+ #:parallelism (min (current-processor-count) 4)
+ #:drain? #t))))))