[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[no subject]
From: |
Ludovic Courtès |
Date: |
Sat, 6 Jul 2024 19:10:26 -0400 (EDT) |
branch: main
commit fa2ad7109a11a21c6fecb6a8bb07765ee5e21135
Author: Ludovic Courtès <ludo@gnu.org>
AuthorDate: Sun Jul 7 01:01:59 2024 +0200
remote: ‘receive-message’ takes a timeout.
* src/cuirass/remote.scm (&timeout-error): New error condition type.
(wait-until-port-readable): New procedure.
(receive-message): Use it instead of ‘current-read-waiter’.
---
src/cuirass/remote.scm | 27 ++++++++++++++++++++++++---
1 file changed, 24 insertions(+), 3 deletions(-)
diff --git a/src/cuirass/remote.scm b/src/cuirass/remote.scm
index c17ab12..1f7b0ba 100644
--- a/src/cuirass/remote.scm
+++ b/src/cuirass/remote.scm
@@ -44,7 +44,10 @@
#:use-module (ice-9 suspendable-ports)
#:autoload (ice-9 threads) (current-processor-count)
#:use-module (fibers)
+ #:use-module (fibers operations)
#:use-module (fibers scheduler)
+ #:use-module (fibers timers)
+ #:use-module (fibers io-wakeup)
#:export (worker
worker?
worker-name
@@ -93,6 +96,7 @@
invalid-message-error?
invalid-message-parts
invalid-message-sender-address
+ timeout-error?
remote-server-service-type))
@@ -451,7 +455,23 @@ the message."
(message-parts invalid-message-parts)
(sender-address invalid-message-sender-address))
-(define* (receive-message socket #:key router?)
+;; Condition raised upon timeout.
+(define-condition-type &timeout-error &error
+ timeout-error?)
+
+(define* (wait-until-port-readable port #:optional timeout)
+ "Wait until PORT is readable. If TIMEOUT is true, raise to '&timeout-error'
+after TIMEOUT seconds have expired if PORT still isn't readable."
+ (perform-operation
+ (if timeout
+ (choice-operation (wait-until-port-readable-operation port)
+ (wrap-operation (sleep-operation timeout)
+ (lambda ()
+ (raise (condition
+ (&timeout-error))))))
+ (wait-until-port-readable-operation port))))
+
+(define* (receive-message socket #:key router? timeout)
"Read an sexp from SOCKET, a ZMQ socket, and return it. Return the
unspecified value when reading a message without payload.
@@ -460,7 +480,8 @@ prefix (the identity of the peer, as a bytevector), and
return three values:
the payload, the peer's identity (a bytevector), and the peer address.
Raise to '&invalid-message-error' when receiving a message with unexpected
-parts."
+parts. If TIMEOUT is true, raise to '&timeout-error' if nothing was received
+within TIMEOUT seconds."
(let ((port (zmq-socket->port socket)))
(let wait ()
;; Events are edge-triggered so before waiting, check whether there are
@@ -468,7 +489,7 @@ parts."
;; <https://lists.zeromq.org/pipermail/zeromq-dev/2016-May/030349.html>.
(when (zero? (logand ZMQ_POLLIN
(zmq-get-socket-option socket ZMQ_EVENTS)))
- ((current-read-waiter) port)
+ (wait-until-port-readable port timeout)
;; ZMQ_POLLIN might still be clear after epoll(2) has returned.
;; Per <http://api.zeromq.org/master:zmq-getsockopt>, "applications
- main updated (2365ba7 -> fdb6bdf), Ludovic Courtès, 2024/07/06
- [no subject], Ludovic Courtès, 2024/07/06
- [no subject], Ludovic Courtès, 2024/07/06
- [no subject], Ludovic Courtès, 2024/07/06
- [no subject],
Ludovic Courtès <=
- [no subject], Ludovic Courtès, 2024/07/06
- [no subject], Ludovic Courtès, 2024/07/06
- [no subject], Ludovic Courtès, 2024/07/06
- [no subject], Ludovic Courtès, 2024/07/06
- [no subject], Ludovic Courtès, 2024/07/06