[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
branch master updated: remote: Remove address argument.
From: |
Mathieu Othacehe |
Subject: |
branch master updated: remote: Remove address argument. |
Date: |
Fri, 12 Feb 2021 08:39:53 -0500 |
This is an automated email from the git hooks/post-receive script.
mothacehe pushed a commit to branch master
in repository guix-cuirass.
The following commit(s) were added to refs/heads/master by this push:
new 83f33cd remote: Remove address argument.
83f33cd is described below
commit 83f33cdbb43b81b4cc743ff1b4458564ec2675ef
Author: Mathieu Othacehe <othacehe@gnu.org>
AuthorDate: Fri Feb 12 14:39:12 2021 +0100
remote: Remove address argument.
---
src/cuirass/remote-server.scm | 22 ++++++++-------
src/cuirass/remote-worker.scm | 64 +++++++++++++++++++++++--------------------
src/cuirass/remote.scm | 17 ++++++++++--
3 files changed, 61 insertions(+), 42 deletions(-)
diff --git a/src/cuirass/remote-server.scm b/src/cuirass/remote-server.scm
index f01e64f..6c94673 100644
--- a/src/cuirass/remote-server.scm
+++ b/src/cuirass/remote-server.scm
@@ -179,8 +179,8 @@ Start a remote build server.\n"))
((build) build)
(() #f))))))
-(define* (read-worker-exp exp #:key reply-worker)
- "Read the given EXP sent by a worker. REPLY-WORKER is a procedure that can
+(define* (read-worker-exp msg #:key reply-worker)
+ "Read the given MSG sent by a worker. REPLY-WORKER is a procedure that can
be used to reply to the worker."
(define (update-worker! base-worker)
(let* ((worker* (worker
@@ -188,12 +188,13 @@ be used to reply to the worker."
(last-seen (current-time)))))
(db-add-or-update-worker worker*)))
- (match (zmq-read-message exp)
+ (match (zmq-read-message
+ (zmq-message-string msg))
(('worker-ready worker)
(update-worker! worker))
(('worker-request-info)
(reply-worker
- (zmq-server-info (%log-port) (%publish-port))))
+ (zmq-server-info (zmq-remote-address msg) (%log-port) (%publish-port))))
(('worker-request-work name)
(let ((build (pop-build name)))
(if build
@@ -357,18 +358,19 @@ frontend to the workers connected through the TCP
backend."
(let loop ()
(let ((items (zmq-poll* poll-items 1000)))
(when (zmq-socket-ready? items build-socket)
- (match (zmq-get-msg-parts-bytevector build-socket)
+ (match (zmq-message-receive build-socket)
((worker empty rest)
(let ((reply-worker
(lambda (message)
(zmq-send-msg-parts-bytevector
build-socket
- (list worker
+ (list (zmq-message-content worker)
(zmq-empty-delimiter)
- (string->bv message))))))
- (if (need-fetching? (bv->string rest))
- (zmq-send-bytevector fetch-socket rest)
- (read-worker-exp (bv->string rest)
+ (string->bv message)))))
+ (rest-bv (zmq-message-content rest)))
+ (if (need-fetching? (bv->string rest-bv))
+ (zmq-send-bytevector fetch-socket rest-bv)
+ (read-worker-exp rest
#:reply-worker reply-worker))))))
(db-remove-unresponsive-workers (%worker-timeout))
(loop)))))
diff --git a/src/cuirass/remote-worker.scm b/src/cuirass/remote-worker.scm
index 36e9d46..69ccf02 100644
--- a/src/cuirass/remote-worker.scm
+++ b/src/cuirass/remote-worker.scm
@@ -87,9 +87,6 @@ Start a remote build worker.\n"))
(option '(#\V "version") #f #f
(lambda _
(show-version-and-exit "guix publish")))
- (option '(#\a "address") #t #f
- (lambda (opt name arg result)
- (alist-cons 'address arg result)))
(option '(#\w "workers") #t #f
(lambda (opt name arg result)
(alist-cons 'workers (string->number* arg) result)))
@@ -230,7 +227,7 @@ command. REPLY is a procedure that can be used to reply to
this server."
(sleep 60)
(loop))))))
-(define (start-worker worker serv)
+(define (start-worker wrk serv)
"Start a worker thread named NAME, reading commands from the DEALER socket
and executing them. The worker can reply on the same socket."
(define (reply socket)
@@ -239,14 +236,14 @@ and executing them. The worker can reply on the same
socket."
socket
(list (zmq-empty-delimiter) (string->bv message)))))
- (define (ready socket)
+ (define (ready socket worker)
(zmq-send-msg-parts-bytevector
socket
(list (make-bytevector 0)
(string->bv
(zmq-worker-ready-message (worker->sexp worker))))))
- (define (request-work socket)
+ (define (request-work socket worker)
(let ((name (worker-name worker)))
(zmq-send-msg-parts-bytevector
socket
@@ -259,37 +256,54 @@ and executing them. The worker can reply on the same
socket."
(list (make-bytevector 0)
(string->bv (zmq-worker-request-info-message)))))
- (define (read-server-info socket serv)
+ (define (read-server-info socket)
(request-info socket)
(match (zmq-get-msg-parts-bytevector socket '())
((empty info)
(match (zmq-read-message (bv->string info))
(('server-info
+ ('worker-address worker-address)
('log-port log-port)
('publish-port publish-port))
- (let ((url (publish-url (server-address serv)
- publish-port)))
- (server
- (inherit serv)
- (log-port log-port)
- (publish-url url))))))))
+ (list worker-address log-port publish-port))))))
+
+ (define (server-info->server info serv)
+ (match info
+ ((_ log-port publish-port)
+ (let ((url (publish-url (server-address serv)
+ publish-port)))
+ (server
+ (inherit serv)
+ (log-port log-port)
+ (publish-url url))))))
+
+ (define (server-info->worker info w)
+ (match info
+ ((worker-address _ _)
+ (let ((url (local-publish-url worker-address)))
+ (worker
+ (inherit w)
+ (address worker-address)
+ (publish-url url))))))
(match (primitive-fork)
(0
- (set-thread-name (worker-name worker))
+ (set-thread-name (worker-name wrk))
(let* ((socket (zmq-dealer-socket))
(address (server-address serv))
(port (server-port serv))
(endpoint (zmq-backend-endpoint address port)))
(zmq-connect socket endpoint)
- (ready socket)
- (worker-ping worker serv)
- (let ((server* (read-server-info socket serv)))
+ (let* ((info (read-server-info socket))
+ (server (server-info->server info serv))
+ (worker (server-info->worker info wrk)))
+ (ready socket worker)
+ (worker-ping worker server)
(let loop ()
- (request-work socket)
+ (request-work socket worker)
(match (zmq-get-msg-parts-bytevector socket '())
((empty command)
- (run-command (bv->string command) server*
+ (run-command (bv->string command) server
#:reply (reply socket)
#:worker worker)))
(sleep 10)
@@ -343,7 +357,6 @@ exiting."
(lambda (arg result)
(leave (G_ "~A: extraneous argument~%") arg))
%default-options))
- (address (assoc-ref opts 'address))
(workers (assoc-ref opts 'workers))
(publish-port (assoc-ref opts 'publish-port))
(server-address (assoc-ref opts 'server))
@@ -363,18 +376,12 @@ exiting."
#:public-key public-key
#:private-key private-key))
- (when (and server-address (not address))
- (leave (G_ "Address must be set when server is provided.~%")))
-
(if server-address
(for-each
(lambda (n)
- (let* ((publish-url (local-publish-url address))
- (worker (worker
+ (let* ((worker (worker
(name (generate-worker-name))
- (address address)
(machine (gethostname))
- (publish-url publish-url)
(systems systems)))
(addr (string-split server-address #\:))
(server (match addr
@@ -391,8 +398,7 @@ exiting."
((new-service)
(for-each
(lambda (n)
- (let* ((address (or address
- (avahi-service-local-address service)))
+ (let* ((address (avahi-service-local-address service))
(publish-url (local-publish-url address)))
(add-to-worker-pids!
(start-worker (worker
diff --git a/src/cuirass/remote.scm b/src/cuirass/remote.scm
index c5a27ea..268a643 100644
--- a/src/cuirass/remote.scm
+++ b/src/cuirass/remote.scm
@@ -82,6 +82,8 @@
zmq-worker-request-work-message
zmq-worker-request-info-message
zmq-server-info
+ zmq-remote-address
+ zmq-message-string
zmq-read-message
remote-server-service-type))
@@ -95,7 +97,8 @@
worker make-worker
worker?
(name worker-name)
- (address worker-address)
+ (address worker-address
+ (default #f))
(machine worker-machine)
(publish-url worker-publish-url
(default #f))
@@ -400,6 +403,13 @@ retries a call to PROC."
(eq? (poll-item-socket item) socket))
items))
+(define (zmq-remote-address message)
+ (zmq-message-gets message "Peer-Address"))
+
+(define (zmq-message-string message)
+ (bv->string
+ (zmq-message-content message)))
+
(define (zmq-read-message msg)
(call-with-input-string msg read))
@@ -455,9 +465,10 @@ retries a call to PROC."
"Return a message requesting server information."
(format #f "~s" '(worker-request-info)))
-(define (zmq-server-info log-port publish-port)
+(define (zmq-server-info worker-address log-port publish-port)
"Return a message containing server information."
- (format #f "~s" `(server-info (log-port ,log-port)
+ (format #f "~s" `(server-info (worker-address ,worker-address)
+ (log-port ,log-port)
(publish-port ,publish-port))))
(define remote-server-service-type
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- branch master updated: remote: Remove address argument.,
Mathieu Othacehe <=