guix-commits
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[no subject]


From: Mathieu Othacehe
Date: Mon, 21 Dec 2020 05:30:42 -0500 (EST)

branch: wip-offload
commit 7ba2dc5f572cf0d820bc0bcb6bfa4cb06aab00da
Author: Mathieu Othacehe <othacehe@gnu.org>
AuthorDate: Mon Dec 21 11:30:16 2020 +0100

    tmp2
---
 bin/cuirass.in                | 179 +++++++++++++++++++++--------------------
 src/cuirass/base.scm          |  44 ----------
 src/cuirass/database.scm      |   1 +
 src/cuirass/remote-server.scm | 181 +++++++++++-------------------------------
 src/cuirass/remote-worker.scm |  14 ++--
 src/cuirass/remote.scm        |  93 +---------------------
 6 files changed, 144 insertions(+), 368 deletions(-)

diff --git a/bin/cuirass.in b/bin/cuirass.in
index 7ba89db..762b6ab 100644
--- a/bin/cuirass.in
+++ b/bin/cuirass.in
@@ -149,96 +149,95 @@ exec ${GUILE:-@GUILE@} --no-auto-compile -e main -s "$0" 
"$@"
            (lambda ()
              (with-database
                (with-queue-writer-worker
-                 (with-build-offload-thread
-                  (and specfile
-                       (let ((new-specs (save-module-excursion
-                                         (lambda ()
-                                           (set-current-module
-                                            (make-user-module '()))
-                                           (primitive-load specfile)))))
-                         (for-each db-add-specification new-specs)))
-
-                  (when queries-file
-                    (log-message "Enable SQL query logging.")
-                    (db-log-queries queries-file))
-
-                  (if one-shot?
-                      (process-specs (db-get-specifications))
-                      (let ((exit-channel (make-channel)))
-                        (start-watchdog)
-                        (if (option-ref opts 'web #f)
-                            (begin
-                              (spawn-fiber
-                               (essential-task
-                                'web exit-channel
-                                (lambda ()
-                                  (run-cuirass-server #:host host
-                                                      #:port port)))
-                               #:parallel? #t)
-
-                              (spawn-fiber
-                               (essential-task
-                                'monitor exit-channel
-                                (lambda ()
-                                  (while #t
-                                    (log-monitoring-stats)
-                                    (sleep 600))))))
-
-                            (begin
-                              (clear-build-queue)
-
-                              ;; If Cuirass was stopped during an evaluation,
-                              ;; abort it. Builds that were not registered
-                              ;; during this evaluation will be registered
-                              ;; during the next evaluation.
-                              (db-abort-pending-evaluations)
-
-                              ;; First off, restart builds that had not
-                              ;; completed or were not even started on a
-                              ;; previous run.
-                              (spawn-fiber
-                               (essential-task
-                                'restart-builds exit-channel
-                                (lambda ()
-                                  (restart-builds))))
-
-                              (spawn-fiber
-                               (essential-task
-                                'build exit-channel
-                                (lambda ()
-                                  (while #t
-                                    (process-specs (db-get-specifications))
-                                    (log-message
-                                     "next evaluation in ~a seconds" interval)
-                                    (sleep interval)))))
-
-                              (when (%build-remote?)
-                                (spawn-fiber
-                                 (essential-task
-                                  'request-workers exit-channel
-                                  (lambda ()
-                                    (while #t
-                                      (request-workers)
-                                      (sleep 60))))))
-
-                              (spawn-fiber
-                               (essential-task
-                                'metrics exit-channel
-                                (lambda ()
-                                  (while #t
-                                    (with-time-logging
-                                     "Metrics update"
-                                     (db-update-metrics))
-                                    (sleep 3600)))))
-
-                              (spawn-fiber
-                               (essential-task
-                                'monitor exit-channel
-                                (lambda ()
-                                  (while #t
-                                    (log-monitoring-stats)
-                                    (sleep 600)))))))
-                        (primitive-exit (get-message exit-channel))))))))
+                 (and specfile
+                      (let ((new-specs (save-module-excursion
+                                        (lambda ()
+                                          (set-current-module
+                                           (make-user-module '()))
+                                          (primitive-load specfile)))))
+                        (for-each db-add-specification new-specs)))
+
+                 (when queries-file
+                   (log-message "Enable SQL query logging.")
+                   (db-log-queries queries-file))
+
+                 (if one-shot?
+                     (process-specs (db-get-specifications))
+                     (let ((exit-channel (make-channel)))
+                       (start-watchdog)
+                       (if (option-ref opts 'web #f)
+                           (begin
+                             (spawn-fiber
+                              (essential-task
+                               'web exit-channel
+                               (lambda ()
+                                 (run-cuirass-server #:host host
+                                                     #:port port)))
+                              #:parallel? #t)
+
+                             (spawn-fiber
+                              (essential-task
+                               'monitor exit-channel
+                               (lambda ()
+                                 (while #t
+                                   (log-monitoring-stats)
+                                   (sleep 600))))))
+
+                           (begin
+                             (clear-build-queue)
+
+                             ;; If Cuirass was stopped during an evaluation,
+                             ;; abort it. Builds that were not registered
+                             ;; during this evaluation will be registered
+                             ;; during the next evaluation.
+                             (db-abort-pending-evaluations)
+
+                             ;; First off, restart builds that had not
+                             ;; completed or were not even started on a
+                             ;; previous run.
+                             (spawn-fiber
+                              (essential-task
+                               'restart-builds exit-channel
+                               (lambda ()
+                                 (restart-builds))))
+
+                             (spawn-fiber
+                              (essential-task
+                               'build exit-channel
+                               (lambda ()
+                                 (while #t
+                                   (process-specs (db-get-specifications))
+                                   (log-message
+                                    "next evaluation in ~a seconds" interval)
+                                   (sleep interval)))))
+
+                             (when (%build-remote?)
+                               (spawn-fiber
+                                (essential-task
+                                 'request-workers exit-channel
+                                 (lambda ()
+                                   (while #t
+                                     (request-workers)
+                                     (sleep 60))))))
+
+                             (spawn-fiber
+                              (essential-task
+                               'metrics exit-channel
+                               (lambda ()
+                                 (while #t
+                                   (with-time-logging
+                                    "Metrics update"
+                                    (db-update-metrics))
+                                   (sleep 3600)))))
+
+                             (spawn-fiber
+                              (essential-task
+                               'monitor exit-channel
+                               (lambda ()
+                                 (while #t
+                                   (log-monitoring-stats)
+                                   (sleep 600)))))))
+                       (primitive-exit (get-message exit-channel)))))))
 
            ;; Most of our code is I/O so preemption doesn't matter much (it
            ;; could help while we're doing SQL requests, for instance, but it
diff --git a/src/cuirass/base.scm b/src/cuirass/base.scm
index 47cdb8b..2a2cbb3 100644
--- a/src/cuirass/base.scm
+++ b/src/cuirass/base.scm
@@ -72,7 +72,6 @@
             prepare-git
             process-specs
             evaluation-log-file
-            with-build-offload-thread
             request-workers
 
             ;; Parameters.
@@ -124,10 +123,6 @@
   ;; Define whether to fall back to building when the substituter fails.
   (make-parameter #f))
 
-(define %build-offload-channel
-  ;; Channel to communicate with the remote build server.
-  (make-parameter #f))
-
 (define %package-cachedir
   ;; Define to location of cache directory of this package.
   (make-parameter (or (getenv "CUIRASS_CACHEDIR")
@@ -454,40 +449,6 @@ Essentially this procedure inverts the 
inversion-of-control that
                   (raise c))
                  (x x)))))))
 
-(define (make-build-offload-thread)
-  "Return a channel used to offload builds by communicating with the remote
-build server in a separate thread.  The spawned thread also polls for build
-events sent by the remote server and calls HANDLE-BUILD-EVENT to register them
-in the database."
-  (let ((channel (make-channel)))
-    (call-with-new-thread
-     (lambda ()
-       (parameterize (((@@ (fibers internal) current-fiber) #f)
-                      (current-read-waiter (lambda (port)
-                                             (port-poll port "r")))
-                      (current-write-waiter (lambda (port)
-                                              (port-poll port "w"))))
-         (remote-build-init!)
-         (let ((socket (remote-build-socket)))
-           (let loop ()
-             (remote-build-poll socket handle-build-event)
-             (match (get-message-with-timeout channel
-                                              #:seconds 0.1
-                                              #:retry? #f)
-               ('workers
-                (remote-send-workers socket))
-               ('timeout #f))
-             (loop))))))
-    channel))
-
-(define-syntax-rule (with-build-offload-thread body ...)
-  (parameterize ((%build-offload-channel
-                  (make-build-offload-thread)))
-    body ...))
-
-(define (request-workers)
-  (put-message (%build-offload-channel) 'workers))
-
 
 ;;;
 ;;; Building packages.
@@ -659,11 +620,6 @@ updating the database accordingly."
            (log-message "build failed: '~a'" drv)
            (db-update-build-status! drv (build-status failed)))
          (log-message "bogus build-failed event for '~a'" drv)))
-    (('workers workers)
-     (db-clear-workers)
-     (for-each (lambda (worker)
-                 (db-add-worker (sexp->worker worker)))
-               workers))
     (('substituter-started item _ ...)
      (log-message "substituter started: '~a'" item))
     (('substituter-succeeded item _ ...)
diff --git a/src/cuirass/database.scm b/src/cuirass/database.scm
index c9e3f64..53f013b 100644
--- a/src/cuirass/database.scm
+++ b/src/cuirass/database.scm
@@ -640,6 +640,7 @@ string."
 (define-enumeration build-status
   ;; Build status as expected by Hydra's API.  Note: the negative values are
   ;; Cuirass' own extensions.
+  (submitted        -3)
   (scheduled        -2)
   (started          -1)
   (succeeded         0)
diff --git a/src/cuirass/remote-server.scm b/src/cuirass/remote-server.scm
index 3ad722b..22a1915 100644
--- a/src/cuirass/remote-server.scm
+++ b/src/cuirass/remote-server.scm
@@ -19,6 +19,7 @@
 (define-module (cuirass remote-server)
   #:use-module (cuirass base)
   #:use-module (cuirass database)
+  #:use-module (cuirass logging)
   #:use-module (cuirass remote)
   #:use-module (gcrypt pk-crypto)
   #:use-module (guix avahi)
@@ -156,49 +157,18 @@ Start a remote build server.\n"))
   ;; Set of connected workers.
   (make-hash-table))
 
-(define %build-queues
-  ;; Builds request queue.
-  (map (lambda (system)
-         (cons system (make-q)))
-       %supported-systems))
-
-(define (find-system-queues systems)
-  "Return the list of build queues for SYSTEMS that are not empty."
-  (filter-map (match-lambda
-                ((system . queue)
-                 (and (member system systems)
-                      (not (q-empty? queue))
-                      queue)))
-              %build-queues))
-
-(define (build-available? name)
-  "Return #t if there is some available work for the worker with the given
-NAME and #f otherwise."
-  (let* ((worker (hash-ref %workers name))
-         (systems (and worker (worker-systems worker)))
-         (queues (if systems
-                     (find-system-queues systems)
-                     '())))
-    (not (null? queues))))
-
-(define (pop-random-build name)
-  "Pop randomly and return a build from all the build queues with available
-work for the worker with the given NAME."
-  (define (random-queue queues)
-    (list-ref queues (random (length queues))))
+(define (pop-build name)
+  (define (random-system systems)
+    (list-ref systems (random (length systems))))
 
   (let* ((worker (hash-ref %workers name))
-         (systems (worker-systems worker))
-         (queues (find-system-queues systems)))
-    (q-pop! (random-queue queues))))
-
-(define (sort-build-queue! queue)
-  (sort! (car queue)
-         (lambda (a b)
-           (if (= (build-priority a) (build-priority b))
-               (> (build-timestamp a) (build-timestamp b))
-               (< (build-priority a) (build-priority b)))))
-  (sync-q! queue))
+         (systems (worker-systems worker)))
+    (match (db-get-builds `((status . scheduled)
+                            (system . ,(random-system systems))
+                            (order . priority+timestamp)
+                            (nr . 1)))
+      ((build) build)
+      (() #f))))
 
 (define (remove-unresponsive-workers!)
   (let ((unresponsive
@@ -214,47 +184,6 @@ work for the worker with the given NAME."
                 (hash-remove! %workers worker))
               unresponsive)))
 
-(define* (read-client-exp client-socket client exp)
-  "Read the given EXP sent by CLIENT."
-  (catch 'system-error
-    (lambda ()
-      (match (zmq-read-message exp)
-        (('build ('drv drv)
-                 ('priority priority)
-                 ('timeout timeout)
-                 ('max-silent max-silent)
-                 ('timestamp timestamp)
-                 ('system system))
-         (let* ((system (or system
-                            (derivation-system
-                             (read-derivation-from-file drv))))
-                (queue (assoc-ref %build-queues system)))
-           ;; Push the derivation to the matching queue according to the
-           ;; targeted system. Also save the client ID in the queue to be able
-           ;; to send it build events later on.
-           (q-push! queue
-                    (build
-                     (client client)
-                     (derivation drv)
-                     (priority priority)
-                     (timeout timeout)
-                     (max-silent max-silent)
-                     (timestamp timestamp)
-                     (system system)))
-           (sort-build-queue! queue)))
-        (('request-workers)
-         (zmq-send-msg-parts-bytevector
-          client-socket
-          (list client
-                (make-bytevector 0)
-                (string->bv
-                 (zmq-workers
-                  (hash-fold (lambda (key value old)
-                               (cons (worker->sexp value) old))
-                             '()
-                             %workers))))))))
-    (const #f)))
-
 (define* (read-worker-exp exp #:key reply-worker)
   "Read the given EXP sent by a worker.  REPLY-WORKER is a procedure that can
 be used to reply to the worker."
@@ -272,21 +201,30 @@ be used to reply to the worker."
                       (lambda (name)
                         (info (G_ "Worker `~a' is ready.~%") name))))
     (('worker-request-work name)
-     (if (build-available? name)
-         (let ((build (pop-random-build name)))
-           (match-record build <build>
-             (client derivation priority timeout max-silent)
+     (let ((build (pk (pop-build name))))
+       (if build
+           (let ((derivation (assq-ref build #:derivation))
+                 (priority (assq-ref build #:priority))
+                 (timeout (assq-ref build #:timeout))
+                 (max-silent (assq-ref build #:max-silent)))
+             (db-update-build-status! derivation (build-status submitted))
              (reply-worker
-              client
               (zmq-build-request-message derivation
                                          #:priority priority
                                          #:timeout timeout
                                          #:max-silent max-silent))))
          (reply-worker
-          (zmq-empty-delimiter)
           (zmq-no-build-message))))
     (('worker-ping worker)
-     (update-workers! worker (const #t)))))
+     (update-workers! worker (const #t))
+     (db-clear-workers)
+     (hash-for-each (lambda (key value)
+                      (db-add-worker value))
+                    %workers))
+    (('build-started ('drv drv) ('worker worker))
+     (log-message "build started: '~a'" drv)
+     (db-update-build-worker! drv worker)
+     (db-update-build-status! drv (build-status started)))))
 
 
 ;;;
@@ -301,7 +239,7 @@ be used to reply to the worker."
 
 (define (zmq-fetch-worker-socket)
   "Return a socket used to communicate with the fetch workers."
-  (let ((socket (zmq-create-socket %zmq-context ZMQ_REP))
+  (let ((socket (zmq-create-socket %zmq-context ZMQ_PULL))
         (endpoint (zmq-fetch-workers-endpoint)))
     (zmq-connect socket endpoint)
     socket))
@@ -442,10 +380,7 @@ required and #f otherwise."
 (define* (run-fetch message)
   "Read MESSAGE and download the corresponding build outputs.  If
 %CACHE-DIRECTORY is set, download the matching NAR and NARINFO files in this
-directory.  If %ADD-TO-STORE? is set, add the build outputs to the store.
-
-REPLY is procedure used to forward MESSAGE to the client once the build
-outputs are downloaded."
+directory.  If %ADD-TO-STORE? is set, add the build outputs to the store."
   (define (build-outputs drv)
     (catch 'system-error
       (lambda ()
@@ -459,7 +394,6 @@ outputs are downloaded."
   (let ((log-directory (%log-directory)))
     (match (zmq-read-message message)
       (('build-succeeded ('drv drv) ('url url) _ ...)
-       (info (G_ "Fetching derivation ~a build outputs.~%") drv)
        (let ((outputs (build-outputs drv))
              (log-file
               (and log-directory
@@ -468,6 +402,7 @@ outputs are downloaded."
            (add-to-store outputs url))
          (when (%cache-directory)
            (download-nar (%cache-directory) outputs url))
+         (log-message "build succeeded: '~a'" drv)
          (set-build-successful! drv log-file)))
       (('build-failed ('drv drv) ('url url) _ ...)
        (let ((log-file
@@ -490,8 +425,8 @@ socket."
      (let ((socket (zmq-fetch-worker-socket)))
        (let loop ()
          (match (zmq-get-msg-parts-bytevector socket)
-           ((client empty rest)
-            (run-fetch (bv->string rest))))
+           ((message)
+            (run-fetch (bv->string message))))
          (loop))))))
 
 
@@ -515,18 +450,13 @@ frontend to the workers connected through the TCP 
backend."
             (eq? (poll-item-socket item) socket))
           items))
 
-  (let* ((client-socket
-          (zmq-create-socket %zmq-context ZMQ_ROUTER))
-         (build-socket
+  (let* ((build-socket
           (zmq-create-socket %zmq-context ZMQ_ROUTER))
          (fetch-socket
-          (zmq-create-socket %zmq-context ZMQ_DEALER))
+          (zmq-create-socket %zmq-context ZMQ_PUSH))
          (poll-items (list
-                      (poll-item client-socket ZMQ_POLLIN)
-                      (poll-item build-socket ZMQ_POLLIN)
-                      (poll-item fetch-socket ZMQ_POLLIN))))
+                      (poll-item build-socket ZMQ_POLLIN))))
 
-    (zmq-bind-socket client-socket zmq-frontend-endpoint)
     (zmq-bind-socket build-socket (zmq-backend-endpoint backend-port))
     (zmq-bind-socket fetch-socket (zmq-fetch-workers-endpoint))
 
@@ -534,40 +464,20 @@ frontend to the workers connected through the TCP 
backend."
     ;; frontend messages before forwarding them to the backend.
     (let loop ()
       (let ((items (zmq-poll* poll-items 1000)))
-        ;; client -> remote-server.
-        (when (zmq-socket-ready? items client-socket)
-          (match (zmq-get-msg-parts-bytevector client-socket)
-            ((client empty rest)
-             (read-client-exp client-socket
-                              client
-                              (bv->string rest)))))
-        ;; build-worker -> remote-server.
         (when (zmq-socket-ready? items build-socket)
           (match (zmq-get-msg-parts-bytevector build-socket)
             ((worker empty rest)
              (let ((reply-worker
-                    (lambda (client message)
+                    (lambda (message)
                       (zmq-send-msg-parts-bytevector
                        build-socket
                        (list worker
                              (zmq-empty-delimiter)
-                             client
-                             (zmq-empty-delimiter)
                              (string->bv message))))))
-               (read-worker-exp (bv->string rest)
-                                #:reply-worker reply-worker)))
-            ((worker empty client empty rest)
-             (let ((message (list client (zmq-empty-delimiter) rest)))
                (if (need-fetching? (bv->string rest))
-                   (zmq-send-msg-parts-bytevector fetch-socket
-                                                  (cons empty message))
-                   (zmq-send-msg-parts-bytevector client-socket message))))))
-        ;; fetch-worker -> remote-server.
-        (when (zmq-socket-ready? items fetch-socket)
-          (match (zmq-get-msg-parts-bytevector fetch-socket)
-            ((empty . rest)
-             (zmq-send-msg-parts-bytevector client-socket rest))))
-
+                   (zmq-send-bytevector fetch-socket rest)
+                   (read-worker-exp (bv->string rest)
+                                    #:reply-worker reply-worker))))))
         (remove-unresponsive-workers!)
         (loop)))))
 
@@ -641,6 +551,7 @@ exiting."
 
       (parameterize ((%add-to-store? add-to-store?)
                      (%cache-directory cache)
+                     (%db-writer-queue-size 1)
                      (%log-directory log-directory)
                      (%package-database database)
                      (%public-key public-key)
@@ -665,12 +576,12 @@ exiting."
           #:txt (list (string-append "publish="
                                      (number->string publish-port)))))
 
-        (for-each (lambda (number)
-                    (start-fetch-worker
-                     (string-append "fetch-worker-" (number->string number))))
-                  (iota 4))
-
         (with-database
           (with-queue-writer-worker
-            (zmq-init!)
+            (for-each (lambda (number)
+                        (start-fetch-worker
+                         (string-append "fetch-worker-"
+                                        (number->string number))))
+                      (iota 4))
+
             (zmq-start-proxy backend-port)))))))
diff --git a/src/cuirass/remote-worker.scm b/src/cuirass/remote-worker.scm
index 7bc1471..baee0e4 100644
--- a/src/cuirass/remote-worker.scm
+++ b/src/cuirass/remote-worker.scm
@@ -228,18 +228,17 @@ command.  REPLY is a procedure that can be used to reply 
to this server."
        (zmq-connect socket endpoint)
        (let loop ()
          (ping socket)
-         (sleep 10)
+         (sleep 60)
          (loop))))))
 
 (define (start-worker worker server)
   "Start a worker thread named NAME, reading commands from the DEALER socket
 and executing them.  The worker can reply on the same socket."
-  (define (reply socket client)
+  (define (reply socket)
     (lambda (message)
       (zmq-send-msg-parts-bytevector
        socket
-       (list (zmq-empty-delimiter) client
-             (zmq-empty-delimiter) (string->bv message)))))
+       (list (zmq-empty-delimiter) (string->bv message)))))
 
   (define (ready socket)
     (zmq-send-msg-parts-bytevector
@@ -262,18 +261,17 @@ and executing them.  The worker can reply on the same 
socket."
             (address (server-address server))
             (port (server-port server))
             (endpoint (zmq-backend-endpoint address port)))
-       (zmq-init!)
        (zmq-connect socket endpoint)
        (ready socket)
        (worker-ping worker server)
        (let loop ()
          (request-work socket)
          (match (zmq-get-msg-parts-bytevector socket '())
-           ((empty client empty command)
+           ((empty command)
             (run-command (bv->string command) server
-                         #:reply (reply socket client)
+                         #:reply (reply socket)
                          #:worker worker)))
-         (sleep 1)
+         (sleep 10)
          (loop))))))
 
 
diff --git a/src/cuirass/remote.scm b/src/cuirass/remote.scm
index 768fce8..690cac9 100644
--- a/src/cuirass/remote.scm
+++ b/src/cuirass/remote.scm
@@ -53,22 +53,9 @@
             publish-url
             avahi-service->server
 
-            build
-            <build>
-            builds?
-            build-client
-            build-derivation
-            build-priority
-            build-timeout
-            build-max-silent
-            build-timestamp
-            build-system
-
             publish-server
             set-build-options*
 
-            zmq-init!
-            zmq-frontend-endpoint
             zmq-poll*
             zmq-socket-ready?
             zmq-empty-delimiter
@@ -81,15 +68,9 @@
             zmq-worker-ping
             zmq-worker-ready-message
             zmq-worker-request-work-message
-            zmq-request-workers
-            zmq-workers
             zmq-read-message
 
-            remote-server-service-type
-            remote-build-init!
-            remote-build-socket
-            remote-send-workers
-            remote-build-poll))
+            remote-server-service-type))
 
 
 ;;;
@@ -162,7 +143,7 @@
   (string-append (gethostname) "-" (random-string 4)))
 
 (define %worker-timeout
-  (make-parameter 60))
+  (make-parameter 120))
 
 
 ;;;
@@ -214,22 +195,6 @@ given NAME."
 
 
 ;;;
-;;; Build.
-;;;
-
-(define-record-type* <build>
-  build make-build
-  build?
-  (client         build-client)
-  (derivation     build-derivation)
-  (priority       build-priority)
-  (timeout        build-timeout)
-  (max-silent     build-max-silent)
-  (timestamp      build-timestamp)
-  (system         build-system))
-
-
-;;;
 ;;; Store publishing.
 ;;;
 
@@ -286,12 +251,6 @@ PRIVATE-KEY to sign narinfos."
 (define %zmq-context
   (zmq-create-context))
 
-(define (zmq-init!)
-  (zmq-set-buffer-size (* 4096 16)))
-
-(define zmq-frontend-endpoint
-  "ipc://@remote-build-socket")
-
 (define (EINTR-safe proc)
   "Return a variant of PROC that catches EINTR 'zmq-error' exceptions and
 retries a call to PROC."
@@ -368,53 +327,5 @@ retries a call to PROC."
   "Return a message that indicates that WORKER is requesting work."
   (format #f "~s" `(worker-request-work ,name)))
 
-(define (zmq-request-workers)
-  "Return a message requesting the WORKERS list."
-  (format #f "~s" `(request-workers)))
-
-(define (zmq-workers workers)
-  "Return a message containing the WORKERS list."
-  (format #f "~s" `(workers ,workers)))
-
-
-;;;
-;;; Remote builds.
-;;;
-
 (define remote-server-service-type
   "_remote-server._tcp")
-
-(define (remote-build-init!)
-  (zmq-init!))
-
-(define (remote-build-socket)
-  "Return a socket used to communicate with the remote build server."
-  (let ((socket (zmq-create-socket %zmq-context ZMQ_DEALER))
-        (endpoint zmq-frontend-endpoint))
-    (zmq-connect socket endpoint)
-    socket))
-
-(define (remote-send-workers socket)
-  "Request the workers list on SOCKET."
-  (zmq-send-msg-parts-bytevector
-   socket
-   (list (make-bytevector 0)
-         (string->bv (zmq-request-workers)))))
-
-(define* (remote-build-poll socket event-proc
-                            #:key
-                            (timeout 1000))
-  "Poll SOCKET for messages and call EVENT-PROC each time a build event is
-received, return if no event occured for TIMEOUT milliseconds."
-  (define (parse-result result)
-    (match (zmq-read-message result)
-      (('workers workers)
-       (event-proc (list 'workers workers)))))
-
-  (let* ((poll-items (list
-                      (poll-item socket ZMQ_POLLIN)))
-         (items (zmq-poll* poll-items timeout)))
-    (when (zmq-socket-ready? items socket)
-      (match (zmq-get-msg-parts-bytevector socket '())
-        ((empty result)
-         (parse-result (bv->string result)))))))



reply via email to

[Prev in Thread] Current Thread [Next in Thread]