guix-commits
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

branch master updated: remote: Remove address argument.


From: Mathieu Othacehe
Subject: branch master updated: remote: Remove address argument.
Date: Fri, 12 Feb 2021 08:39:53 -0500

This is an automated email from the git hooks/post-receive script.

mothacehe pushed a commit to branch master
in repository guix-cuirass.

The following commit(s) were added to refs/heads/master by this push:
     new 83f33cd  remote: Remove address argument.
83f33cd is described below

commit 83f33cdbb43b81b4cc743ff1b4458564ec2675ef
Author: Mathieu Othacehe <othacehe@gnu.org>
AuthorDate: Fri Feb 12 14:39:12 2021 +0100

    remote: Remove address argument.
---
 src/cuirass/remote-server.scm | 22 ++++++++-------
 src/cuirass/remote-worker.scm | 64 +++++++++++++++++++++++--------------------
 src/cuirass/remote.scm        | 17 ++++++++++--
 3 files changed, 61 insertions(+), 42 deletions(-)

diff --git a/src/cuirass/remote-server.scm b/src/cuirass/remote-server.scm
index f01e64f..6c94673 100644
--- a/src/cuirass/remote-server.scm
+++ b/src/cuirass/remote-server.scm
@@ -179,8 +179,8 @@ Start a remote build server.\n"))
              ((build) build)
              (() #f))))))
 
-(define* (read-worker-exp exp #:key reply-worker)
-  "Read the given EXP sent by a worker.  REPLY-WORKER is a procedure that can
+(define* (read-worker-exp msg #:key reply-worker)
+  "Read the given MSG sent by a worker.  REPLY-WORKER is a procedure that can
 be used to reply to the worker."
   (define (update-worker! base-worker)
     (let* ((worker* (worker
@@ -188,12 +188,13 @@ be used to reply to the worker."
                      (last-seen (current-time)))))
       (db-add-or-update-worker worker*)))
 
-  (match (zmq-read-message exp)
+  (match (zmq-read-message
+          (zmq-message-string msg))
     (('worker-ready worker)
      (update-worker! worker))
     (('worker-request-info)
      (reply-worker
-      (zmq-server-info (%log-port) (%publish-port))))
+      (zmq-server-info (zmq-remote-address msg) (%log-port) (%publish-port))))
     (('worker-request-work name)
      (let ((build (pop-build name)))
        (if build
@@ -357,18 +358,19 @@ frontend to the workers connected through the TCP 
backend."
     (let loop ()
       (let ((items (zmq-poll* poll-items 1000)))
         (when (zmq-socket-ready? items build-socket)
-          (match (zmq-get-msg-parts-bytevector build-socket)
+          (match (zmq-message-receive build-socket)
             ((worker empty rest)
              (let ((reply-worker
                     (lambda (message)
                       (zmq-send-msg-parts-bytevector
                        build-socket
-                       (list worker
+                       (list (zmq-message-content worker)
                              (zmq-empty-delimiter)
-                             (string->bv message))))))
-               (if (need-fetching? (bv->string rest))
-                   (zmq-send-bytevector fetch-socket rest)
-                   (read-worker-exp (bv->string rest)
+                             (string->bv message)))))
+                   (rest-bv (zmq-message-content rest)))
+               (if (need-fetching? (bv->string rest-bv))
+                   (zmq-send-bytevector fetch-socket rest-bv)
+                   (read-worker-exp rest
                                     #:reply-worker reply-worker))))))
         (db-remove-unresponsive-workers (%worker-timeout))
         (loop)))))
diff --git a/src/cuirass/remote-worker.scm b/src/cuirass/remote-worker.scm
index 36e9d46..69ccf02 100644
--- a/src/cuirass/remote-worker.scm
+++ b/src/cuirass/remote-worker.scm
@@ -87,9 +87,6 @@ Start a remote build worker.\n"))
         (option '(#\V "version") #f #f
                 (lambda _
                   (show-version-and-exit "guix publish")))
-        (option '(#\a "address") #t #f
-                (lambda (opt name arg result)
-                  (alist-cons 'address arg result)))
         (option '(#\w "workers") #t #f
                 (lambda (opt name arg result)
                   (alist-cons 'workers (string->number* arg) result)))
@@ -230,7 +227,7 @@ command.  REPLY is a procedure that can be used to reply to 
this server."
          (sleep 60)
          (loop))))))
 
-(define (start-worker worker serv)
+(define (start-worker wrk serv)
   "Start a worker thread named NAME, reading commands from the DEALER socket
 and executing them.  The worker can reply on the same socket."
   (define (reply socket)
@@ -239,14 +236,14 @@ and executing them.  The worker can reply on the same 
socket."
        socket
        (list (zmq-empty-delimiter) (string->bv message)))))
 
-  (define (ready socket)
+  (define (ready socket worker)
     (zmq-send-msg-parts-bytevector
      socket
      (list (make-bytevector 0)
            (string->bv
             (zmq-worker-ready-message (worker->sexp worker))))))
 
-  (define (request-work socket)
+  (define (request-work socket worker)
     (let ((name (worker-name worker)))
       (zmq-send-msg-parts-bytevector
        socket
@@ -259,37 +256,54 @@ and executing them.  The worker can reply on the same 
socket."
      (list (make-bytevector 0)
            (string->bv (zmq-worker-request-info-message)))))
 
-  (define (read-server-info socket serv)
+  (define (read-server-info socket)
     (request-info socket)
     (match (zmq-get-msg-parts-bytevector socket '())
       ((empty info)
        (match (zmq-read-message (bv->string info))
          (('server-info
+           ('worker-address worker-address)
            ('log-port log-port)
            ('publish-port publish-port))
-          (let ((url (publish-url (server-address serv)
-                                  publish-port)))
-            (server
-             (inherit serv)
-             (log-port log-port)
-             (publish-url url))))))))
+          (list worker-address log-port publish-port))))))
+
+  (define (server-info->server info serv)
+    (match info
+      ((_ log-port publish-port)
+       (let ((url (publish-url (server-address serv)
+                               publish-port)))
+         (server
+          (inherit serv)
+          (log-port log-port)
+          (publish-url url))))))
+
+  (define (server-info->worker info w)
+    (match info
+      ((worker-address _ _)
+       (let ((url (local-publish-url worker-address)))
+         (worker
+          (inherit w)
+          (address worker-address)
+          (publish-url url))))))
 
   (match (primitive-fork)
     (0
-     (set-thread-name (worker-name worker))
+     (set-thread-name (worker-name wrk))
      (let* ((socket (zmq-dealer-socket))
             (address (server-address serv))
             (port (server-port serv))
             (endpoint (zmq-backend-endpoint address port)))
        (zmq-connect socket endpoint)
-       (ready socket)
-       (worker-ping worker serv)
-       (let ((server* (read-server-info socket serv)))
+       (let* ((info (read-server-info socket))
+              (server (server-info->server info serv))
+              (worker (server-info->worker info wrk)))
+         (ready socket worker)
+         (worker-ping worker server)
          (let loop ()
-           (request-work socket)
+           (request-work socket worker)
            (match (zmq-get-msg-parts-bytevector socket '())
              ((empty command)
-              (run-command (bv->string command) server*
+              (run-command (bv->string command) server
                            #:reply (reply socket)
                            #:worker worker)))
            (sleep 10)
@@ -343,7 +357,6 @@ exiting."
                              (lambda (arg result)
                                (leave (G_ "~A: extraneous argument~%") arg))
                              %default-options))
-           (address (assoc-ref opts 'address))
            (workers (assoc-ref opts 'workers))
            (publish-port (assoc-ref opts 'publish-port))
            (server-address (assoc-ref opts 'server))
@@ -363,18 +376,12 @@ exiting."
                        #:public-key public-key
                        #:private-key private-key))
 
-      (when (and server-address (not address))
-        (leave (G_ "Address must be set when server is provided.~%")))
-
       (if server-address
           (for-each
            (lambda (n)
-             (let* ((publish-url (local-publish-url address))
-                    (worker (worker
+             (let* ((worker (worker
                              (name (generate-worker-name))
-                             (address address)
                              (machine (gethostname))
-                             (publish-url publish-url)
                              (systems systems)))
                     (addr (string-split server-address #\:))
                     (server (match addr
@@ -391,8 +398,7 @@ exiting."
                ((new-service)
                 (for-each
                  (lambda (n)
-                   (let* ((address (or address
-                                       (avahi-service-local-address service)))
+                   (let* ((address (avahi-service-local-address service))
                           (publish-url (local-publish-url address)))
                      (add-to-worker-pids!
                       (start-worker (worker
diff --git a/src/cuirass/remote.scm b/src/cuirass/remote.scm
index c5a27ea..268a643 100644
--- a/src/cuirass/remote.scm
+++ b/src/cuirass/remote.scm
@@ -82,6 +82,8 @@
             zmq-worker-request-work-message
             zmq-worker-request-info-message
             zmq-server-info
+            zmq-remote-address
+            zmq-message-string
             zmq-read-message
 
             remote-server-service-type))
@@ -95,7 +97,8 @@
   worker make-worker
   worker?
   (name           worker-name)
-  (address        worker-address)
+  (address        worker-address
+                  (default #f))
   (machine        worker-machine)
   (publish-url    worker-publish-url
                   (default #f))
@@ -400,6 +403,13 @@ retries a call to PROC."
           (eq? (poll-item-socket item) socket))
         items))
 
+(define (zmq-remote-address message)
+  (zmq-message-gets message "Peer-Address"))
+
+(define (zmq-message-string message)
+  (bv->string
+   (zmq-message-content message)))
+
 (define (zmq-read-message msg)
   (call-with-input-string msg read))
 
@@ -455,9 +465,10 @@ retries a call to PROC."
   "Return a message requesting server information."
   (format #f "~s" '(worker-request-info)))
 
-(define (zmq-server-info log-port publish-port)
+(define (zmq-server-info worker-address log-port publish-port)
   "Return a message containing server information."
-  (format #f "~s" `(server-info (log-port ,log-port)
+  (format #f "~s" `(server-info (worker-address ,worker-address)
+                                (log-port ,log-port)
                                 (publish-port ,publish-port))))
 
 (define remote-server-service-type



reply via email to

[Prev in Thread] Current Thread [Next in Thread]