guix-commits
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[no subject]


From: Ludovic Courtès
Date: Wed, 13 Sep 2023 13:05:39 -0400 (EDT)

branch: wip-actors
commit 0ddd43fd3098f85396fd9ff506c08d1499c6a139
Author: Ludovic Courtès <ludo@gnu.org>
AuthorDate: Wed Sep 13 12:48:12 2023 +0200

    base: Implement ‘update-jobset’.
    
    * src/cuirass/base.scm (jobset-monitor): Thread SPEC through the loop.
    Introduce ‘perform-update’.  Implement ‘update-spec’ message handling.
    (jobset-registry): Handle ‘update’ messages.
    (update-jobset): New procedure.
---
 src/cuirass/base.scm | 119 +++++++++++++++++++++++++++++++--------------------
 1 file changed, 73 insertions(+), 46 deletions(-)

diff --git a/src/cuirass/base.scm b/src/cuirass/base.scm
index 932af4d..97a7ddf 100644
--- a/src/cuirass/base.scm
+++ b/src/cuirass/base.scm
@@ -77,8 +77,11 @@
             spawn-channel-update-service
             spawn-jobset-evaluator
             spawn-jobset-registry
+
             lookup-jobset
             register-jobset
+            update-jobset
+
             evaluation-log-file
             latest-checkouts
 
@@ -810,56 +813,68 @@ concurrently."
 (define* (jobset-monitor channel spec
                          #:key (polling-period 60)
                          update-service evaluator)
-  (define period
-    (if (> (specification-period spec) 0)
-        (specification-period spec)
-        polling-period))
-
   (define name (specification-name spec))
-  (define channels (specification-channels spec))
 
   (lambda ()
     (log-info "starting monitor for spec '~a'" name)
-    (let loop ((last-updates '()))
-      (unless (null? last-updates)                ;first time?
-        (match (get-message* channel polling-period 'timeout)
-          ('timeout
-           (log-info "polling jobset '~a' after ~as timeout expiry"
-                     name polling-period))
-          ('trigger
-           (log-info "triggered update of jobset '~a'" name))
-          (message
-           (log-warning "jobset '~a' got bogus message: ~s"
-                        name message))))
-
-      (let* ((timestamp (time-second (current-time time-utc)))
-             (recent? (lambda (time)
-                        (>= time (- timestamp %jobset-trigger-rate-window)))))
-        (define (rate lst)
-          ;; Return the (approximate) trigger rate (triggers per second).
-          (/ (count recent? lst) %jobset-trigger-rate-window 1.))
-
-        ;; Mitigate the risk of a DoS attack by rejecting frequent requests.
-        (if (> (rate last-updates) %jobset-trigger-maximum-rate)
-            (begin
-              (log-warning "trigger rate for jobset '~a' exceeded; skipping"
-                           name)
-              (loop last-updates))
-            (begin
-              (match (let ((reply (make-channel)))
-                       (log-info "fetching channels for spec '~a'" name)
-                       (put-message update-service
-                                    `(fetch ,channels ,reply))
-                       (get-message reply))
-                (#f
-                 (log-warning "failed to fetch channels for '~a'" name))
-                (instances
-                 (log-info "fetched channels for '~a':~{ ~a~}"
-                           name (map channel-name channels))
-                 (put-message evaluator
-                              `(evaluate ,spec ,instances ,timestamp))))
-
-              (loop (cons timestamp (take-while recent? last-updates)))))))))
+    (let loop ((spec spec)
+               (last-updates '()))
+      (define period
+        (if (> (specification-period spec) 0)
+            (specification-period spec)
+            polling-period))
+
+      (define channels
+        (specification-channels spec))
+
+      (define (perform-update)
+        (let* ((timestamp (time-second (current-time time-utc)))
+               (recent? (lambda (time)
+                          (>= time (- timestamp 
%jobset-trigger-rate-window)))))
+          (define (rate lst)
+            ;; Return the (approximate) trigger rate (triggers per second).
+            (/ (count recent? lst) %jobset-trigger-rate-window 1.))
+
+          ;; Mitigate the risk of a DoS attack by rejecting frequent requests.
+          (if (> (rate last-updates) %jobset-trigger-maximum-rate)
+              (begin
+                (log-warning "trigger rate for jobset '~a' exceeded; skipping"
+                             name)
+                (loop spec last-updates))
+              (begin
+                (match (let ((reply (make-channel)))
+                         (log-info "fetching channels for spec '~a'" name)
+                         (put-message update-service
+                                      `(fetch ,channels ,reply))
+                         (get-message reply))
+                  (#f
+                   (log-warning "failed to fetch channels for '~a'" name))
+                  (instances
+                   (log-info "fetched channels for '~a':~{ ~a~}"
+                             name (map channel-name channels))
+                   (put-message evaluator
+                                `(evaluate ,spec ,instances ,timestamp))))
+
+                (loop spec
+                      (cons timestamp (take-while recent? last-updates)))))))
+
+      (if (null? last-updates)                    ;first time?
+          (perform-update)
+          (match (get-message* channel polling-period 'timeout)
+            ('timeout
+             (log-info "polling jobset '~a' after ~as timeout expiry"
+                       name polling-period)
+             (perform-update))
+            ('trigger
+             (log-info "triggered update of jobset '~a'" name)
+             (perform-update))
+            (`(update-spec ,spec)
+             (log-info "updating spec of jobset '~a'" name)
+             (loop spec last-updates))
+            (message
+             (log-warning "jobset '~a' got bogus message: ~s"
+                          name message)
+             (loop spec last-updates)))))))
 
 (define* (spawn-jobset-monitor spec
                                #:key (polling-period 60)
@@ -895,6 +910,14 @@ POLLING-PERIOD seconds."
                         (#f #f)
                         ((_ . actor) actor)))
          (loop registry))
+        (`(update ,spec)
+         (let ((name (string->symbol (specification-name spec))))
+           (match (vhash-assq name registry)
+             (#f
+              (log-error "cannot update non-existent spec '~s'" name))
+             ((_ . monitor)
+              (put-message monitor `(update-spec ,spec)))))
+         (loop registry))
         (`(register ,spec)
          (match (vhash-assq (specification-name spec) registry)
            (#f
@@ -934,3 +957,7 @@ monitoring actor for each 'register' message it receives."
   "Register a new jobset of SPEC.  REGISTRY is the channel returned by
 'spawn-jobset-registry'."
   (put-message registry `(register ,spec)))
+
+(define* (update-jobset registry spec)
+  "Update SPEC, so far known under FORMER-NAME, in REGISTRY."
+  (put-message registry `(update ,spec)))



reply via email to

[Prev in Thread] Current Thread [Next in Thread]