[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[no subject]
From: Ludovic Courtès
Date: Sun, 9 Jul 2023 18:02:45 -0400 (EDT)
branch: master
commit 7079250d1bed692bb5b1128e3b3e7c04400749dc
Author: Ludovic Courtès <ludo@gnu.org>
AuthorDate: Sun Jul 9 23:52:45 2023 +0200
remote-server: Fiberize.
This turns 'cuirass remote-server' into a fiberized program, as opposed
to a heavy-handed multi-threaded program.
* src/cuirass/remote.scm (receive-logs): Rewrite in Fibers style,
assuming non-blocking I/O calls and using 'spawn-fiber' instead of
'call-with-new-thread'.
(zmq-socket->port): New procedure.
(receive-message)[wait]: New procedure.
Call it upfront.
* src/cuirass/notification.scm (start-notification-thread): Rename to...
(spawn-notification-fiber): ... this. Use 'spawn-fiber' instead of
'call-with-new-thread'. Use (@ (fibers) sleep) instead of 'sleep'.
* src/cuirass/scripts/remote-server.scm
(ensure-non-blocking-store-connection):
New procedure.
(add-to-store): Use it.
(start-fetch-worker): Use 'spawn-fiber' instead of
'call-with-new-thread'. Remove 'set-thread-name' call.
(start-periodic-updates-thread): Rename to...
(spawn-periodic-updates-fiber): ... this. Use 'spawn-fiber' instead of
'call-with-new-thread'. Remove 'set-thread-name' call.
(zmq-start-proxy): Remove 'socket-ready?', '%loop-timeout', and
'poll-items'. Use 'spawn-fiber' for the worker list update. Remove use
of 'zmq-poll*' in the main loop.
(terminate-helper-processes): New procedure.
(signal-handler): Use it instead of inline code. Call 'primitive-exit'
rather than 'exit'.
(cuirass-remote-server): Wrap body in 'run-fibers'. Call 'receive-logs'
within 'with-database'. Catch 'zmq-error' exceptions from 'zmq-start-proxy'.
* tests/database.scm ("database")["db-init"]: Remove
'start-notification-thread' call.
["mail notification", "mail notification, broken job"]: Add call to
'spawn-notification-fiber'.
---
src/cuirass/notification.scm | 13 +--
src/cuirass/remote.scm | 104 +++++++++++++++--------
src/cuirass/scripts/remote-server.scm | 155 ++++++++++++++++++----------------
tests/database.scm | 3 +-
4 files changed, 165 insertions(+), 110 deletions(-)
diff --git a/src/cuirass/notification.scm b/src/cuirass/notification.scm
index f3077fb..5249cb9 100644
--- a/src/cuirass/notification.scm
+++ b/src/cuirass/notification.scm
@@ -1,5 +1,6 @@
;;; notification.scm -- Send build notifications.
;;; Copyright © 2021 Mathieu Othacehe <othacehe@gnu.org>
+;;; Copyright © 2023 Ludovic Courtès <ludo@gnu.org>
;;;
;;; This file is part of Cuirass.
;;;
@@ -27,6 +28,7 @@
#:use-module (guix records)
#:use-module (ice-9 match)
#:use-module (ice-9 threads)
+ #:autoload (fibers) (spawn-fiber)
#:export (email
email?
email-from
@@ -39,7 +41,7 @@
notification->sexp
sexp->notification
- start-notification-thread))
+ spawn-notification-fiber))
;;;
@@ -152,19 +154,20 @@ the detailed information about this build here: ~a."
(log-error "Failed to send the mastodon notification: ~a."
args)))))
-(define (start-notification-thread)
+(define (spawn-notification-fiber)
- "Start a thread sending build notifications."
+ "Spawn a fiber that pops pending build notifications and sends them."
- (call-with-new-thread
+ (spawn-fiber
(lambda ()
- (set-thread-name "notification")
(let loop ()
(match (db-pop-notification)
((notif . build)
+ (log-debug "notification ~s for build ~s"
+ notif build)
(cond
((email? notif)
(send-email* notif build))
((mastodon? notif)
(send-mastodon build))))
(#f #f))
- (sleep 1)
+ ((@ (fibers) sleep) 1)
(loop)))))
diff --git a/src/cuirass/remote.scm b/src/cuirass/remote.scm
index e73392f..6fedca7 100644
--- a/src/cuirass/remote.scm
+++ b/src/cuirass/remote.scm
@@ -40,6 +40,9 @@
#:use-module (ice-9 match)
#:use-module (ice-9 rdelim)
#:use-module (ice-9 threads)
+ #:use-module (ice-9 suspendable-ports)
+ #:use-module (fibers)
+ #:use-module (fibers scheduler)
#:export (worker
worker?
worker-name
@@ -278,6 +281,7 @@ PRIVATE-KEY to sign narinfos."
(match (false-if-exception (read port))
(('log ('version 0)
('derivation derivation))
+ (log-debug (G_ "reading build log for ~a") derivation)
(let ((file (log-path cache derivation)))
(call-with-output-file file
(lambda (output)
@@ -286,43 +290,44 @@ PRIVATE-KEY to sign narinfos."
(log-error "invalid log received.")
#f)))
- (define (wait-for-client port proc)
- (let ((sock (socket AF_INET SOCK_STREAM 0)))
+ (define (wait-for-client port)
+ (let ((sock (socket AF_INET
+ (logior SOCK_STREAM SOCK_NONBLOCK SOCK_CLOEXEC)
+ 0)))
(setsockopt sock SOL_SOCKET SO_REUSEADDR 1)
(bind sock AF_INET INADDR_ANY port)
(listen sock 1024)
- (while #t
- (match (select (list sock) '() '() 60)
- (((_) () ())
- (match (accept sock)
- ((client . address)
- (catch 'system-error
- (lambda ()
- (write '(log-server (version 0)) client)
- (force-output client)
- (proc client))
- (lambda args
- (let ((errno (system-error-errno args)))
- (when (memv errno (list EPIPE ECONNRESET ECONNABORTED))
- (log-error "~a when replying to ~a."
- (strerror errno) (fileno client)))))))))
- ((() () ())
- #f)))))
-
- (define (client-handler client)
- (call-with-new-thread
- (lambda ()
- (set-thread-name
- (string-append "log-server-"
- (number->string (port->fdes client))))
- (and=> client read-log)
- (when client
- (close-port client)))))
-
- (call-with-new-thread
+ (log-info (G_ "listening for build logs on port ~a") port)
+ (let loop ()
+ (match (accept sock (logior SOCK_NONBLOCK SOCK_CLOEXEC))
+ ((client . address)
+ (spawn-fiber
+ (lambda ()
+ (handle-client client address)))))
+ (loop))))
+
+ (define (handle-client client address)
+ ;; Receive one build log from CLIENT, a socket accepted from ADDRESS:
+ ;; send the protocol greeting, let 'read-log' store the log, and close
+ ;; CLIENT both on success and on 'system-error'.
+ (define peer
+ (inet-ntop (sockaddr:fam address) (sockaddr:addr address)))
+
+ (catch 'system-error
+ (lambda ()
+ (log-debug "preparing to receive build log from ~a" peer)
+ (write '(log-server (version 0)) client)
+ (force-output client)
+ (read-log client)
+ (close-port client))
+ (lambda args
+ (close-port client)
+ (let ((errno (system-error-errno args)))
+ (if (memv errno (list EPIPE ECONNRESET ECONNABORTED))
+ (log-error "~a when replying to ~a."
+ (strerror errno) peer)
+ ;; Any other error, such as failing to write the log file, is
+ ;; reported instead of being silently dropped.
+ (log-error "error while receiving build log from ~a: ~a"
+ peer (strerror errno)))))))
+
+ (spawn-fiber
(lambda ()
- (set-thread-name "log-server")
- (wait-for-client port client-handler))))
+ (wait-for-client port))))
(define-syntax-rule (swallow-zlib-error exp ...)
"Swallow 'zlib-error' exceptions raised by EXP..."
@@ -336,6 +341,7 @@ PRIVATE-KEY to sign narinfos."
(in-addr (inet-pton AF_INET address))
(addr (make-socket-address AF_INET in-addr port)))
(connect sock addr)
+ ;; TODO: Fiberize together with 'remote-worker'.
(match (select (list sock) '() '() 10)
(((_) () ())
(match (read sock)
@@ -400,6 +406,24 @@ the message."
(cons recipient payload)
payload))))
+(define zmq-socket->port
+ ;; Ports are cached per ZMQ socket in a weak-key table and rebuilt when
+ ;; the socket's ZMQ_FD no longer matches the cached port's descriptor.
+ (let ((table (make-weak-key-hash-table)))
+ (lambda (socket)
+ "Return a port wrapping the ZMQ_FD file descriptor of SOCKET, a ZMQ socket."
+ (let ((fd (zmq-get-socket-option socket ZMQ_FD)))
+ (match (hashq-ref table socket)
+ (#f
+ (let ((port (fdopen fd "r+0")))
+ (set-port-revealed! port 1) ;let zmq close it
+ (hashq-set! table socket port)
+ port))
+ (port
+ (if (= fd (fileno port)) ;better be safe
+ port
+ (begin
+ (hashq-remove! table socket)
+ (zmq-socket->port socket)))))))))
+
(define* (receive-message socket #:key router?)
"Read an sexp from SOCKET, a ZMQ socket, and return it. Return the
unspecified value when reading a message without payload.
@@ -407,6 +431,20 @@ unspecified value when reading a message without payload.
When ROUTER? is true, assume messages received start with a routing
prefix (the identity of the peer, as a bytevector), and return three values:
the payload, the peer's identity (a bytevector), and the peer address."
+ (define (wait)
+ ;; Events are edge-triggered so before waiting, check whether there are
+ ;; messages available. See the discussion at
+ ;; <https://lists.zeromq.org/pipermail/zeromq-dev/2016-May/030349.html>.
+ (when (zero? (logand ZMQ_POLLIN
+ (zmq-get-socket-option socket ZMQ_EVENTS)))
+ ((current-read-waiter) (zmq-socket->port socket))
+ ;; ZMQ_FD may become readable while ZMQ_POLLIN is still unset (no events
+ ;; at all, or only ZMQ_POLLOUT). Per
+ ;; <http://api.zeromq.org/master:zmq-getsockopt>, "applications should
+ ;; simply ignore this case and restart their polling operation/event
+ ;; loop": re-check ZMQ_POLLIN rather than risk a blocking receive.
+ (wait)))
+
+ (wait)
(if router?
(match (zmq-message-receive* socket)
((sender (= zmq-message-size 0) data)
diff --git a/src/cuirass/scripts/remote-server.scm b/src/cuirass/scripts/remote-server.scm
index fe2b64c..ca008fc 100644
--- a/src/cuirass/scripts/remote-server.scm
+++ b/src/cuirass/scripts/remote-server.scm
@@ -42,6 +42,7 @@
#:select (current-build-output-port
ensure-path
store-protocol-error?
+ store-connection-socket
with-store))
#:use-module (guix ui)
#:use-module (guix utils)
@@ -65,6 +66,7 @@
#:use-module (ice-9 rdelim)
#:use-module (ice-9 regex)
#:use-module (ice-9 threads)
+ #:use-module (fibers)
#:export (cuirass-remote-server))
;; Indicate if the process has to be stopped.
@@ -337,12 +339,24 @@ be used to reply to the worker."
(lambda (tmp-file port)
(url-fetch* narinfo-url tmp-file)))))
+(define (ensure-non-blocking-store-connection store)
+ "Mark the file descriptor that backs STORE, a <store-connection>, as
+O_NONBLOCK."
+ (match (store-connection-socket store)
+ ((? file-port? port)
+ (let* ((fd (fileno port))
+ (flags (fcntl fd F_GETFL)))
+ (when (zero? (logand flags O_NONBLOCK))
+ (fcntl fd F_SETFL (logior O_NONBLOCK flags)))))
+ (_ #f)))
+
(define (add-to-store drv outputs url)
"Add the OUTPUTS that are available from the substitute server at URL to the
store. Register GC roots for the matching DRV and trigger a substitute baking
at URL."
(parameterize ((current-build-output-port (%make-void-port "w")))
(with-store store
+ (ensure-non-blocking-store-connection store)
(set-build-options* store (list url))
(for-each
(lambda (output)
@@ -404,14 +418,13 @@ directory."
(db-update-build-status! drv (build-status failed)))))
(define (start-fetch-worker name)
- "Start a fetch worker thread with the given NAME. This worker takes care of
-downloading build outputs. It communicates with the remote server using a ZMQ
-socket."
- (call-with-new-thread
+ "Start a fetch worker fiber, which takes care of downloading build outputs.
+It communicates with the remote worker using a ZMQ socket."
+ (spawn-fiber
(lambda ()
(use-modules (cuirass parameters)) ;XXX: Needed for mu-debug variable.
- (set-thread-name name)
(let ((socket (zmq-fetch-worker-socket)))
+ (log-debug "starting fetch worker ~s" name)
(let loop ()
(run-fetch (receive-message socket))
(atomic-box-fetch-and-dec! %fetch-queue-size)
@@ -422,18 +435,17 @@ socket."
;;; Periodic updates.
;;;
-(define (start-periodic-updates-thread)
- "Start a thread running periodic update queries."
- (call-with-new-thread
+(define (spawn-periodic-updates-fiber)
+ "Start a fiber running periodic update queries."
+ (spawn-fiber
(lambda ()
- (set-thread-name "periodic-updates")
(let loop ()
(let ((resumable (db-update-resumable-builds!))
(failed (db-update-failed-builds!)))
(log-info "period update: ~a resumable, ~a failed builds."
- resumable failed)
+ resumable failed)
(log-info "period update: ~a items in the fetch queue."
- (atomic-box-ref %fetch-queue-size)))
+ (atomic-box-ref %fetch-queue-size)))
(sleep 30)
(loop)))))
@@ -453,21 +465,10 @@ all network interfaces."
(define (zmq-start-proxy backend-port)
"This procedure starts a proxy between client connections from the IPC
frontend to the workers connected through the TCP backend."
- (define (socket-ready? items socket)
- (find (lambda (item)
- (eq? (poll-item-socket item) socket))
- items))
-
- ;; The poll loop below must not be blocked. Print a warning message if a
- ;; loop iteration takes more than %LOOP-TIMEOUT seconds to complete.
- (define %loop-timeout 5)
-
(let* ((build-socket
(zmq-create-socket %zmq-context ZMQ_ROUTER))
(fetch-socket
- (zmq-create-socket %zmq-context ZMQ_PUSH))
- (poll-items (list
- (poll-item build-socket ZMQ_POLLIN))))
+ (zmq-create-socket %zmq-context ZMQ_PUSH)))
;; Send bootstrap messages on worker connection to wake up the workers
;; that were hanging waiting for request-work responses.
@@ -476,28 +477,29 @@ frontend to the workers connected through the TCP backend."
(zmq-bind-socket build-socket (zmq-backend-endpoint backend-port))
(zmq-bind-socket fetch-socket (zmq-fetch-workers-endpoint))
+ (spawn-fiber
+ (lambda ()
+ (let loop ()
+ (sleep (quotient (%worker-timeout) 2))
+ (log-debug (G_ "updating list of live workers"))
+ (db-remove-unresponsive-workers (%worker-timeout))
+ (loop))))
+
;; Do not use the built-in zmq-proxy as we want to edit the envelope of
;; frontend messages before forwarding them to the backend.
(let loop ()
- (let* ((items (zmq-poll* poll-items 1000))
- (start-time (current-time)))
- (when (socket-ready? items build-socket)
- (let* ((command sender sender-address
- (receive-message build-socket #:router? #t))
- (reply-worker (lambda (message)
- (send-message build-socket message
- #:recipient sender))))
- (if (need-fetching? command)
- (begin
- (atomic-box-fetch-and-inc! %fetch-queue-size)
- (send-message fetch-socket command))
- (read-worker-exp command
- #:peer-address sender-address
- #:reply-worker reply-worker))))
- (db-remove-unresponsive-workers (%worker-timeout))
- (let ((delta (- (current-time) start-time)))
- (when (> delta %loop-timeout)
- (log-warning "Poll loop busy during ~a seconds." delta)))
+ (let* ((command sender sender-address
+ (receive-message build-socket #:router? #t))
+ (reply-worker (lambda (message)
+ (send-message build-socket message
+ #:recipient sender))))
+ (if (need-fetching? command)
+ (begin
+ (atomic-box-fetch-and-inc! %fetch-queue-size)
+ (send-message fetch-socket command))
+ (read-worker-exp command
+ #:peer-address sender-address
+ #:reply-worker reply-worker))
(loop)))))
@@ -513,24 +515,25 @@ frontend to the workers connected through the TCP backend."
(define %avahi-thread
(make-atomic-box #f))
+(define (terminate-helper-processes)
+ (let ((publish-pid (atomic-box-ref %publish-pid))
+ (avahi-thread (atomic-box-ref %avahi-thread)))
+ (atomic-box-set! %stop-process? #t)
+
+ (when publish-pid
+ (kill publish-pid SIGHUP)
+ (waitpid publish-pid))
+
+ (when avahi-thread
+ (join-thread avahi-thread))))
+
(define (signal-handler)
"Catch SIGINT to stop the Avahi event loop and the publish process before
exiting."
(sigaction SIGINT
(lambda (signum)
- (let ((publish-pid (atomic-box-ref %publish-pid))
- (avahi-thread (atomic-box-ref %avahi-thread)))
- (atomic-box-set! %stop-process? #t)
-
- (and publish-pid
- (begin
- (kill publish-pid SIGHUP)
- (waitpid publish-pid)))
-
- (and avahi-thread
- (join-thread avahi-thread))
-
- (exit 1)))))
+ (terminate-helper-processes)
+ (primitive-exit 1))))
(define (gather-user-privileges user)
"switch to the identity of user, a user name."
@@ -632,19 +635,29 @@ exiting."
#:txt `(,(string-append "log-port="
(number->string log-port))
,@(if publish-port
- (list (string-append "publish-port="
- (number->string publish-port)))
- '()))))
-
- (receive-logs log-port (%cache-directory))
-
- (with-database
- (start-notification-thread)
- (start-periodic-updates-thread)
- (for-each (lambda (number)
- (start-fetch-worker
- (string-append "fetch-worker-"
- (number->string number))))
- (iota (%fetch-workers)))
-
- (zmq-start-proxy backend-port))))))
+ (list (string-append "publish-port="
+ (number->string publish-port)))
+ '()))))
+
+ (run-fibers
+ (lambda ()
+ (with-database
+ (receive-logs log-port (%cache-directory))
+ (spawn-notification-fiber)
+ (spawn-periodic-updates-fiber)
+ (for-each (lambda (number)
+ (start-fetch-worker
+ (string-append "fetch-worker-"
+ (number->string number))))
+ (iota (%fetch-workers)))
+
+ (catch 'zmq-error
+ (lambda ()
+ (zmq-start-proxy backend-port))
+ (lambda (key errno message . _)
+ (log-error (G_ "failed to start worker/database proxy: ~a")
+ message)
+ (terminate-helper-processes)
+ (primitive-exit 1)))))
+ #:hz 0
+ #:parallelism 1)))))
diff --git a/tests/database.scm b/tests/database.scm
index f8c9a38..69c6053 100644
--- a/tests/database.scm
+++ b/tests/database.scm
@@ -125,7 +125,6 @@
(test-assert "db-init"
(begin
(test-init-db!)
- (start-notification-thread)
#t))
(test-equal "db-add-or-update-specification"
@@ -666,6 +665,7 @@ timestamp, checkouttime, evaltime) VALUES ('guix', 0, 0, 0, 0);")
(test-assert "mail notification"
(with-fibers
+ (spawn-notification-fiber)
(retry
(lambda ()
(and (file-exists? tmp-mail)
@@ -684,6 +684,7 @@ timestamp, checkouttime, evaltime) VALUES ('guix', 0, 0, 0, 0);")
(test-assert "mail notification, broken job"
(with-fibers
+ (spawn-notification-fiber)
(retry
(lambda ()
(and (file-exists? tmp-mail)
- master updated (445198e -> 7079250), Ludovic Courtès, 2023/07/09
- [no subject], Ludovic Courtès, 2023/07/09
- [no subject], Ludovic Courtès, 2023/07/09
- [no subject], Ludovic Courtès, 2023/07/09
- [no subject], Ludovic Courtès, 2023/07/09
- [no subject], Ludovic Courtès, 2023/07/09
- [no subject], Ludovic Courtès, 2023/07/09
- [no subject], Ludovic Courtès <=
- [no subject], Ludovic Courtès, 2023/07/09
- [no subject], Ludovic Courtès, 2023/07/09
- [no subject], Ludovic Courtès, 2023/07/09