guix-commits
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[no subject]


From: Ludovic Courtès
Date: Sat, 2 Sep 2023 16:51:37 -0400 (EDT)

branch: wip-actors
commit e3d80aa67133dba3c7c4beab1d8c740901ba9552
Author: Ludovic Courtès <ludo@gnu.org>
AuthorDate: Fri Sep 1 19:26:08 2023 +0200

    base: Split jobset evaluation into several actors.
    
    * src/cuirass/base.scm (process-specs): Remove.
    (channel-update-service, spawn-channel-update-service)
    (jobset-monitor, spawn-jobset-monitor)
    (jobset-registry, spawn-jobset-registry)
    (lookup-jobset, register-jobset): New procedures.
    * src/cuirass/http.scm (url-handler): Add FIXMEs.
    * src/cuirass/scripts/register.scm (cuirass-register): Remove 'build'
    fiber and call 'spawn-jobset-registry' instead.  Error out on
    'one-shot?'.
---
 src/cuirass/base.scm             | 275 +++++++++++++++++++++++++++------------
 src/cuirass/http.scm             |   6 +
 src/cuirass/scripts/register.scm |  17 +--
 3 files changed, 203 insertions(+), 95 deletions(-)

diff --git a/src/cuirass/base.scm b/src/cuirass/base.scm
index 075c49d..657ec00 100644
--- a/src/cuirass/base.scm
+++ b/src/cuirass/base.scm
@@ -52,6 +52,7 @@
   #:use-module (ice-9 atomic)
   #:use-module (ice-9 ftw)
   #:use-module (ice-9 threads)
+  #:use-module (ice-9 vlist)
   #:use-module (srfi srfi-1)
   #:use-module (srfi srfi-11)
   #:use-module (srfi srfi-19)
@@ -73,7 +74,10 @@
             restart-builds
             build-packages
             prepare-git
-            process-specs
+            spawn-channel-update-service
+            spawn-jobset-registry
+            lookup-jobset
+            register-jobset
             evaluation-log-file
             latest-checkouts
 
@@ -652,87 +656,190 @@ specification."
 (define exception-with-kind-and-args?
   (exception-predicate &exception-with-kind-and-args))
 
-(define (process-specs jobspecs)
-  "Evaluate and build JOBSPECS and store results in the database."
-  (define (new-eval? spec)
-    (let ((name (specification-name spec))
-          (period (specification-period spec)))
-      (or (= period 0)
-          (let ((time
-                 (db-get-time-since-previous-eval name)))
-            (cond
-             ((not time) #t)
-             ((> time period) #t)
-             (else #f))))))
-
-  (define (process spec)
+(define (channel-update-service channel)
+  "Return a thunk (an actor) that reads messages on CHANNEL and is responsible
+to update Git checkouts, effectively serializing all Git operations."
+  ;; Note: All Git operations are serialized when in fact it would be enough
+  ;; to serialize operations with the same URL (because they are cached in the
+  ;; same directory).
+  (define (fetch store channels)
+    ;; Return the result of 'latest-channel-instances*' for CHANNELS using
+    ;; STORE.  If an exception is raised, log it and return #f instead so
+    ;; that the calling actor keeps running.
+    (let/ec return
+      (with-exception-handler
+          (lambda (exception)
+            (if (exception-with-kind-and-args? exception)
+                (match (exception-kind exception)
+                  ('git-error
+                   (log-error "Git error while fetching channels from~{ ~a~}: ~a"
+                              (map channel-url channels)
+                              (git-error-message
+                               (first (exception-args exception)))))
+                  ('system-error
+                   ;; The format string must consume exactly the arguments
+                   ;; given below: the list of URLs and the error message.
+                   (log-error "error while fetching channels from~{ ~a~}: ~a"
+                              (map channel-url channels)
+                              (strerror
+                               (system-error-errno
+                                (cons 'system-error
+                                      (exception-args exception))))))
+                  (kind
+                   (log-error "uncaught '~a' exception: ~s"
+                              kind (exception-args exception))))
+                (log-error "uncaught exception: ~s" exception))
+            ;; Escape to the 'let/ec' continuation: FETCH returns #f.
+            (return #f))
+        (lambda ()
+          (non-blocking
+           (latest-channel-instances* store channels))))))
+
+  (lambda ()
     (with-store store
-      (let* ((name (specification-name spec))
-             (timestamp (time-second (current-time time-utc)))
-             (channels (specification-channels spec))
-             (instances (non-blocking
-                         (log-info "fetching channels for spec '~a'" name)
-                         (latest-channel-instances* store channels)))
-             (new-channels (let ((channels (map channel-instance-channel
-                                                instances)))
-                             (log-info "fetched channels for '~a':~{ ~a~}"
-                                       name (map channel-name channels))
-                             channels))
-             (new-spec (specification
-                        (inherit spec)
-                        (channels new-channels))) ;include possible channel
-                                                  ;dependencies.
-             (checkouttime (time-second (current-time time-utc)))
-             (eval-id (db-add-evaluation name instances
-                                         #:timestamp timestamp
-                                         #:checkouttime checkouttime)))
-        (when eval-id
-          (spawn-fiber
-           (lambda ()
-             (guard (c ((evaluation-error? c)
-                        (log-error "failed to evaluate spec '~a'; see ~a"
-                                   (evaluation-error-spec-name c)
-                                   (evaluation-log-file
-                                    (evaluation-error-id c)))
-                        #f))
-               (log-info "evaluating spec '~a'" name)
-
-               ;; The LATEST-CHANNEL-INSTANCES procedure may return channel
-               ;; dependencies that are not declared in the initial
-               ;; specification channels.  Update the given SPEC to take
-               ;; them into account.
-               (db-add-or-update-specification new-spec)
-               (evaluate spec eval-id)
-               (db-set-evaluation-time eval-id)
-               (build-packages store eval-id))))
-
-          ;; 'spawn-fiber' returns zero values but we need one.
-          *unspecified*))))
-
-  (for-each (lambda (spec)
-              ;; Catch Git errors, which might be transient, and keep going.
-              (let/ec return
-                (with-exception-handler
-                    (lambda (exception)
-                      (if (exception-with-kind-and-args? exception)
-                          (match (exception-kind exception)
-                            ('git-error
-                             (log-error "Git error while fetching inputs of '~a': ~a"
-                                        (specification-name spec)
-                                        (git-error-message
-                                         (first (exception-args exception)))))
-                            ('system-error
-                             (log-error "while processing '~a': ~s"
-                                        (strerror
-                                         (system-error-errno
-                                          (cons 'system-error
-                                                (exception-args exception))))))
-                            (kind
-                             (log-error "uncaught '~a' exception: ~s"
-                                        kind (exception-args exception))))
-                          (log-error "uncaught exception: ~s" exception))
-                      (return #f))
-                  (lambda ()
-                    (and (new-eval? spec)
-                         (process spec))))))
-            jobspecs))
+      (let loop ()
+        (match (get-message channel)
+          (`(fetch ,channels ,reply)
+           (log-info "fetching channels:~{ '~a'~}"
+                     (map channel-name channels))
+           (let ((result (fetch store channels)))
+             (if result
+                 (log-info "pulled commits~{ ~a~}"
+                           (zip (map (compose channel-name
+                                              channel-instance-channel)
+                                     result)
+                                (map channel-instance-commit result)))
+                 (log-info "failed to fetch channels~{ '~a'~}"
+                           (map channel-name channels)))
+             (put-message reply result))
+           (loop)))))))
+
+(define (spawn-channel-update-service)
+  "Start a channel update service in a new fiber and return the channel on
+which that service receives its requests.  The service is the actor in charge
+of fetching the latest revisions of a set of Guix channels."
+  (define mailbox (make-channel))
+  (spawn-fiber (channel-update-service mailbox))
+  mailbox)
+
+(define* (jobset-monitor channel                  ;currently unused
+                         spec update-service
+                         #:key (polling-period 60))
+  "Return a thunk (an actor) that monitors the jobset described by SPEC, a
+<specification> record.  In a loop, the actor asks UPDATE-SERVICE to fetch
+the channels of SPEC; on success it records a new evaluation with
+'db-add-evaluation' and, when that returns an evaluation ID, evaluates and
+builds it in a separate fiber.  Between iterations it sleeps for the period of
+SPEC or, when that period is not positive, for POLLING-PERIOD seconds."
+  (define period
+    (if (> (specification-period spec) 0)
+        (specification-period spec)
+        polling-period))
+
+  (define name (specification-name spec))
+  (define channels (specification-channels spec))
+
+  (lambda ()
+    (log-info "starting monitor for spec '~a'" name)
+    (let loop ()
+      (let ((timestamp (time-second (current-time time-utc))))
+        ;; Send a 'fetch' request along with a fresh reply channel and wait
+        ;; for the answer: #f on failure, a list of channel instances
+        ;; otherwise.
+        (match (let ((reply (make-channel)))
+                 (log-info "fetching channels for spec '~a'" name)
+                 (put-message update-service
+                              `(fetch ,channels ,reply))
+                 (get-message reply))
+          (#f
+           (log-warning "failed to fetch channels for '~a'" name))
+          (instances
+           ;; Report the channels that were actually fetched rather than
+           ;; those declared in SPEC: the former may include dependencies.
+           (let ((channels (map channel-instance-channel instances)))
+             (log-info "fetched channels for '~a':~{ ~a~}"
+                       name (map channel-name channels))
+             (let* ((new-spec (specification
+                               (inherit spec)
+                               ;; Include possible channel dependencies
+                               (channels channels)))
+                    (checkouttime (time-second (current-time time-utc)))
+                    (eval-id (db-add-evaluation name instances
+                                                #:timestamp timestamp
+                                                #:checkouttime checkouttime)))
+
+               (when eval-id
+                 (spawn-fiber
+                  (lambda ()
+                    ;; TODO: Move this to an evaluation actor that limits
+                    ;; parallelism.
+                    (guard (c ((evaluation-error? c)
+                               (log-error "failed to evaluate spec '~a'; see ~a"
+                                          (evaluation-error-spec-name c)
+                                          (evaluation-log-file
+                                           (evaluation-error-id c)))
+                               #f))
+                      (log-info "evaluating spec '~a'" name)
+
+                      ;; The LATEST-CHANNEL-INSTANCES procedure may return
+                      ;; channel dependencies that are not declared in the
+                      ;; initial specification channels.  Update the given
+                      ;; SPEC to take them into account.
+                      (db-add-or-update-specification new-spec)
+                      (evaluate spec eval-id)
+                      (db-set-evaluation-time eval-id)
+                      (with-store/non-blocking store
+                        (build-packages store eval-id)))))
+
+                 ;; 'spawn-fiber' returns zero values but we need one.
+                 *unspecified*)))))
+
+        (log-info "polling '~a' channels in ~a seconds" name period)
+        (sleep period)
+        (loop)))))
+
+(define* (spawn-jobset-monitor spec update-service
+                               #:key (polling-period 60))
+  "Start, in a new fiber, an actor that monitors the jobset described by SPEC,
+a <specification> record, and return the channel created for that actor.
+Whenever the actor needs Guix channels to be updated, it sends a message to
+UPDATE-SERVICE; POLLING-PERIOD, in seconds, is passed on to the monitor."
+  (define mailbox (make-channel))
+  (define monitor
+    (jobset-monitor mailbox spec update-service
+                    #:polling-period polling-period))
+  (spawn-fiber monitor)
+  mailbox)
+
+(define* (jobset-registry channel update-service
+                         #:key (polling-period 60))
+  "Return a thunk (an actor) that reads messages on CHANNEL and maintains a
+registry mapping jobset names (symbols) to their monitors.  It understands two
+messages: '(lookup JOBSET REPLY)' sends on REPLY the monitor registered under
+JOBSET, or #f if there is none; '(register SPEC)' spawns a monitor for SPEC
+unless one is already registered under its name.  Monitors are given
+UPDATE-SERVICE and POLLING-PERIOD.  On startup, the actor spawns a fiber that
+registers every specification returned by 'db-get-specifications'."
+  (define (spec-key spec)
+    ;; Return the registry key of SPEC: its name as a symbol.  The same key
+    ;; must be used for insertion and lookup since the registry is compared
+    ;; with 'eq?'.  Names that are strings are converted; symbols are used
+    ;; as is.
+    (match (specification-name spec)
+      ((? symbol? name) name)
+      ((? string? name) (string->symbol name))))
+
+  (lambda ()
+    ;; Registration goes through CHANNEL, which this actor only starts
+    ;; reading below, hence the separate fiber.
+    (spawn-fiber
+     (lambda ()
+       (let ((specs (db-get-specifications)))
+         (log-info "registering ~a jobsets" (length specs))
+         (for-each (lambda (spec)
+                     (register-jobset channel spec))
+                   specs))))
+
+    (let loop ((registry vlist-null))
+      (match (get-message channel)
+        (`(lookup ,jobset ,reply)
+         (put-message reply
+                      (match (vhash-assq jobset registry)
+                        (#f #f)
+                        ((_ . actor) actor)))
+         (loop registry))
+        (`(register ,spec)
+         (let ((name (specification-name spec))
+               (key (spec-key spec)))
+           (match (vhash-assq key registry)
+             (#f
+              (let ((monitor (spawn-jobset-monitor spec update-service
+                                                   #:polling-period
+                                                   polling-period)))
+                (log-info "registering new jobset '~a'" name)
+                (loop (vhash-consq key monitor registry))))
+             ((_ . _)
+              (log-info "jobset '~a' was already registered" name)
+              (loop registry)))))))))
+
+(define* (spawn-jobset-registry update-service
+                                #:key (polling-period 60))
+  "Start a jobset registry in a new fiber and return the channel on which it
+receives messages.  The registry creates a new jobset monitoring actor for
+each 'register' message it receives, passing it UPDATE-SERVICE and
+POLLING-PERIOD."
+  (define mailbox (make-channel))
+  (define registry
+    (jobset-registry mailbox update-service
+                     #:polling-period polling-period))
+  (spawn-fiber registry)
+  mailbox)
+
+(define* (lookup-jobset registry jobset)
+  "Send a 'lookup' request for JOBSET, a specification name (symbol), on
+REGISTRY and return the answer received on a fresh reply channel: the monitor
+of JOBSET."
+  (define reply (make-channel))
+  (put-message registry (list 'lookup jobset reply))
+  (get-message reply))
+
+(define (register-jobset registry spec)
+  "Send a 'register' message for SPEC on REGISTRY, the channel returned by
+'spawn-jobset-registry', to request the registration of a new jobset."
+  (let ((message (list 'register spec)))
+    (put-message registry message)))
diff --git a/src/cuirass/http.scm b/src/cuirass/http.scm
index 2350be2..92d187e 100644
--- a/src/cuirass/http.scm
+++ b/src/cuirass/http.scm
@@ -648,6 +648,10 @@ passed, only display JOBS targeting this SYSTEM."
             #:code 400)
            (begin
              (db-add-or-update-specification spec)
+             ;; FIXME: Notify the jobset registry in the 'cuirass register'
+             ;; process.
+             ;;
+             ;; (register-jobset jobset-registry spec)
              (respond
               (build-response #:code 302
                               #:headers
@@ -663,6 +667,8 @@ passed, only display JOBS targeting this SYSTEM."
        ;; XXX: It is not possible yet to edit build outputs and notifications
        ;; using the web interface.  Use the outputs and notifications from the
        ;; existing specification.
+
+       ;; FIXME: Notify the jobset registry in the 'cuirass register' process.
        (db-add-or-update-specification
         (specification
          (inherit spec)
diff --git a/src/cuirass/scripts/register.scm b/src/cuirass/scripts/register.scm
index 09488a1..0373e5f 100644
--- a/src/cuirass/scripts/register.scm
+++ b/src/cuirass/scripts/register.scm
@@ -126,8 +126,9 @@
                  (and paramfile (read-parameters paramfile))
 
                (if one-shot?
-                   (process-specs (db-get-specifications))
-                   (let ((exit-channel (make-channel)))
+                   (leave (G_ "'--one-shot' is currently unimplemented~%"))
+                   (let ((exit-channel (make-channel))
+                         (update-service (spawn-channel-update-service)))
                      (clear-build-queue)
 
                      ;; If Cuirass was stopped during an evaluation,
@@ -145,15 +146,9 @@
                        (lambda ()
                          (restart-builds))))
 
-                     (spawn-fiber
-                      (essential-task
-                       'build exit-channel
-                       (lambda ()
-                         (while #t
-                           (process-specs (db-get-specifications))
-                           (log-info
-                            "next evaluation in ~a seconds" interval)
-                           (sleep interval)))))
+                     ;; Spawn one monitoring actor for each jobset.
+                     (spawn-jobset-registry update-service
+                                            #:polling-period interval)
 
                      (spawn-fiber
                       (essential-task



reply via email to

[Prev in Thread] Current Thread [Next in Thread]