guix-commits
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[no subject]


From: Ludovic Courtès
Date: Wed, 13 Sep 2023 13:05:39 -0400 (EDT)

branch: wip-actors
commit 7fdd780623493aa5a5d2bff4bbed06b15b731521
Author: Ludovic Courtès <ludo@gnu.org>
AuthorDate: Wed Sep 13 11:15:40 2023 +0200

    base: Move evaluations to a separate actor.
    
    * src/cuirass/base.scm (start-evaluation, jobset-evaluator)
    (spawn-jobset-evaluator): New procedures.
    (jobset-monitor): Add #:evaluator.  Replace inline evaluation with a
    message to EVALUATOR.
    (spawn-jobset-monitor): Add #:evaluator and pass honor it.
    (jobset-registry, spawn-jobset-registry): Likewise.
    * src/cuirass/scripts/register.scm (cuirass-register): Call
    ‘spawn-jobset-evaluator’ and pass it to ‘spawn-jobset-registry’.
---
 src/cuirass/base.scm             | 142 ++++++++++++++++++++++++++-------------
 src/cuirass/scripts/register.scm |   5 +-
 2 files changed, 100 insertions(+), 47 deletions(-)

diff --git a/src/cuirass/base.scm b/src/cuirass/base.scm
index 6da1fd2..932af4d 100644
--- a/src/cuirass/base.scm
+++ b/src/cuirass/base.scm
@@ -75,6 +75,7 @@
             build-packages
             prepare-git
             spawn-channel-update-service
+            spawn-jobset-evaluator
             spawn-jobset-registry
             lookup-jobset
             register-jobset
@@ -726,6 +727,78 @@ channels, and return its communication channel."
     (spawn-fiber (channel-update-service channel))
     channel))
 
+(define (start-evaluation spec instances timestamp)
+  "Start an evaluation of SPEC using the given channel INSTANCES.  Return #f if
+nothing has changed (and thus no new evaluation was created), otherwise return
+the ID of the new evaluation."
+  (let* ((channels (map channel-instance-channel instances))
+         (new-spec (specification
+                    (inherit spec)
+                    ;; Include possible channel dependencies
+                    (channels channels)))
+         (checkouttime (time-second (current-time time-utc)))
+         (eval-id (db-add-evaluation (specification-name spec) instances
+                                     #:timestamp timestamp
+                                     #:checkouttime checkouttime)))
+
+    (and eval-id
+         (guard (c ((evaluation-error? c)
+                    (log-error "failed to evaluate spec '~a'; see ~a"
+                               (evaluation-error-spec-name c)
+                               (evaluation-log-file
+                                (evaluation-error-id c)))
+                    #f))
+           (log-info "evaluating spec '~a'" (specification-name spec))
+
+           ;; The LATEST-CHANNEL-INSTANCES procedure may return channel
+           ;; dependencies that are not declared in the initial specification
+           ;; channels.  Update the given SPEC to take them into account.
+           (db-add-or-update-specification new-spec)
+           (evaluate spec eval-id)
+           (db-set-evaluation-time eval-id)
+
+           eval-id))))
+
+(define* (jobset-evaluator channel
+                           #:key (max-parallel-evaluations
+                                  (current-processor-count)))
+  (define pool
+    (make-resource-pool (iota max-parallel-evaluations)))
+
+  (lambda ()
+    (log-info "will perform up to ~a evaluations concurrently"
+              max-parallel-evaluations)
+    (let loop ()
+      (match (get-message channel)
+        (`(evaluate ,spec ,instances ,timestamp)
+         ;; Take a token a perform the given evaluation.
+         (spawn-fiber
+          (lambda ()
+            (define eval-id
+              (with-resource-from-pool pool token
+                (log-info "evaluating '~a' with token #~a"
+                          (specification-name spec) token)
+                (start-evaluation spec instances timestamp)))
+
+            (when eval-id
+              (log-info "new evaluation ~a of jobset '~a'"
+                        eval-id (specification-name spec))
+              (with-store/non-blocking store
+                (build-packages store eval-id)))))
+         (loop))))))
+
+
+(define* (spawn-jobset-evaluator #:key (max-parallel-evaluations
+                                        (current-processor-count)))
+  "Spawn the actor responsible for evaluating jobsets for a given spec and set
+of channel instances.  The actor performs at most MAX-PARALLEL-EVALUATIONS
+concurrently."
+  (let ((channel (make-channel)))
+    (spawn-fiber (jobset-evaluator channel
+                                   #:max-parallel-evaluations
+                                   max-parallel-evaluations))
+    channel))
+
 (define %jobset-trigger-rate-window
   ;; Window (seconds) over which the jobset trigger rate is computed.
   (* 5 60))                                       ;5 minutes
@@ -734,8 +807,9 @@ channels, and return its communication channel."
   ;; Maximum rate (triggers per seconds) at which jobsets may be triggered.
   (/ 3 (* 2 60.)))                                ;3 times in 2 minutes
 
-(define* (jobset-monitor channel spec update-service
-                         #:key (polling-period 60))
+(define* (jobset-monitor channel spec
+                         #:key (polling-period 60)
+                         update-service evaluator)
   (define period
     (if (> (specification-period spec) 0)
         (specification-period spec)
@@ -782,57 +856,28 @@ channels, and return its communication channel."
                 (instances
                  (log-info "fetched channels for '~a':~{ ~a~}"
                            name (map channel-name channels))
-                 (let* ((channels (map channel-instance-channel instances))
-                        (new-spec (specification
-                                   (inherit spec)
-                                   ;; Include possible channel dependencies
-                                   (channels channels)))
-                        (checkouttime (time-second (current-time time-utc)))
-                        (eval-id (db-add-evaluation name instances
-                                                    #:timestamp timestamp
-                                                    #:checkouttime 
checkouttime)))
-
-                   (when eval-id
-                     (spawn-fiber
-                      (lambda ()
-                        ;; TODO: Move this to an evaluation actor that limits
-                        ;; parallelism.
-                        (guard (c ((evaluation-error? c)
-                                   (log-error "failed to evaluate spec '~a'; 
see ~a"
-                                              (evaluation-error-spec-name c)
-                                              (evaluation-log-file
-                                               (evaluation-error-id c)))
-                                   #f))
-                          (log-info "evaluating spec '~a'" name)
-
-                          ;; The LATEST-CHANNEL-INSTANCES procedure may return 
channel
-                          ;; dependencies that are not declared in the initial
-                          ;; specification channels.  Update the given SPEC to 
take
-                          ;; them into account.
-                          (db-add-or-update-specification new-spec)
-                          (evaluate spec eval-id)
-                          (db-set-evaluation-time eval-id)
-                          (with-store/non-blocking store
-                            (build-packages store eval-id)))))
-
-                     ;; 'spawn-fiber' returns zero values but we need one.
-                     *unspecified*))))
+                 (put-message evaluator
+                              `(evaluate ,spec ,instances ,timestamp))))
 
               (loop (cons timestamp (take-while recent? last-updates)))))))))
 
-(define* (spawn-jobset-monitor spec update-service
-                               #:key (polling-period 60))
+(define* (spawn-jobset-monitor spec
+                               #:key (polling-period 60)
+                               update-service evaluator)
   "Spawn an actor responsible for monitoring the jobset corresponding to SPEC,
 a <specification> record, and return it.  The actor will send messages to
 UPDATE-SERVICE anytime it needs Guix channels to be updated, at most every
 POLLING-PERIOD seconds."
   (let ((channel (make-channel)))
-    (spawn-fiber (jobset-monitor channel spec update-service
+    (spawn-fiber (jobset-monitor channel spec
+                                 #:update-service update-service
+                                 #:evaluator evaluator
                                  #:polling-period polling-period))
     channel))
 
-(define* (jobset-registry channel update-service
-                         #:key (polling-period 60))
+(define* (jobset-registry channel
+                         #:key (polling-period 60)
+                         update-service evaluator)
   (lambda ()
     (spawn-fiber
      (lambda ()
@@ -853,7 +898,10 @@ POLLING-PERIOD seconds."
         (`(register ,spec)
          (match (vhash-assq (specification-name spec) registry)
            (#f
-            (let ((monitor (spawn-jobset-monitor spec update-service
+            (let ((monitor (spawn-jobset-monitor spec
+                                                 #:update-service
+                                                 update-service
+                                                 #:evaluator evaluator
                                                  #:polling-period
                                                  polling-period))
                   (name (specification-name spec)))
@@ -865,12 +913,14 @@ POLLING-PERIOD seconds."
                       (specification-name spec))
             (loop registry))))))))
 
-(define* (spawn-jobset-registry update-service
-                                #:key (polling-period 60))
+(define* (spawn-jobset-registry #:key (polling-period 60)
+                                 update-service evaluator)
   "Spawn a jobset registry.  In turn, the registry creates a new jobset
 monitoring actor for each 'register' message it receives."
   (let ((channel (make-channel)))
-    (spawn-fiber (jobset-registry channel update-service
+    (spawn-fiber (jobset-registry channel
+                                  #:update-service update-service
+                                  #:evaluator evaluator
                                   #:polling-period polling-period))
     channel))
 
diff --git a/src/cuirass/scripts/register.scm b/src/cuirass/scripts/register.scm
index 81755b1..70d54e2 100644
--- a/src/cuirass/scripts/register.scm
+++ b/src/cuirass/scripts/register.scm
@@ -205,6 +205,8 @@
                (if one-shot?
                    (leave (G_ "'--one-shot' is currently unimplemented~%"))
                    (let ((exit-channel (make-channel))
+                         (evaluator (spawn-jobset-evaluator
+                                     #:max-parallel-evaluations threads))
                          (update-service (spawn-channel-update-service)))
                      (clear-build-queue)
 
@@ -225,7 +227,8 @@
 
                      ;; Spawn one monitoring actor for each jobset.
                      (let ((registry (spawn-jobset-registry
-                                      update-service
+                                      #:update-service update-service
+                                      #:evaluator evaluator
                                       #:polling-period interval)))
                        ;; Spawn the bridge through which other 'cuirass'
                        ;; processes, such as 'cuirass web', may talk to the



reply via email to

[Prev in Thread] Current Thread [Next in Thread]