[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[no subject]
From: Mathieu Othacehe
Date: Mon, 21 Dec 2020 05:30:42 -0500 (EST)
branch: wip-offload
commit 7ba2dc5f572cf0d820bc0bcb6bfa4cb06aab00da
Author: Mathieu Othacehe <othacehe@gnu.org>
AuthorDate: Mon Dec 21 11:30:16 2020 +0100
tmp2
---
bin/cuirass.in | 179 +++++++++++++++++++++--------------------
src/cuirass/base.scm | 44 ----------
src/cuirass/database.scm | 1 +
src/cuirass/remote-server.scm | 181 +++++++++++-------------------------------
src/cuirass/remote-worker.scm | 14 ++--
src/cuirass/remote.scm | 93 +---------------------
6 files changed, 144 insertions(+), 368 deletions(-)
diff --git a/bin/cuirass.in b/bin/cuirass.in
index 7ba89db..762b6ab 100644
--- a/bin/cuirass.in
+++ b/bin/cuirass.in
@@ -149,96 +149,95 @@ exec ${GUILE:-@GUILE@} --no-auto-compile -e main -s "$0" "$@"
(lambda ()
(with-database
(with-queue-writer-worker
- (with-build-offload-thread
- (and specfile
- (let ((new-specs (save-module-excursion
- (lambda ()
- (set-current-module
- (make-user-module '()))
- (primitive-load specfile)))))
- (for-each db-add-specification new-specs)))
-
- (when queries-file
- (log-message "Enable SQL query logging.")
- (db-log-queries queries-file))
-
- (if one-shot?
- (process-specs (db-get-specifications))
- (let ((exit-channel (make-channel)))
- (start-watchdog)
- (if (option-ref opts 'web #f)
- (begin
- (spawn-fiber
- (essential-task
- 'web exit-channel
- (lambda ()
- (run-cuirass-server #:host host
- #:port port)))
- #:parallel? #t)
-
- (spawn-fiber
- (essential-task
- 'monitor exit-channel
- (lambda ()
- (while #t
- (log-monitoring-stats)
- (sleep 600))))))
-
- (begin
- (clear-build-queue)
-
- ;; If Cuirass was stopped during an evaluation,
- ;; abort it. Builds that were not registered
- ;; during this evaluation will be registered
- ;; during the next evaluation.
- (db-abort-pending-evaluations)
-
- ;; First off, restart builds that had not
- ;; completed or were not even started on a
- ;; previous run.
- (spawn-fiber
- (essential-task
- 'restart-builds exit-channel
- (lambda ()
- (restart-builds))))
-
- (spawn-fiber
- (essential-task
- 'build exit-channel
- (lambda ()
- (while #t
- (process-specs (db-get-specifications))
- (log-message
- "next evaluation in ~a seconds" interval)
- (sleep interval)))))
-
- (when (%build-remote?)
- (spawn-fiber
- (essential-task
- 'request-workers exit-channel
- (lambda ()
- (while #t
- (request-workers)
- (sleep 60))))))
-
- (spawn-fiber
- (essential-task
- 'metrics exit-channel
- (lambda ()
- (while #t
- (with-time-logging
- "Metrics update"
- (db-update-metrics))
- (sleep 3600)))))
-
- (spawn-fiber
- (essential-task
- 'monitor exit-channel
- (lambda ()
- (while #t
- (log-monitoring-stats)
- (sleep 600)))))))
- (primitive-exit (get-message exit-channel))))))))
+ (and specfile
+ (let ((new-specs (save-module-excursion
+ (lambda ()
+ (set-current-module
+ (make-user-module '()))
+ (primitive-load specfile)))))
+ (for-each db-add-specification new-specs)))
+
+ (when queries-file
+ (log-message "Enable SQL query logging.")
+ (db-log-queries queries-file))
+
+ (if one-shot?
+ (process-specs (db-get-specifications))
+ (let ((exit-channel (make-channel)))
+ (start-watchdog)
+ (if (option-ref opts 'web #f)
+ (begin
+ (spawn-fiber
+ (essential-task
+ 'web exit-channel
+ (lambda ()
+ (run-cuirass-server #:host host
+ #:port port)))
+ #:parallel? #t)
+
+ (spawn-fiber
+ (essential-task
+ 'monitor exit-channel
+ (lambda ()
+ (while #t
+ (log-monitoring-stats)
+ (sleep 600))))))
+
+ (begin
+ (clear-build-queue)
+
+ ;; If Cuirass was stopped during an evaluation,
+ ;; abort it. Builds that were not registered
+ ;; during this evaluation will be registered
+ ;; during the next evaluation.
+ (db-abort-pending-evaluations)
+
+ ;; First off, restart builds that had not
+ ;; completed or were not even started on a
+ ;; previous run.
+ (spawn-fiber
+ (essential-task
+ 'restart-builds exit-channel
+ (lambda ()
+ (restart-builds))))
+
+ (spawn-fiber
+ (essential-task
+ 'build exit-channel
+ (lambda ()
+ (while #t
+ (process-specs (db-get-specifications))
+ (log-message
+ "next evaluation in ~a seconds" interval)
+ (sleep interval)))))
+
+ (when (%build-remote?)
+ (spawn-fiber
+ (essential-task
+ 'request-workers exit-channel
+ (lambda ()
+ (while #t
+ (request-workers)
+ (sleep 60))))))
+
+ (spawn-fiber
+ (essential-task
+ 'metrics exit-channel
+ (lambda ()
+ (while #t
+ (with-time-logging
+ "Metrics update"
+ (db-update-metrics))
+ (sleep 3600)))))
+
+ (spawn-fiber
+ (essential-task
+ 'monitor exit-channel
+ (lambda ()
+ (while #t
+ (log-monitoring-stats)
+ (sleep 600)))))))
+ (primitive-exit (get-message exit-channel)))))))
;; Most of our code is I/O so preemption doesn't matter much (it
;; could help while we're doing SQL requests, for instance, but it
diff --git a/src/cuirass/base.scm b/src/cuirass/base.scm
index 47cdb8b..2a2cbb3 100644
--- a/src/cuirass/base.scm
+++ b/src/cuirass/base.scm
@@ -72,7 +72,6 @@
prepare-git
process-specs
evaluation-log-file
- with-build-offload-thread
request-workers
;; Parameters.
@@ -124,10 +123,6 @@
;; Define whether to fall back to building when the substituter fails.
(make-parameter #f))
-(define %build-offload-channel
- ;; Channel to communicate with the remote build server.
- (make-parameter #f))
-
(define %package-cachedir
;; Define to location of cache directory of this package.
(make-parameter (or (getenv "CUIRASS_CACHEDIR")
@@ -454,40 +449,6 @@ Essentially this procedure inverts the inversion-of-control that
(raise c))
(x x)))))))
-(define (make-build-offload-thread)
- "Return a channel used to offload builds by communicating with the remote
-build server in a separate thread. The spawned thread also polls for build
-events sent by the remote server and calls HANDLE-BUILD-EVENT to register them
-in the database."
- (let ((channel (make-channel)))
- (call-with-new-thread
- (lambda ()
- (parameterize (((@@ (fibers internal) current-fiber) #f)
- (current-read-waiter (lambda (port)
- (port-poll port "r")))
- (current-write-waiter (lambda (port)
- (port-poll port "w"))))
- (remote-build-init!)
- (let ((socket (remote-build-socket)))
- (let loop ()
- (remote-build-poll socket handle-build-event)
- (match (get-message-with-timeout channel
- #:seconds 0.1
- #:retry? #f)
- ('workers
- (remote-send-workers socket))
- ('timeout #f))
- (loop))))))
- channel))
-
-(define-syntax-rule (with-build-offload-thread body ...)
- (parameterize ((%build-offload-channel
- (make-build-offload-thread)))
- body ...))
-
-(define (request-workers)
- (put-message (%build-offload-channel) 'workers))
-
;;;
;;; Building packages.
@@ -659,11 +620,6 @@ updating the database accordingly."
(log-message "build failed: '~a'" drv)
(db-update-build-status! drv (build-status failed)))
(log-message "bogus build-failed event for '~a'" drv)))
- (('workers workers)
- (db-clear-workers)
- (for-each (lambda (worker)
- (db-add-worker (sexp->worker worker)))
- workers))
(('substituter-started item _ ...)
(log-message "substituter started: '~a'" item))
(('substituter-succeeded item _ ...)
diff --git a/src/cuirass/database.scm b/src/cuirass/database.scm
index c9e3f64..53f013b 100644
--- a/src/cuirass/database.scm
+++ b/src/cuirass/database.scm
@@ -640,6 +640,7 @@ string."
(define-enumeration build-status
;; Build status as expected by Hydra's API. Note: the negative values are
;; Cuirass' own extensions.
+ (submitted -3)
(scheduled -2)
(started -1)
(succeeded 0)
diff --git a/src/cuirass/remote-server.scm b/src/cuirass/remote-server.scm
index 3ad722b..22a1915 100644
--- a/src/cuirass/remote-server.scm
+++ b/src/cuirass/remote-server.scm
@@ -19,6 +19,7 @@
(define-module (cuirass remote-server)
#:use-module (cuirass base)
#:use-module (cuirass database)
+ #:use-module (cuirass logging)
#:use-module (cuirass remote)
#:use-module (gcrypt pk-crypto)
#:use-module (guix avahi)
@@ -156,49 +157,18 @@ Start a remote build server.\n"))
;; Set of connected workers.
(make-hash-table))
-(define %build-queues
- ;; Builds request queue.
- (map (lambda (system)
- (cons system (make-q)))
- %supported-systems))
-
-(define (find-system-queues systems)
- "Return the list of build queues for SYSTEMS that are not empty."
- (filter-map (match-lambda
- ((system . queue)
- (and (member system systems)
- (not (q-empty? queue))
- queue)))
- %build-queues))
-
-(define (build-available? name)
- "Return #t if there is some available work for the worker with the given
-NAME and #f otherwise."
- (let* ((worker (hash-ref %workers name))
- (systems (and worker (worker-systems worker)))
- (queues (if systems
- (find-system-queues systems)
- '())))
- (not (null? queues))))
-
-(define (pop-random-build name)
- "Pop randomly and return a build from all the build queues with available
-work for the worker with the given NAME."
- (define (random-queue queues)
- (list-ref queues (random (length queues))))
+(define (pop-build name)
+ (define (random-system systems)
+ (list-ref systems (random (length systems))))
(let* ((worker (hash-ref %workers name))
- (systems (worker-systems worker))
- (queues (find-system-queues systems)))
- (q-pop! (random-queue queues))))
-
-(define (sort-build-queue! queue)
- (sort! (car queue)
- (lambda (a b)
- (if (= (build-priority a) (build-priority b))
- (> (build-timestamp a) (build-timestamp b))
- (< (build-priority a) (build-priority b)))))
- (sync-q! queue))
+ (systems (worker-systems worker)))
+ (match (db-get-builds `((status . scheduled)
+ (system . ,(random-system systems))
+ (order . priority+timestamp)
+ (nr . 1)))
+ ((build) build)
+ (() #f))))
(define (remove-unresponsive-workers!)
(let ((unresponsive
@@ -214,47 +184,6 @@ work for the worker with the given NAME."
(hash-remove! %workers worker))
unresponsive)))
-(define* (read-client-exp client-socket client exp)
- "Read the given EXP sent by CLIENT."
- (catch 'system-error
- (lambda ()
- (match (zmq-read-message exp)
- (('build ('drv drv)
- ('priority priority)
- ('timeout timeout)
- ('max-silent max-silent)
- ('timestamp timestamp)
- ('system system))
- (let* ((system (or system
- (derivation-system
- (read-derivation-from-file drv))))
- (queue (assoc-ref %build-queues system)))
- ;; Push the derivation to the matching queue according to the
- ;; targeted system. Also save the client ID in the queue to be able
- ;; to send it build events later on.
- (q-push! queue
- (build
- (client client)
- (derivation drv)
- (priority priority)
- (timeout timeout)
- (max-silent max-silent)
- (timestamp timestamp)
- (system system)))
- (sort-build-queue! queue)))
- (('request-workers)
- (zmq-send-msg-parts-bytevector
- client-socket
- (list client
- (make-bytevector 0)
- (string->bv
- (zmq-workers
- (hash-fold (lambda (key value old)
- (cons (worker->sexp value) old))
- '()
- %workers))))))))
- (const #f)))
-
(define* (read-worker-exp exp #:key reply-worker)
"Read the given EXP sent by a worker. REPLY-WORKER is a procedure that can
be used to reply to the worker."
@@ -272,21 +201,30 @@ be used to reply to the worker."
(lambda (name)
(info (G_ "Worker `~a' is ready.~%") name))))
(('worker-request-work name)
- (if (build-available? name)
- (let ((build (pop-random-build name)))
- (match-record build <build>
- (client derivation priority timeout max-silent)
+ (let ((build (pk (pop-build name))))
+ (if build
+ (let ((derivation (assq-ref build #:derivation))
+ (priority (assq-ref build #:priority))
+ (timeout (assq-ref build #:timeout))
+ (max-silent (assq-ref build #:max-silent)))
+ (db-update-build-status! derivation (build-status submitted))
(reply-worker
- client
(zmq-build-request-message derivation
#:priority priority
#:timeout timeout
#:max-silent max-silent))))
(reply-worker
- (zmq-empty-delimiter)
(zmq-no-build-message))))
(('worker-ping worker)
- (update-workers! worker (const #t)))))
+ (update-workers! worker (const #t))
+ (db-clear-workers)
+ (hash-for-each (lambda (key value)
+ (db-add-worker value))
+ %workers))
+ (('build-started ('drv drv) ('worker worker))
+ (log-message "build started: '~a'" drv)
+ (db-update-build-worker! drv worker)
+ (db-update-build-status! drv (build-status started)))))
;;;
@@ -301,7 +239,7 @@ be used to reply to the worker."
(define (zmq-fetch-worker-socket)
"Return a socket used to communicate with the fetch workers."
- (let ((socket (zmq-create-socket %zmq-context ZMQ_REP))
+ (let ((socket (zmq-create-socket %zmq-context ZMQ_PULL))
(endpoint (zmq-fetch-workers-endpoint)))
(zmq-connect socket endpoint)
socket))
@@ -442,10 +380,7 @@ required and #f otherwise."
(define* (run-fetch message)
"Read MESSAGE and download the corresponding build outputs. If
%CACHE-DIRECTORY is set, download the matching NAR and NARINFO files in this
-directory. If %ADD-TO-STORE? is set, add the build outputs to the store.
-
-REPLY is procedure used to forward MESSAGE to the client once the build
-outputs are downloaded."
+directory. If %ADD-TO-STORE? is set, add the build outputs to the store."
(define (build-outputs drv)
(catch 'system-error
(lambda ()
@@ -459,7 +394,6 @@ outputs are downloaded."
(let ((log-directory (%log-directory)))
(match (zmq-read-message message)
(('build-succeeded ('drv drv) ('url url) _ ...)
- (info (G_ "Fetching derivation ~a build outputs.~%") drv)
(let ((outputs (build-outputs drv))
(log-file
(and log-directory
@@ -468,6 +402,7 @@ outputs are downloaded."
(add-to-store outputs url))
(when (%cache-directory)
(download-nar (%cache-directory) outputs url))
+ (log-message "build succeeded: '~a'" drv)
(set-build-successful! drv log-file)))
(('build-failed ('drv drv) ('url url) _ ...)
(let ((log-file
@@ -490,8 +425,8 @@ socket."
(let ((socket (zmq-fetch-worker-socket)))
(let loop ()
(match (zmq-get-msg-parts-bytevector socket)
- ((client empty rest)
- (run-fetch (bv->string rest))))
+ ((message)
+ (run-fetch (bv->string message))))
(loop))))))
@@ -515,18 +450,13 @@ frontend to the workers connected through the TCP backend."
(eq? (poll-item-socket item) socket))
items))
- (let* ((client-socket
- (zmq-create-socket %zmq-context ZMQ_ROUTER))
- (build-socket
+ (let* ((build-socket
(zmq-create-socket %zmq-context ZMQ_ROUTER))
(fetch-socket
- (zmq-create-socket %zmq-context ZMQ_DEALER))
+ (zmq-create-socket %zmq-context ZMQ_PUSH))
(poll-items (list
- (poll-item client-socket ZMQ_POLLIN)
- (poll-item build-socket ZMQ_POLLIN)
- (poll-item fetch-socket ZMQ_POLLIN))))
+ (poll-item build-socket ZMQ_POLLIN))))
- (zmq-bind-socket client-socket zmq-frontend-endpoint)
(zmq-bind-socket build-socket (zmq-backend-endpoint backend-port))
(zmq-bind-socket fetch-socket (zmq-fetch-workers-endpoint))
@@ -534,40 +464,20 @@ frontend to the workers connected through the TCP backend."
;; frontend messages before forwarding them to the backend.
(let loop ()
(let ((items (zmq-poll* poll-items 1000)))
- ;; client -> remote-server.
- (when (zmq-socket-ready? items client-socket)
- (match (zmq-get-msg-parts-bytevector client-socket)
- ((client empty rest)
- (read-client-exp client-socket
- client
- (bv->string rest)))))
- ;; build-worker -> remote-server.
(when (zmq-socket-ready? items build-socket)
(match (zmq-get-msg-parts-bytevector build-socket)
((worker empty rest)
(let ((reply-worker
- (lambda (client message)
+ (lambda (message)
(zmq-send-msg-parts-bytevector
build-socket
(list worker
(zmq-empty-delimiter)
- client
- (zmq-empty-delimiter)
(string->bv message))))))
- (read-worker-exp (bv->string rest)
- #:reply-worker reply-worker)))
- ((worker empty client empty rest)
- (let ((message (list client (zmq-empty-delimiter) rest)))
(if (need-fetching? (bv->string rest))
- (zmq-send-msg-parts-bytevector fetch-socket
- (cons empty message))
- (zmq-send-msg-parts-bytevector client-socket message))))))
- ;; fetch-worker -> remote-server.
- (when (zmq-socket-ready? items fetch-socket)
- (match (zmq-get-msg-parts-bytevector fetch-socket)
- ((empty . rest)
- (zmq-send-msg-parts-bytevector client-socket rest))))
-
+ (zmq-send-bytevector fetch-socket rest)
+ (read-worker-exp (bv->string rest)
+ #:reply-worker reply-worker))))))
(remove-unresponsive-workers!)
(loop)))))
@@ -641,6 +551,7 @@ exiting."
(parameterize ((%add-to-store? add-to-store?)
(%cache-directory cache)
+ (%db-writer-queue-size 1)
(%log-directory log-directory)
(%package-database database)
(%public-key public-key)
@@ -665,12 +576,12 @@ exiting."
#:txt (list (string-append "publish="
(number->string publish-port)))))
- (for-each (lambda (number)
- (start-fetch-worker
- (string-append "fetch-worker-" (number->string number))))
- (iota 4))
-
(with-database
(with-queue-writer-worker
- (zmq-init!)
+ (for-each (lambda (number)
+ (start-fetch-worker
+ (string-append "fetch-worker-"
+ (number->string number))))
+ (iota 4))
+
(zmq-start-proxy backend-port)))))))
diff --git a/src/cuirass/remote-worker.scm b/src/cuirass/remote-worker.scm
index 7bc1471..baee0e4 100644
--- a/src/cuirass/remote-worker.scm
+++ b/src/cuirass/remote-worker.scm
@@ -228,18 +228,17 @@ command. REPLY is a procedure that can be used to reply to this server."
(zmq-connect socket endpoint)
(let loop ()
(ping socket)
- (sleep 10)
+ (sleep 60)
(loop))))))
(define (start-worker worker server)
"Start a worker thread named NAME, reading commands from the DEALER socket
and executing them. The worker can reply on the same socket."
- (define (reply socket client)
+ (define (reply socket)
(lambda (message)
(zmq-send-msg-parts-bytevector
socket
- (list (zmq-empty-delimiter) client
- (zmq-empty-delimiter) (string->bv message)))))
+ (list (zmq-empty-delimiter) (string->bv message)))))
(define (ready socket)
(zmq-send-msg-parts-bytevector
@@ -262,18 +261,17 @@ and executing them. The worker can reply on the same socket."
(address (server-address server))
(port (server-port server))
(endpoint (zmq-backend-endpoint address port)))
- (zmq-init!)
(zmq-connect socket endpoint)
(ready socket)
(worker-ping worker server)
(let loop ()
(request-work socket)
(match (zmq-get-msg-parts-bytevector socket '())
- ((empty client empty command)
+ ((empty command)
(run-command (bv->string command) server
- #:reply (reply socket client)
+ #:reply (reply socket)
#:worker worker)))
- (sleep 1)
+ (sleep 10)
(loop))))))
diff --git a/src/cuirass/remote.scm b/src/cuirass/remote.scm
index 768fce8..690cac9 100644
--- a/src/cuirass/remote.scm
+++ b/src/cuirass/remote.scm
@@ -53,22 +53,9 @@
publish-url
avahi-service->server
- build
- <build>
- builds?
- build-client
- build-derivation
- build-priority
- build-timeout
- build-max-silent
- build-timestamp
- build-system
-
publish-server
set-build-options*
- zmq-init!
- zmq-frontend-endpoint
zmq-poll*
zmq-socket-ready?
zmq-empty-delimiter
@@ -81,15 +68,9 @@
zmq-worker-ping
zmq-worker-ready-message
zmq-worker-request-work-message
- zmq-request-workers
- zmq-workers
zmq-read-message
- remote-server-service-type
- remote-build-init!
- remote-build-socket
- remote-send-workers
- remote-build-poll))
+ remote-server-service-type))
;;;
@@ -162,7 +143,7 @@
(string-append (gethostname) "-" (random-string 4)))
(define %worker-timeout
- (make-parameter 60))
+ (make-parameter 120))
;;;
@@ -214,22 +195,6 @@ given NAME."
;;;
-;;; Build.
-;;;
-
-(define-record-type* <build>
- build make-build
- build?
- (client build-client)
- (derivation build-derivation)
- (priority build-priority)
- (timeout build-timeout)
- (max-silent build-max-silent)
- (timestamp build-timestamp)
- (system build-system))
-
-
-;;;
;;; Store publishing.
;;;
@@ -286,12 +251,6 @@ PRIVATE-KEY to sign narinfos."
(define %zmq-context
(zmq-create-context))
-(define (zmq-init!)
- (zmq-set-buffer-size (* 4096 16)))
-
-(define zmq-frontend-endpoint
- "ipc://@remote-build-socket")
-
(define (EINTR-safe proc)
"Return a variant of PROC that catches EINTR 'zmq-error' exceptions and
retries a call to PROC."
@@ -368,53 +327,5 @@ retries a call to PROC."
"Return a message that indicates that WORKER is requesting work."
(format #f "~s" `(worker-request-work ,name)))
-(define (zmq-request-workers)
- "Return a message requesting the WORKERS list."
- (format #f "~s" `(request-workers)))
-
-(define (zmq-workers workers)
- "Return a message containing the WORKERS list."
- (format #f "~s" `(workers ,workers)))
-
-
-;;;
-;;; Remote builds.
-;;;
-
(define remote-server-service-type
"_remote-server._tcp")
-
-(define (remote-build-init!)
- (zmq-init!))
-
-(define (remote-build-socket)
- "Return a socket used to communicate with the remote build server."
- (let ((socket (zmq-create-socket %zmq-context ZMQ_DEALER))
- (endpoint zmq-frontend-endpoint))
- (zmq-connect socket endpoint)
- socket))
-
-(define (remote-send-workers socket)
- "Request the workers list on SOCKET."
- (zmq-send-msg-parts-bytevector
- socket
- (list (make-bytevector 0)
- (string->bv (zmq-request-workers)))))
-
-(define* (remote-build-poll socket event-proc
- #:key
- (timeout 1000))
- "Poll SOCKET for messages and call EVENT-PROC each time a build event is
-received, return if no event occured for TIMEOUT milliseconds."
- (define (parse-result result)
- (match (zmq-read-message result)
- (('workers workers)
- (event-proc (list 'workers workers)))))
-
- (let* ((poll-items (list
- (poll-item socket ZMQ_POLLIN)))
- (items (zmq-poll* poll-items timeout)))
- (when (zmq-socket-ready? items socket)
- (match (zmq-get-msg-parts-bytevector socket '())
- ((empty result)
- (parse-result (bv->string result)))))))