guix-commits
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[no subject]


From: Ludovic Courtès
Date: Sat, 6 Jul 2024 19:10:26 -0400 (EDT)

branch: main
commit fa2ad7109a11a21c6fecb6a8bb07765ee5e21135
Author: Ludovic Courtès <ludo@gnu.org>
AuthorDate: Sun Jul 7 01:01:59 2024 +0200

    remote: ‘receive-message’ takes a timeout.
    
    * src/cuirass/remote.scm (&timeout-error): New error condition type.
    (wait-until-port-readable): New procedure.
    (receive-message): Use it instead of ‘current-read-waiter’.
---
 src/cuirass/remote.scm | 27 ++++++++++++++++++++++++---
 1 file changed, 24 insertions(+), 3 deletions(-)

diff --git a/src/cuirass/remote.scm b/src/cuirass/remote.scm
index c17ab12..1f7b0ba 100644
--- a/src/cuirass/remote.scm
+++ b/src/cuirass/remote.scm
@@ -44,7 +44,10 @@
   #:use-module (ice-9 suspendable-ports)
   #:autoload   (ice-9 threads) (current-processor-count)
   #:use-module (fibers)
+  #:use-module (fibers operations)
   #:use-module (fibers scheduler)
+  #:use-module (fibers timers)
+  #:use-module (fibers io-wakeup)
   #:export (worker
             worker?
             worker-name
@@ -93,6 +96,7 @@
             invalid-message-error?
             invalid-message-parts
             invalid-message-sender-address
+            timeout-error?
 
             remote-server-service-type))
 
@@ -451,7 +455,23 @@ the message."
   (message-parts  invalid-message-parts)
   (sender-address invalid-message-sender-address))
 
-(define* (receive-message socket #:key router?)
+;; Condition raised upon timeout.
+(define-condition-type &timeout-error &error
+  timeout-error?)
+
+(define* (wait-until-port-readable port #:optional timeout)
+  "Wait until PORT is readable.  If TIMEOUT is true, raise to '&timeout-error'
+after TIMEOUT seconds have expired if PORT still isn't readable."
+  (perform-operation
+   (if timeout
+       (choice-operation (wait-until-port-readable-operation port)
+                         (wrap-operation (sleep-operation timeout)
+                                         (lambda ()
+                                           (raise (condition
+                                                   (&timeout-error))))))
+       (wait-until-port-readable-operation port))))
+
+(define* (receive-message socket #:key router? timeout)
   "Read an sexp from SOCKET, a ZMQ socket, and return it.  Return the
 unspecified value when reading a message without payload.
 
@@ -460,7 +480,8 @@ prefix (the identity of the peer, as a bytevector), and 
return three values:
 the payload, the peer's identity (a bytevector), and the peer address.
 
 Raise to '&invalid-message-error' when receiving a message with unexpected
-parts."
+parts.  If TIMEOUT is true, raise to '&timeout-error' if nothing was received
+within TIMEOUT seconds."
   (let ((port (zmq-socket->port socket)))
     (let wait ()
       ;; Events are edge-triggered so before waiting, check whether there are
@@ -468,7 +489,7 @@ parts."
       ;; <https://lists.zeromq.org/pipermail/zeromq-dev/2016-May/030349.html>.
       (when (zero? (logand ZMQ_POLLIN
                            (zmq-get-socket-option socket ZMQ_EVENTS)))
-        ((current-read-waiter) port)
+        (wait-until-port-readable port timeout)
 
         ;; ZMQ_POLLIN might still be clear after epoll(2) has returned.
         ;; Per <http://api.zeromq.org/master:zmq-getsockopt>, "applications



reply via email to

[Prev in Thread] Current Thread [Next in Thread]