guix-commits
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[no subject]


From: Ludovic Courtès
Date: Fri, 30 Jun 2023 18:12:49 -0400 (EDT)

branch: master
commit 445198e2a0afcbfb30c2dd178fd9e093480d5718
Author: Ludovic Courtès <ludo@gnu.org>
AuthorDate: Fri Jun 30 23:59:53 2023 +0200

    remote: Simplify interface to send and receive messages.
    
    This hides serialization/deserialization, assembly of message parts, and
    the actual send/receive operation behind 'send-message' and
    'receive-message'.
    
    * src/cuirass/remote.scm (zmq-remote-address)
    (zmq-message-string, zmq-read-message): Remove.
    (send-message, receive-message): New procedures.
    * src/cuirass/remote.scm (build-request-message)
    (no-build-message, build-started-message)
    (build-failed-message, build-succeeded-message)
    (worker-ping, worker-ready-message)
    (worker-request-work-message)
    (worker-request-info-message, server-info-message): Remove 'format'
    call and return an sexp instead.
    * src/cuirass/scripts/remote-server.scm (read-worker-exp):
    Add #:peer-address.  Change 'msg' to 'sexp'.
    (need-fetching?): Remove call to 'zmq-read-message'.  Remove
    inappropriate use of 'else' keyword.
    (run-fetch): Remove call to 'zmq-read-message'.  Use 'receive-message'
    instead of 'zmq-message-receive*' & co.
    (zmq-start-proxy): Use 'receive-message' and 'send-message' instead of
    'zmq-message-receive*', 'zmq-message-send' & co.  Pass #:peer-address to
    'read-worker-exp'.
    * src/cuirass/scripts/remote-worker.scm (run-command): Remove call to
    'zmq-read-message'.
    (spawn-worker-ping)[ping]: Use 'send-message'.
    (start-worker): Use 'send-message' and 'receive-message' instead of
    the whole shebang.
---
 src/cuirass/remote.scm                | 88 ++++++++++++++++++++++-------------
 src/cuirass/scripts/remote-server.scm | 61 +++++++++---------------
 src/cuirass/scripts/remote-worker.scm | 54 +++++++++------------
 3 files changed, 99 insertions(+), 104 deletions(-)

diff --git a/src/cuirass/remote.scm b/src/cuirass/remote.scm
index 2193235..6124707 100644
--- a/src/cuirass/remote.scm
+++ b/src/cuirass/remote.scm
@@ -70,8 +70,6 @@
             send-log
 
             zmq-poll*
-            zmq-message-receive*
-            zmq-empty-delimiter
 
             build-request-message
             no-build-message
@@ -83,9 +81,9 @@
             worker-request-work-message
             worker-request-info-message
             server-info-message
-            zmq-remote-address
-            zmq-message-string
-            zmq-read-message
+
+            send-message
+            receive-message
 
             remote-server-service-type))
 
@@ -385,20 +383,46 @@ retries a call to PROC."
 (define zmq-message-receive*
   (EINTR-safe zmq-message-receive))
 
-(define (zmq-remote-address message)
-  (zmq-message-gets message "Peer-Address"))
-
-(define (zmq-message-string message)
-  (bv->string
-   (zmq-message-content message)))
-
-(define (zmq-read-message msg)
-  (call-with-input-string msg read))
-
 (define (zmq-empty-delimiter)
   "Return an empty ZMQ delimiter used to format message envelopes."
   (make-bytevector 0))
 
+(define* (send-message socket sexp
+                       #:key recipient)
+  "Send SEXP over SOCKET, a ZMQ socket.  When RECIPIENT is true, assume SOCKET
+is a ROUTER socket and use RECIPIENT, a bytevector, as the routing prefix of
+the message."
+  (let ((payload (list (zmq-empty-delimiter)
+                       (string->bv (object->string sexp)))))
+    (zmq-send-msg-parts-bytevector socket
+                                   (if recipient
+                                       (cons recipient payload)
+                                       payload))))
+
+(define* (receive-message socket #:key router?)
+  "Read an sexp from SOCKET, a ZMQ socket, and return it.  Return the
+unspecified value when reading a message without payload.
+
+When ROUTER? is true, assume messages received start with a routing
+prefix (the identity of the peer, as a bytevector), and return three values:
+the payload, the peer's identity (a bytevector), and the peer address."
+  (if router?
+      (match (zmq-message-receive* socket)
+        ((sender (= zmq-message-size 0) data)
+         (values (call-with-input-string (bv->string
+                                          (zmq-message-content data))
+                   read)
+                 (zmq-message-content sender)
+                 (zmq-message-gets data "Peer-Address")))
+        ((sender #vu8())
+         (values *unspecified* sender)))
+      (match (zmq-get-msg-parts-bytevector socket '())
+        ((#vu8() data)
+         (call-with-input-string (bv->string data)
+           read))
+        ((#vu8())
+         *unspecified*))))
+
 ;; ZMQ Messages.
 (define* (build-request-message drv
                                 #:key
@@ -408,50 +432,50 @@ retries a call to PROC."
                                 timestamp
                                 system)
   "Return a message requesting the build of DRV for SYSTEM."
-  (format #f "~s" `(build (drv ,drv)
-                          (priority ,priority)
-                          (timeout ,timeout)
-                          (max-silent ,max-silent)
-                          (timestamp ,timestamp)
-                          (system ,system))))
+  `(build (drv ,drv)
+          (priority ,priority)
+          (timeout ,timeout)
+          (max-silent ,max-silent)
+          (timestamp ,timestamp)
+          (system ,system)))
 
 (define (no-build-message)
   "Return a message that indicates that no builds are available."
-  (format #f "~s" `(no-build)))
+  `(no-build))
 
 (define (build-started-message drv worker)
   "Return a message that indicates that the build of DRV has started."
-  (format #f "~s" `(build-started (drv ,drv) (worker ,worker))))
+  `(build-started (drv ,drv) (worker ,worker)))
 
 (define* (build-failed-message drv url #:optional log)
   "Return a message that indicates that the build of DRV has failed."
-  (format #f "~s" `(build-failed (drv ,drv) (url ,url) (log ,log))))
+  `(build-failed (drv ,drv) (url ,url) (log ,log)))
 
 (define* (build-succeeded-message drv url #:optional log)
   "Return a message that indicates that the build of DRV is done."
-  (format #f "~s" `(build-succeeded (drv ,drv) (url ,url) (log ,log))))
+  `(build-succeeded (drv ,drv) (url ,url) (log ,log)))
 
 (define (worker-ping worker)
   "Return a message that indicates that WORKER is alive."
-  (format #f "~s" `(worker-ping ,worker)))
+  `(worker-ping ,worker))
 
 (define (worker-ready-message worker)
   "Return a message that indicates that WORKER is ready."
-  (format #f "~s" `(worker-ready ,worker)))
+  `(worker-ready ,worker))
 
 (define (worker-request-work-message name)
   "Return a message that indicates that WORKER is requesting work."
-  (format #f "~s" `(worker-request-work ,name)))
+  `(worker-request-work ,name))
 
 (define (worker-request-info-message)
   "Return a message requesting server information."
-  (format #f "~s" '(worker-request-info)))
+  '(worker-request-info))
 
 (define (server-info-message worker-address log-port publish-port)
   "Return a message containing server information."
-  (format #f "~s" `(server-info (worker-address ,worker-address)
-                                (log-port ,log-port)
-                                (publish-port ,publish-port))))
+  `(server-info (worker-address ,worker-address)
+                (log-port ,log-port)
+                (publish-port ,publish-port)))
 
 (define remote-server-service-type
   "_remote-server._tcp")
diff --git a/src/cuirass/scripts/remote-server.scm b/src/cuirass/scripts/remote-server.scm
index 385d5f6..5df0ae2 100644
--- a/src/cuirass/scripts/remote-server.scm
+++ b/src/cuirass/scripts/remote-server.scm
@@ -57,6 +57,7 @@
   #:use-module (srfi srfi-26)
   #:use-module (srfi srfi-34)
   #:use-module (srfi srfi-37)
+  #:use-module (srfi srfi-71)
   #:use-module (ice-9 atomic)
   #:use-module (ice-9 match)
   #:use-module (ice-9 q)
@@ -227,8 +228,8 @@ and store the result inside the BOX."
                         (worker-systems worker))))
            (db-get-pending-build system)))))
 
-(define* (read-worker-exp msg #:key reply-worker)
-  "Read the given MSG sent by a worker.  REPLY-WORKER is a procedure that can
+(define* (read-worker-exp sexp #:key peer-address reply-worker)
+  "Read the given SEXP sent by a worker.  REPLY-WORKER is a procedure that can
 be used to reply to the worker."
   (define (update-worker! base-worker)
     (let* ((worker* (worker
@@ -238,13 +239,12 @@ be used to reply to the worker."
                  (worker-name worker*))
       (db-add-or-update-worker worker*)))
 
-  (match (zmq-read-message
-          (zmq-message-string msg))
+  (match sexp
     (('worker-ready worker)
      (update-worker! worker))
     (('worker-request-info)
      (reply-worker
-      (server-info-message (zmq-remote-address msg) (%log-port) (%publish-port))))
+      (server-info-message peer-address (%log-port) (%publish-port))))
     (('worker-request-work name)
      (let ((worker (db-get-worker name)))
        (when (and (%debug) worker)
@@ -358,7 +358,7 @@ at URL."
 (define (need-fetching? message)
   "Return #t if the received MESSAGE implies that some output fetching is
 required and #f otherwise."
-  (match (zmq-read-message message)
+  (match message
     (('build-succeeded ('drv drv) _ ...)
      (when (%debug)
        (log-debug "fetching required for ~a (success)" drv))
@@ -367,7 +367,7 @@ required and #f otherwise."
      (when (%debug)
        (log-debug "fetching required for ~a (fail)" drv))
      #t)
-    (else #f)))
+    (_ #f)))
 
 (define* (run-fetch message)
   "Read MESSAGE and download the corresponding build outputs.  If
@@ -383,7 +383,7 @@ directory."
               (read-derivation-from-file drv))))
       (const '())))
 
-  (match (zmq-read-message message)
+  (match message
     (('build-succeeded ('drv drv) ('url url) _ ...)
      (let ((outputs (build-outputs drv)))
        (log-info "fetching '~a' from ~a" drv url)
@@ -412,11 +412,8 @@ socket."
      (set-thread-name name)
      (let ((socket (zmq-fetch-worker-socket)))
        (let loop ()
-         (match (zmq-message-receive* socket)
-           ((message)
-            (run-fetch (bv->string
-                        (zmq-message-content message)))
-            (atomic-box-fetch-and-dec! %fetch-queue-size)))
+         (run-fetch (receive-message socket))
+         (atomic-box-fetch-and-dec! %fetch-queue-size)
          (loop))))))
 
 
@@ -484,32 +481,18 @@ frontend to the workers connected through the TCP backend."
       (let* ((items (zmq-poll* poll-items 1000))
              (start-time (current-time)))
         (when (socket-ready? items build-socket)
-          (match (zmq-message-receive* build-socket)
-            ((worker empty rest)
-             (let* ((command (bv->string (zmq-message-content rest)))
-                    (reply-worker
-                     (lambda (message)
-                       (zmq-message-send-parts
-                        build-socket
-                        (map zmq-msg-init
-                             (list (zmq-message-content worker)
-                                   (zmq-empty-delimiter)
-                                   (string->bv message)))))))
-               (if (need-fetching? command)
-                   (let ((fetch-msg (zmq-msg-init
-                                     (zmq-message-content rest))))
-                     (atomic-box-fetch-and-inc! %fetch-queue-size)
-                     (zmq-message-send fetch-socket fetch-msg))
-                   (read-worker-exp rest
-                                    #:reply-worker reply-worker))))
-            (x
-             (log-error "Unexpected message: ~a." x)
-             (for-each (lambda (msg)
-                         (log-error "~/content: ~a (~a)"
-                                    (zmq-message-content msg)
-                                    (false-if-exception
-                                     (zmq-message-string msg))))
-                       x))))
+          (let* ((command sender sender-address
+                          (receive-message build-socket #:router? #t))
+                 (reply-worker (lambda (message)
+                                 (send-message build-socket message
+                                               #:recipient sender))))
+            (if (need-fetching? command)
+                (begin
+                  (atomic-box-fetch-and-inc! %fetch-queue-size)
+                  (send-message fetch-socket command))
+                (read-worker-exp command
+                                 #:peer-address sender-address
+                                 #:reply-worker reply-worker))))
         (db-remove-unresponsive-workers (%worker-timeout))
         (let ((delta (- (current-time) start-time)))
           (when (> delta %loop-timeout)
diff --git a/src/cuirass/scripts/remote-worker.scm b/src/cuirass/scripts/remote-worker.scm
index 01eb943..69fe1e4 100644
--- a/src/cuirass/scripts/remote-worker.scm
+++ b/src/cuirass/scripts/remote-worker.scm
@@ -253,7 +253,7 @@ still be substituted."
                       reply worker)
   "Run COMMAND.  SERVICE-NAME is the name of the build server that sent the
 command.  REPLY is a procedure that can be used to reply to this server."
-  (match (zmq-read-message command)
+  (match command
     (('build ('drv drv)
              ('priority priority)
              ('timeout timeout)
@@ -275,10 +275,8 @@ command.  REPLY is a procedure that can be used to reply to this server."
 (define (spawn-worker-ping worker server)
   "Spawn a thread that periodically pings SERVER."
   (define (ping socket)
-    (zmq-send-msg-parts-bytevector
-     socket
-     (list (make-bytevector 0)
-           (string->bv (worker-ping (worker->sexp worker))))))
+    (send-message socket
+                  (worker-ping (worker->sexp worker))))
 
   (call-with-new-thread
    (lambda ()
@@ -304,29 +302,19 @@ command.  REPLY is a procedure that can be used to reply to this server."
 and executing them.  The worker can reply on the same socket."
   (define (reply socket)
     (lambda (message)
-      (zmq-send-msg-parts-bytevector
-       socket
-       (list (zmq-empty-delimiter) (string->bv message)))))
+      (send-message socket message)))
 
   (define (ready socket worker)
-    (zmq-send-msg-parts-bytevector
-     socket
-     (list (make-bytevector 0)
-           (string->bv
-            (worker-ready-message (worker->sexp worker))))))
+    (send-message socket
+                  (worker-ready-message (worker->sexp worker))))
 
   (define (request-work socket worker)
     (let ((name (worker-name worker)))
-      (zmq-send-msg-parts-bytevector
-       socket
-       (list (make-bytevector 0)
-             (string->bv (worker-request-work-message name))))))
+      (send-message socket
+                    (worker-request-work-message name))))
 
   (define (request-info socket)
-    (zmq-send-msg-parts-bytevector
-     socket
-     (list (make-bytevector 0)
-           (string->bv (worker-request-info-message)))))
+    (send-message socket (worker-request-info-message)))
 
   (define (read-server-info socket)
     ;; Ignore the boostrap message sent due to ZMQ_PROBE_ROUTER option.
@@ -334,14 +322,12 @@ and executing them.  The worker can reply on the same socket."
       ((empty) #f))
 
     (request-info socket)
-    (match (zmq-get-msg-parts-bytevector socket '())
-      ((empty info)
-       (match (zmq-read-message (bv->string info))
-         (('server-info
-           ('worker-address worker-address)
-           ('log-port log-port)
-           ('publish-port publish-port))
-          (list worker-address log-port publish-port))))))
+    (match (receive-message socket)
+      (`(server-info
+         (worker-address ,worker-address)
+         (log-port ,log-port)
+         (publish-port ,publish-port))
+       (list worker-address log-port publish-port))))
 
   (define (server-info->server info serv)
     (match info
@@ -390,12 +376,14 @@ and executing them.  The worker can reply on the same socket."
                    (begin
                      (log-info (G_ "~a: request work.") (worker-name wrk))
                      (request-work socket worker)
-                     (match (zmq-get-msg-parts-bytevector socket '())
-                       ((empty)                   ;server reconnect
+                     (match (receive-message socket)
+                       ((? unspecified?)          ;server reconnect
                         (log-info (G_ "~a: received a bootstrap message.")
                                   (worker-name wrk)))
-                       ((empty command)
-                        (run-command (bv->string command) server
+                       (command
+                        (log-debug (G_ "~a: received command: ~s")
+                                   (worker-name wrk) command)
+                        (run-command command server
                                      #:reply (reply socket)
                                      #:worker worker)))))
 



reply via email to

[Prev in Thread] Current Thread [Next in Thread]