gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[gnunet-scheme] 18/42: cadet/client: Rewrite with run-loop.


From: gnunet
Subject: [gnunet-scheme] 18/42: cadet/client: Rewrite with run-loop.
Date: Sat, 10 Sep 2022 19:08:11 +0200

This is an automated email from the git hooks/post-receive script.

maxime-devos pushed a commit to branch master
in repository gnunet-scheme.

commit e7f9505a7f08a7458bf792ab75ad4065ad35711d
Author: Maxime Devos <maximedevos@telenet.be>
AuthorDate: Fri Sep 9 16:18:15 2022 +0200

    cadet/client: Rewrite with run-loop.
    
    Reduces duplication, should increase test coverage.
    
    * gnu/gnunet/cadet/client.scm
    (connect)[loop]: Set remaining arguments and use 'run-loop'.
    (reconnect): Split into ...
    (make-message-handlers,control-message-handlers): ... these new
    procedures, adjusting for 'run-loop'.
---
 gnu/gnunet/cadet/client.scm | 400 ++++++++++++++++++++++----------------------
 1 file changed, 196 insertions(+), 204 deletions(-)

diff --git a/gnu/gnunet/cadet/client.scm b/gnu/gnunet/cadet/client.scm
index 04ceb35..f599637 100644
--- a/gnu/gnunet/cadet/client.scm
+++ b/gnu/gnunet/cadet/client.scm
@@ -165,12 +165,16 @@
       (define server (%make-server))
       (define loop
        (apply make-loop
+              #:make-message-handlers make-message-handlers
+              #:make-error-handler* make-error-handler*/loop
+              #:control-message-handler control-message-handler
+              #:service-name "cadet"
               #:configuration config
               #:connected connected
               #:disconnected disconnected
               #:spawn spawn
               (server->loop-arguments server)))
-      (spawn (lambda () (reconnect loop empty-bbtree)))
+      (spawn (lambda () (run-loop loop empty-bbtree %minimum-local-channel-id)))
       server)
 
     ;; channel-number->channel-map:
@@ -180,57 +184,48 @@
     ;;   has been closed.
     ;;
     ;;   TODO: GC problems, split in external and internal parts
-    (define (reconnect loop channel-number->channel-map)
-      (define loop-operation
-       (choice-operation
-        (get-operation (loop:control-channel loop))
-        (wrap-operation (collect-lost-and-found-operation (loop:lost-and-found 
loop))
-                        (lambda (lost) (cons 'lost lost)))))
-      (define handlers
-       (message-handlers
-        (message-handler
-         (type (symbol-value message-type msg:cadet:local:data))
-         ((interpose exp) exp)
-         ((well-formed? slice) #true)
-         ((handle! slice)
-          (let^ ((! cadet-data-length (sizeof /:msg:cadet:local:data '()))
-                 (! header (slice-slice slice 0 cadet-data-length))
-                 (! tail (slice-slice slice cadet-data-length))
-                 (! channel-number
-                    (read% /:msg:cadet:local:data '(channel-number) header))
-                 (! channel
-                    (maybe-ask* (loop:terminal-condition loop)
-                                (loop:control-channel loop) 'channel
-                                channel-number))
-                 (? (not channel)
-                    ???))
-                ;; TODO: while the message is being processed, other messages
-                ;; cannot be accepted -- document this limitation.
-                (inject-message! (channel-message-queue channel) tail))))
-        (message-handler
-         (type (symbol-value message-type msg:cadet:local:acknowledgement))
-         ((interpose exp) exp)
-         ((well-formed? slice)
-          (= (slice-length slice) (sizeof /:msg:cadet:local:acknowledgement 
'())))
-         ((handle! slice)
-          ;; The slice needs to be read here (and not in 'control'), as it 
might
-          ;; later be reused for something different.
-          (let ((channel-number (analyse-local-acknowledgement slice)))
-            (maybe-send-control-message!*
-             (loop:terminal-condition loop)
-             (loop:control-channel loop)
-             'acknowledgement
-             channel-number))))))
-      (define error-handler (make-error-handler*/loop loop))
-      (define mq (connect/fibers
-                 (loop:configuration loop) "cadet" handlers error-handler
-                 #:spawn (loop:spawner loop)))
+
+    (define (make-message-handlers loop . _)
+      (message-handlers
+       (message-handler
+       (type (symbol-value message-type msg:cadet:local:data))
+       ((interpose exp) exp)
+       ((well-formed? slice) #true)
+       ((handle! slice)
+        (let^ ((! cadet-data-length (sizeof /:msg:cadet:local:data '()))
+               (! header (slice-slice slice 0 cadet-data-length))
+               (! tail (slice-slice slice cadet-data-length))
+               (! channel-number
+                  (read% /:msg:cadet:local:data '(channel-number) header))
+               (! channel
+                  (maybe-ask* (loop:terminal-condition loop)
+                              (loop:control-channel loop) 'channel
+                              channel-number))
+               (? (not channel)
+                  ???))
+              ;; TODO: while the message is being processed, other messages
+              ;; cannot be accepted -- document this limitation.
+              (inject-message! (channel-message-queue channel) tail))))
+       (message-handler
+       (type (symbol-value message-type msg:cadet:local:acknowledgement))
+       ((interpose exp) exp)
+       ((well-formed? slice)
+        (= (slice-length slice)
+           (sizeof /:msg:cadet:local:acknowledgement '())))
+       ((handle! slice)
+        ;; The slice needs to be read here (and not in 'control'), as it might
+        ;; later be reused for something different.
+        (let ((channel-number (analyse-local-acknowledgement slice)))
+          (maybe-send-control-message!*
+           (loop:terminal-condition loop) (loop:control-channel loop)
+           'acknowledgement channel-number))))))
+
+    (define (control-message-handler message control control* message-queue loop
+                                    channel-number->channel-map
+                                    next-free-channel-number)
+      "The main event loop"
       (define (k/reconnect! channel-number->channel-map)
-       (reconnect loop channel-number->channel-map))
-      (define (control loop channel-number->channel-map 
next-free-channel-number)
-       "The main event loop."
-       (control* loop channel-number->channel-map next-free-channel-number
-                 (perform-operation loop-operation)))
+       (run-loop loop channel-number->channel-map next-free-channel-number))
       (define (close-if-possible! channel)
        ;; Pre-conditions:
        ;;  * the channel is open
@@ -238,166 +233,163 @@
        ;;
        ;; TODO: untested.
        (when (= (message-queue-length (channel-message-queue channel)) 0)
-         (send-message! mq
+         (send-message! message-queue
                         (construct-local-channel-destroy
                          (channel-channel-number channel)))
          ;; We don't need the envelope.
          (values)))
-      (define (control* loop channel-number->channel-map 
next-free-channel-number
-                       message)
-       (define (continue)
-         (control loop channel-number->channel-map next-free-channel-number))
-       (define (continue* message)
-         (control* loop channel-number->channel-map next-free-channel-number
-                   message))
-       ;; TODO: what about closed channels?
-       (define (send-channel-stuff! channel)
-         ;; Send messages one-by-one, keeping in mind that we might not be able
-         ;; to send all messages to the service at once, only 
'channel-allow-send'
-         ;; messages can be sent and this decreases by sending messages.
-         ;;
-         ;; TODO: use priority information, somehow when cancelling a message
-         ;; cancel the corresponding message to be sent to the CADET service 
when
-         ;; there is still time, zero-copy networking.
-         (let/ec
-          stop
-          (define (stop-if-exhausted)
-            ;; The mutation 'replace > by >=' is caught by
-            ;; "data is not sent before an acknowledgement"
-            ;; in form of a hang.
-            (if (> (channel-allow-send channel) 0)
-                ;; (unless ...) and (when ...) can return *unspecified*,
-                ;; but (gnu gnunet mq) expects no return values. Detected
-                ;; by the "data is properly sent in response to 
acknowledgements, in-order"
-                ;; test.
-                (values)
-                (stop)))
-          ;; Tested by ‘data is properly sent in response to acknowledgements, 
in-order’
-          ;; -- it catches the mutation 'replace 1 by zero' (as a hang)
-          (define (decrement!)
-            (set-channel-allow-send! channel
-                                     (- (channel-allow-send channel) 1)))
-          ;; It is important to check that a message can be sent before
-          ;; send! is called, otherwise the message will be removed from
-          ;; the message queue and be forgotten without being ever sent.
-          ;;
-          ;; Tested by ‘data is not sent before an acknowledgement’ -- it 
catches
-          ;; the mutation 'remove this line' (as a hang).
-          (stop-if-exhausted)
-          (define (send! envelope)
-            (attempt-irrevocable-sent!
-             envelope
-             ((go message priority)
-              ;; The mutation ‘don't call send-message!’ is caught by
-              ;; ‘data is properly sent in response to acknowledgements, 
in-order’
-              ;; as a hang and an exception.
-              ;;
-              ;; The mutation 'swap send-message!' and 'decrement!' is 
uncaught,
-              ;; but theoretically harmless.
-              (send-message! mq ; TODO: maybe get rid of the message queue 
limit in (gnu gnunet mq)
-                             (construct-local-data
-                              (channel-channel-number channel) ; TODO: 
multiple channels is untested
-                              0 ;; TODO: relation between priority and 
priority-preference?
-                              message)) ; TODO: sending the _right_ message is 
untested
-              ;; The mutation ‘don't call decrement!' is caught by
-              ;; ‘data is properly sent in response to acknowledgements, 
in-order’,
-              ;; as a hang with an exception.
-              (decrement!))
-             ((cancelled) (values)) ; TODO: untested
-             ((already-sent) (error "tried to send an envelope twice 
(CADET)")))
-            ;; Exit once nothing can be sent anymore (TODO check if
-            ;; make-one-by-one-sender allows non-local exits).
-            ;;
-            ;; The mutation 'don't call it' is caught by
+      (define (continue)
+       (control loop channel-number->channel-map next-free-channel-number))
+      (define (continue* message)
+       (control* message loop channel-number->channel-map
+                 next-free-channel-number))
+      ;; TODO: what about closed channels?
+      (define (send-channel-stuff! channel)
+       ;; Send messages one-by-one, keeping in mind that we might not be able
+       ;; to send all messages to the service at once, only 'channel-allow-send'
+       ;; messages can be sent and this decreases by sending messages.
+       ;;
+       ;; TODO: use priority information, somehow when cancelling a message
+       ;; cancel the corresponding message to be sent to the CADET service when
+       ;; there is still time, zero-copy networking.
+       (let/ec
+        stop
+        (define (stop-if-exhausted)
+          ;; The mutation 'replace > by >=' is caught by
+          ;; "data is not sent before an acknowledgement"
+          ;; in form of a hang.
+          (if (> (channel-allow-send channel) 0)
+              ;; (unless ...) and (when ...) can return *unspecified*,
+              ;; but (gnu gnunet mq) expects no return values. Detected
+              ;; by the "data is properly sent in response to acknowledgements, in-order"
+              ;; test.
+              (values)
+              (stop)))
+        ;; Tested by ‘data is properly sent in response to acknowledgements, in-order’
+        ;; -- it catches the mutation 'replace 1 by zero' (as a hang)
+        (define (decrement!)
+          (set-channel-allow-send! channel
+                                   (- (channel-allow-send channel) 1)))
+        ;; It is important to check that a message can be sent before
+        ;; send! is called, otherwise the message will be removed from
+        ;; the message queue and be forgotten without being ever sent.
+        ;;
+        ;; Tested by ‘data is not sent before an acknowledgement’ -- it catches
+        ;; the mutation 'remove this line' (as a hang).
+        (stop-if-exhausted)
+        (define (send! envelope)
+          (attempt-irrevocable-sent!
+           envelope
+           ((go message priority)
+            ;; The mutation ‘don't call send-message!’ is caught by
             ;; ‘data is properly sent in response to acknowledgements, in-order’
-            ;; as a hang and an exception?
+            ;; as a hang and an exception.
             ;;
-            ;; The mutation 'duplicate it' is uncaught, but theoretically 
harmless
-            ;; albeit inefficient.
-            (stop-if-exhausted))
-          ((make-one-by-one-sender send!) (channel-message-queue channel)))
-         (when (channel-desire-close? channel)
-           (close-if-possible! channel)))
-       (match message
-         (('open-channel! channel)
-          (let^ ((! channel-number next-free-channel-number)
-                 ;; TODO: handle overflow, and respect bounds
-                 (! next-free-channel-number (+ 1 next-free-channel-number))
-                 (_ (set-channel-channel-number! channel channel-number))
-                 ;; Keep track of the new <channel> object; it will be required
-                 ;; later by 'acknowledgement'.
-                 (! channel-number->channel-map
-                    (bbtree-set channel-number->channel-map channel-number
-                                channel)))
-                (send-local-channel-create! mq channel)
-                (control loop channel-number->channel-map 
next-free-channel-number)))
-         (('close-channel! channel)
-          ;; 'close-channel!' can only be sent after the <channel> object
-          ;; was returned by the procedure 'open-channel!', because only
-          ;; then the channel becomes available. This procedure 
(synchronuously)
-          ;; sends a 'open-channel!' message and messages are processed by
-          ;; the control loop in-order, so the channel has already been opened.
+            ;; The mutation 'swap send-message!' and 'decrement!' is uncaught,
+            ;; but theoretically harmless.
+            ;; TODO: maybe get rid of the message queue limit in (gnu gnunet mq)
+            (send-message! message-queue
+                           (construct-local-data
+                            (channel-channel-number channel) ; TODO: multiple channels is untested
+                            0 ;; TODO: relation between priority and priority-preference?
+                            message)) ; TODO: sending the _right_ message is untested
+            ;; The mutation ‘don't call decrement!' is caught by
+            ;; ‘data is properly sent in response to acknowledgements, in-order’,
+            ;; as a hang with an exception.
+            (decrement!))
+           ((cancelled) (values)) ; TODO: untested
+           ((already-sent) (error "tried to send an envelope twice (CADET)")))
+          ;; Exit once nothing can be sent anymore (TODO check if
+          ;; make-one-by-one-sender allows non-local exits).
           ;;
-          ;; The only remaining states are: the channel is open / the channel
-          ;; is closed.
-          (let^ ((! channel-number (channel-channel-number channel))
-                 (? (channel-desire-close? channel)
-                    ;; It has already been requested to close to channel
-                    ;; (maybe it even has already been closed).  This is fine,
-                    ;; as 'close-channel!' is idempotent.  Nothing to do!
-                    ;; TODO: untested.
-                    (continue)))
-                (set-channel-desire-close? channel #true)
-                ;; This procedure will take care of actually closing the 
channel
-                ;; (if currently possible).  If it's not currently possible
-                ;; due to a lack of acknowledgements, then a future 
'send-channel-stuff!'
-                ;; (in response to an 'acknowledgement' message) will take 
care of things.
-                ;;
-                ;; TODO: untested.  TODO: untested in case of reconnects.
-                (close-if-possible! channel)
-                (continue)))
-         (('resend-old-operations!)
-          ;; TODO: no operations and no channels are implemented yet,
-          ;; so for now nothing can be done.
-          (continue))
-         (('acknowledgement channel-number)
-          ;; TODO: failure case
-          (let^ ((! channel
-                    (bbtree-ref channel-number->channel-map channel-number)))
-                ;; The service is allowing us to send another message;
-                ;; update the number of allowed messages.
-                (set-channel-allow-send!
-                 channel (+ 1 (channel-allow-send channel)))
-                ;; Actually send some message, if there are any to send.
-                (send-channel-stuff! channel)
-                (continue)))
-         (('send-channel-stuff! message-queue channel)
-          ;; Tell the service to send the messages over CADET.
-          (send-channel-stuff! channel)
-          (continue))
-         ;; Respond to a query of the msg:cadet:local:data message handler.
-         (('channel answer-box channel-number)
-          (answer answer-box
-                  (bbtree-ref channel-number->channel-map
-                              channel-number (lambda () #false)))
-          (continue))
-         (('lost . lost)
-          (let loop ((lost lost))
-            (match lost
-              (() (continue))
-              ((object . rest)
-               (match object
-                 ((? channel? lost)
-                  TODO
-                  (loop rest))
-                 ((? server:cadet? lost)
-                  (continue* '(disconnect!))))))))
-         (rest
-          (handle-control-message!
-           rest mq (loop:terminal-condition loop)
-           (cut k/reconnect! channel-number->channel-map)))))
-      ;; Start the main event loop.
-      (control loop channel-number->channel-map %minimum-local-channel-id))
+          ;; The mutation 'don't call it' is caught by
+          ;; ‘data is properly sent in response to acknowledgements, in-order’
+          ;; as a hang and an exception?
+          ;;
+          ;; The mutation 'duplicate it' is uncaught, but theoretically harmless
+          ;; albeit inefficient.
+          (stop-if-exhausted))
+        ((make-one-by-one-sender send!) (channel-message-queue channel)))
+       (when (channel-desire-close? channel)
+         (close-if-possible! channel)))
+      (match message
+        (('open-channel! channel)
+        (let^ ((! channel-number next-free-channel-number)
+               ;; TODO: handle overflow, and respect bounds
+               (! next-free-channel-number (+ 1 next-free-channel-number))
+               (_ (set-channel-channel-number! channel channel-number))
+               ;; Keep track of the new <channel> object; it will be required
+               ;; later by 'acknowledgement'.
+               (! channel-number->channel-map
+                  (bbtree-set channel-number->channel-map channel-number
+                              channel)))
+              (send-local-channel-create! message-queue channel)
+              (control loop channel-number->channel-map next-free-channel-number)))
+       (('close-channel! channel)
+        ;; 'close-channel!' can only be sent after the <channel> object
+        ;; was returned by the procedure 'open-channel!', because only
+        ;; then the channel becomes available. This procedure (synchronously)
+        ;; sends a 'open-channel!' message and messages are processed by
+        ;; the control loop in-order, so the channel has already been opened.
+        ;;
+        ;; The only remaining states are: the channel is open / the channel
+        ;; is closed.
+        (let^ ((! channel-number (channel-channel-number channel))
+               (? (channel-desire-close? channel)
+                  ;; It has already been requested to close the channel
+                  ;; (maybe it even has already been closed).  This is fine,
+                  ;; as 'close-channel!' is idempotent.  Nothing to do!
+                  ;; TODO: untested.
+                  (continue)))
+              (set-channel-desire-close? channel #true)
+              ;; This procedure will take care of actually closing the channel
+              ;; (if currently possible).  If it's not currently possible
+              ;; due to a lack of acknowledgements, then a future 'send-channel-stuff!'
+              ;; (in response to an 'acknowledgement' message) will take care of things.
+              ;;
+              ;; TODO: untested.  TODO: untested in case of reconnects.
+              (close-if-possible! channel)
+              (continue)))
+       (('resend-old-operations!)
+        ;; TODO: no operations and no channels are implemented yet,
+        ;; so for now nothing can be done.
+        (continue))
+       (('acknowledgement channel-number)
+        ;; TODO: failure case
+        (let^ ((! channel
+                  (bbtree-ref channel-number->channel-map channel-number)))
+              ;; The service is allowing us to send another message;
+              ;; update the number of allowed messages.
+              (set-channel-allow-send!
+               channel (+ 1 (channel-allow-send channel)))
+              ;; Actually send some message, if there are any to send.
+              (send-channel-stuff! channel)
+              (continue)))
+       (('send-channel-stuff! message-queue channel)
+        ;; Tell the service to send the messages over CADET.
+        (send-channel-stuff! channel)
+        (continue))
+       ;; Respond to a query of the msg:cadet:local:data message handler.
+       (('channel answer-box channel-number)
+        (answer answer-box
+                (bbtree-ref channel-number->channel-map
+                            channel-number (lambda () #false)))
+        (continue))
+       (('lost . lost)
+        (let loop ((lost lost))
+          (match lost
+            (() (continue))
+            ((object . rest)
+             (match object
+               ((? channel? lost)
+                TODO
+                (loop rest))
+               ((? server:cadet? lost)
+                (continue* '(disconnect!))))))))
+       (rest
+        (handle-control-message!
+         rest message-queue (loop:terminal-condition loop)
+         (cut k/reconnect! channel-number->channel-map)))))
 
     (define-record-type (<cadet-address> make-cadet-address cadet-address?)
       (fields (immutable peer cadet-address-peer)

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]