gnunet-svn

[gnunet-scheme] branch master updated (f5dc44e -> 58b0a65)


From: gnunet
Subject: [gnunet-scheme] branch master updated (f5dc44e -> 58b0a65)
Date: Sat, 10 Sep 2022 19:07:53 +0200

This is an automated email from the git hooks/post-receive script.

maxime-devos pushed a change to branch master
in repository gnunet-scheme.

    from f5dc44e  tests/config-parser: Use R7RS symbols instead of Guile's syntax.
     new ec53251  dht/server: Pass 'spawn' to connect/fibers.
     new acadf72  nse/client: Extract the reconnection loop.
     new e1304f7  dht/client: Extract message handlers.
     new b2039a8  dht/client: Eliminate mutation from the control loop.
     new f614867  dht/client: Bring API of reconnect mostly in line with (gnu gnunet server).
     new a2b1ee8  server: Bring the reconnect loop state into a single structure.
     new 6a62918  server: Rename 'primitive-reconnect' to 'run-loop'.
     new 6ce548d  nse/client: Simplify state passing with a new subtype of <loop>.
     new fda9f65  server: Make #:message-queue a regular argument.
     new 3d08912  server: Only accept a single 'state' argument.
     new 1aaa33c  dht: Use <loop> for state where possible.
     new e9d8193  Revert "server: Only accept a single 'state' argument."
     new b409f05  dht/client: Rewrite in terms of (gnu gnunet server).
     new 8512aa4  server: Deduplicate make-error-handler*.
     new a543420  server: New procedure for making the arguments to make-loop.
     new a5969fc  cadet/client: Avoid (mutating) hash tables.
     new 3c649c8  cadet/client: Use <loop> for various objects where possible.
     new e7f9505  cadet/client: Rewrite with run-loop.
     new b76f9a8  cadet/client: Minimise imports.
     new 37668b8  server: Add default arguments to 'make-loop'.
     new d4bf64f  server: Unify loop spawning.
     new dcc5f79  nse/indent: Re-indent.
     new 4e6963a  dht/client: Re-indent.
     new 97e0228  cadet/client: Re-indent.
     new 485f8a8  server: Re-indent.
     new bc966ca  server: Inline single-use server->loop-arguments.
     new adf27f0  cadet/client: Simplify more.
     new c6e4c06  server: Inline primitive-disconnect!.
     new e5ccd5a  doc/service-communication: Document <server>.
     new b675758  doc/service-communication: Document the control loop.
     new acb81b2  doc/service-communication: Document spawn-server-loop.
     new fa0f2f4  server: Add type checking to make-loop.
     new 31b340d  server: Document 'make-loop'.
     new 0d3cd19  server: Rename control -> continue.
     new d7b95b6  doc/service-communication: Document #:control-message-handler.
     new 6c53e3a  doc/concurrency: Add missing label for lost-and-found.
     new f988c2d  doc/service-communication: Document run-loop.
     new e81e5d9  doc/service-communication: Add procedures to the index.
     new c49d527  server: Add type checking.
     new d73c8a5  doc/service-communication: Add missing argument of make-disconnect!.
     new 8b1fc81  NEWS: Update.
     new 58b0a65  Merge branch 'server-unification'

The 42 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 NEWS                         |  10 +
 doc/concurrency.tm           |   2 +-
 doc/service-communication.tm | 168 +++++++++++++--
 gnu/gnunet/cadet/client.scm  | 474 ++++++++++++++++++++-----------------------
 gnu/gnunet/dht/client.scm    | 368 +++++++++++++++------------------
 gnu/gnunet/nse/client.scm    | 179 +++++++---------
 gnu/gnunet/server.scm        | 146 +++++++++++--
 7 files changed, 759 insertions(+), 588 deletions(-)

diff --git a/NEWS b/NEWS
index 69dc480..5efe13e 100644
--- a/NEWS
+++ b/NEWS
@@ -6,6 +6,16 @@
 # notice and this notice are preserved.  This file is offered as-is,
 # without any warranty.
 
+* Changes since 0.3
+** New functionality
+   - New tools for writing services -- see ‘Writing service communication code’
+     in the manual.  These tools have been used to reduce duplication between
+     client code of different services, so tests targeting a single service
+     automatically also test the other services a bit.
+** Documentation
+   - For 'make-disconnect!', only a single argument was mentioned in the
+     manual, but there were actually two (non-optional!) arguments.  This
+     has been rectified.
 * Changes since 0.2
 ** New functionality
    - New tools for writing services -- have a look at ‘Writing service
diff --git a/doc/concurrency.tm b/doc/concurrency.tm
index ab662e5..9148742 100644
--- a/doc/concurrency.tm
+++ b/doc/concurrency.tm
@@ -47,7 +47,7 @@
     lost again. However, it will only be found the first time it became lost.
 
     <item*|lost-and-found>Each found object is put inside their corresponding
-    <dfn|lost-and-found><index|lost-and-found>, if any.
+    <dfn|lost-and-found><index|lost-and-found><label|lost-and-found>, if any.
   </description>
 
   <\explain>
diff --git a/doc/service-communication.tm b/doc/service-communication.tm
index cf8d96f..3681d2f 100644
--- a/doc/service-communication.tm
+++ b/doc/service-communication.tm
@@ -11,9 +11,9 @@
     The C implementation supports Internet sockets as well.
   </footnote> somewhere on the file system and the client (possibly another
   service) must connect to it.<space|1em>Connections to a service can be made
-  with the <scm|connect/fibers><index|connect/fibers> procedure from
-  <scm|(gnu gnunet mq-impl stream)><index|(gnu gnunet mq-impl stream)>, like
-  this:
+  with the <scm|connect/fibers><index|connect/fibers><label|connect/fibers>
+  procedure from <scm|(gnu gnunet mq-impl stream)><index|(gnu gnunet mq-impl
+  stream)>, like this:
 
   <\scm-code>
     (define mq (connect/fibers config "nse" handlers error-handler))
@@ -378,7 +378,8 @@
   code:
 
   <\explain>
-    <scm|(garbage-collectable <var|service> <var|connect>)>
+    <scm|(garbage-collectable <var|service>
+    <var|connect>)><index|garbage-collectable>
   <|explain>
     Test that the server object is properly garbage collectable \U i.e., make
     sure that when the server object is not used anymore, all new fibers are
@@ -398,7 +399,7 @@
 
   <\explain>
     <scm|(close-not-connected-no-callbacks <var|service> <var|connect>
-    <var|disconnect!> #:rest)>
+    <var|disconnect!> #:rest)><index|close-not-connected-no-callbacks>
   <|explain>
     This tests the connection and disconnection callbacks. It verifies that
     if the service daemon is down, the connection and disconnection callbacks
@@ -409,7 +410,8 @@
   </explain>
 
   <\explain>
-    <scm|(connect-after-eof-after-connected <var|service> <var|connect>)>
+    <scm|(connect-after-eof-after-connected <var|service>
+    <var|connect>)><index|connect-after-eof-after-connected>
   </explain|This tests the connection and disconnection callbacks, in case
   the server disconnects without sending or receiving anything.<space|1em>It
   verifies that the connection and disconnection callback is called and that
@@ -417,7 +419,7 @@
   reconnection.>
 
   <\explain>
-    <scm|(reconnects service <var|service> <var|connect>)>
+    <scm|(reconnects service <var|service> <var|connect>)><index|reconnects>
   </explain|This tests the reconnection logic, by repeatedly closing the
   connection from the server side and verifying that the connection and
   disconnection callbacks are called in the right order and sufficiently
@@ -425,7 +427,8 @@
 
   <\explain>
     <scm|(determine-reported-errors <var|service> <var|connect> <var|proc>
-    #:key (<var|n-connections> 1) (<var|n-errors> 1))>
+    #:key (<var|n-connections> 1) (<var|n-errors>
+    1))><index|determine-reported-errors>
   <|explain>
     This is not a test by itself, but can be used as basis for writing tests
     on error reporting logic. It connects to a service simulated by
@@ -487,16 +490,154 @@
   to the control channel, or wait for the terminal condition to be signalled
   (but not both!).
 
+  The control loop needs various information \V this information is stored in
+  an object of type <scm|\<less\>loop\<gtr\>>, which is simply called a
+  <dfn|loop>. The procedure <scm|make-loop> makes a new loop, <scm|run-loop>
+  enters the loop and <scm|spawn-server-loop> combines making a new loop with
+  entering the loop in the background. In the <em|connect> procedure,
+  <scm|spawn-server-loop> would be called.
+
+  To start a new loop, the procedure <scm|spawn-server-loop> is used.
+
   <todo|unify implementation of control loop?>
 
   <todo|TODO: module is not yet complete!>
 
   <\explain>
     <scm|\<less\>server\<gtr\>><index|\<less\>server\<gtr\>><label|\<less\>server\<gtr\>>
-  </explain|<todo|???>>
+  <|explain>
+    The record type of server objects. The terminal condition and control
+    channel can be retrieved with <scm|server-terminal-condition> and
+    <scm|server-control-channel> respectively. In practice, you will need to
+    define a subtype of <scm|\<less\>server\<gtr\>> with
+    <scm|define-record-type> from R6RS. That way, new fields can be added and
+    a type predicate becomes available.
+
+    <\example>
+      The server type of <acronym|NSE> (network size estimation) is
+      implemented more-or-less like this:
+
+      <\scm>
+        (define-record-type (\<less\>server:nse\<gtr\> make-server
+        server:nse?)
+
+        \ \ (parent \<less\>server\<gtr\>)
+
+        \ \ (fields (immutable estimate/box server-estimate/box))
+
+        \ \ (protocol
+
+        \ \ \ \ (lambda (%make)
+
+        \ \ \ \ \ \ (lambda ()
+
+        \ \ \ \ \ \ \ \ ((%make) (make-atomic-box #false))))))
+
+        (define server (make-server))
+
+        (server:nse? server) ; -\<gtr\> #true
+
+        (atomic-box? (server-estimate/box server)) ; -\<gtr\> #true
+      </scm>
+    </example>
+  </explain>
 
   <\explain>
-    <scm|(maybe-send-control-message! <var|server> . <var|message>)>
+    <scm|(spawn-server-loop server #:make-loop #:initial-extra-loop-arguments
+    <text-dots>)><index|spawn-server-loop>
+  <|explain>
+    Make the loop for <var|server> and enter it in the background.
+
+    All the keyword arguments are passed to <scm|#:make-loop>. Additionally,
+    <scm|#:terminal-condition>, <scm|#:control-channel> and
+    <scm|#:lost-and-found> are set to the terminal condition, control channel
+    and lost-and-found of <var|server> respectively. Consult the
+    documentation of <scm|#:make-loop> to determine what arguments have
+    default values and what arguments need to be set explicitly.
+
+    <scm|#:spawn> must be set, even if <scm|#:make-loop> does not require it.
+
+    To make an instance of a proper subtype of <scm|\<less\>loop\<gtr\>>, set
+    <scm|#:make-loop> to the constructor of the subtype. By default, the
+    constructor <scm|make-loop> itself is used.
+
+    To pass additional state arguments, set
+    <var|initial-extra-loop-arguments> to a list of additional values to
+    pass. By default, it is an empty list. When entering the loop, the loop
+    itself and the <var|initial-extra-loop-arguments> are passed to the
+    <scm|#:control-message-handler>, in that order, as state arguments.
+
+    What \<#2018\>entering a loop\<#2019\> entails, is described in the
+    documentation of <scm|run-loop>.
+  </explain>
+
+  <\explain>
+    <scm|(run-loop <var|loop> . <var|other-state>)><index|run-loop>
+  </explain|Enter the loop <var|loop>, in tail position. The initial state
+  arguments are <scm|loop . other-state>. This asynchronously connects to
+  the service with <reference|connect/fibers>, using the service name and
+  configuration of <var|loop> and calls the control message handler with the
+  first control message, the procedures <scm|continue> and <scm|continue*>
+  mentioned in the documentation of <scm|make-loop>, the message queue of
+  <scm|connect/fibers>, <var|loop> and the <var|other-state> arguments, in
+  tail position.>
+
+  <\explain>
+    <scm|(make-loop keyword-arguments <text-dots>)><index|make-loop>
+  <|explain>
+    Make a loop. Unless mentioned otherwise, all mentioned keyword arguments
+    are required to be set. Non-mentioned keyword arguments can be passed to
+    <scm|make-loop> but will be ignored.
+
+    <\description>
+      <item*|<scm|#:connected>, <scm|#:disconnected>>(optional) See the
+      <reference|server object> pattern. They can be accessed with
+      <scm|loop:connected> and <scm|loop:disconnected> respectively.
+
+      <item*|<scm|#:configuration>>The <reference|configuration> to use for
+      this loop. This module uses it for connecting to the service, but it
+      can be used by the service client code as well, by accessing it with
+      <scm|loop:configuration>.
+
+      <item*|<scm|#:service-name>>The name of the service to pass to
+      <scm|connect/fibers> (a string). This must match the name used in the
+      configuration. It can be accessed with <scm|loop:service-name>.
+
+      <item*|<scm|#:terminal-condition>>The terminal condition of the server.
+
+      <item*|<scm|#:control-channel>>The control channel of the server.
+
+      <item*|<scm|#:lost-and-found>>The <reference|lost-and-found> of the
+      server.
+
+      <item*|<scm|#:control-message-handler>>The control message handler is a
+      procedure accepting a control message from the control channel, a
+      procedure <scm|continue>, a procedure <scm|continue*>, a message queue
+      and the state arguments, in that order.
+
+      It must handle the control message (<scm|handle-control-message!> can
+      be useful). After it is handled, it can decide to continue the control
+      message handling loop by calling <scm|continue> or <scm|continue*> in
+      tail position.
+
+      The procedure <scm|continue> accepts the new state arguments, takes the
+      next message from the control channel and calls the control message
+      handler of the new loop (not the old loop) in tail position with that
+      message and the new state arguments.
+
+      The procedure <scm|continue*> functions likewise, but it has an extra
+      message argument before the other arguments, and instead of taking the
+      next message from the control channel, it uses the passed message.
+
+      <item*|<scm|#:make-message-handlers>>This procedure takes the state
+      arguments and returns the message handlers (<todo|TODO: document
+      <scm|message-handlers>>).
+    </description>
+  </explain>
+
+  <\explain>
+    <scm|(maybe-send-control-message! <var|server> .
+    <var|message>)><index|maybe-send-control-message!>
   <|explain>
     Maybe-send the message <var|message> to the service. If sent, return
     <scm|#true>. If the control loop is not active anymore (i.e., the
@@ -505,13 +646,13 @@
 
   <\explain>
     <scm|(maybe-send-control-message!* <var|terminal-condition>
-    <var|control-channel> . <var|message>)>
+    <var|control-channel> . <var|message>)><index|maybe-send-control-message!*>
   </explain|This is like <scm|maybe-send-control-message!>, except it doesn't
   need a reference to the <scm|\<less\>server\<gtr\>> object, which sometimes
   is required for GC reasons.>
 
   <\explain>
-    <scm|(make-disconnect! <var|type?>)><index|make-disconnect!>
+    <scm|(make-disconnect! <var|name> <var|type?>)><index|make-disconnect!>
   <|explain>
     It is assumed the <reference|server object> type is a subtype of
     <scm|<reference|\<less\>server\<gtr\>>>.
@@ -525,6 +666,9 @@
 
     More technically, this <em|maybe-sends> a <scm|disconnect!> message to
     the control channel of the server object.
+
+    <var|name> must be a symbol; it is used in error messages, as the name of
+    the service, to distinguish between different service types.
   </explain>
 </body>
 
diff --git a/gnu/gnunet/cadet/client.scm b/gnu/gnunet/cadet/client.scm
index bc1b8b8..c37b908 100644
--- a/gnu/gnunet/cadet/client.scm
+++ b/gnu/gnunet/cadet/client.scm
@@ -53,8 +53,7 @@
          (only (gnu gnunet crypto struct)
                /peer-identity)
          (only (gnu gnunet concurrency lost-and-found)
-               make-lost-and-found collect-lost-and-found-operation
-               losable-lost-and-found)
+               losable-lost-and-found)
          (only (gnu gnunet mq handler)
                message-handlers message-handler)
          (only (gnu gnunet mq)
@@ -66,15 +65,15 @@
                maybe-ask* answer
                maybe-send-control-message!
                maybe-send-control-message!*
-               make-error-handler
                make-disconnect!
                server-terminal-condition
                server-control-channel
-               handle-control-message!)
+               handle-control-message!
+               make-loop run-loop spawn-server-loop loop:control-channel
+               loop:terminal-condition)
          (only (gnu gnunet hashcode struct)
                /hashcode:512)
          (only (gnu gnunet message protocols) message-type)
-         (only (gnu gnunet mq-impl stream) connect/fibers)
          (only (gnu gnunet mq)
                make-message-queue inject-message!)
          (only (gnu gnunet netstruct syntactic)
@@ -88,22 +87,19 @@
                let^)
          (only (rnrs base)
                begin define lambda assert quote cons apply values
-               case else = define-syntax + expt - let* let and >
-               not if eq?)
+               case else = define-syntax + expt - let and >
+               not if < append list)
          (only (rnrs control)
-               when unless)
-         (only (rnrs hashtables)
-               make-eqv-hashtable hashtable-ref hashtable-set!)
+               when)
+         (only (pfds bbtrees)
+               bbtree-set make-bbtree bbtree-ref)
          (only (rnrs records syntactic) define-record-type)
          (only (ice-9 control) let/ec)
          (only (ice-9 match) match)
          (only (guile) define* error)
          (only (fibers) spawn-fiber)
-         (only (fibers channels) get-operation put-operation make-channel)
-         (only (fibers conditions) make-condition wait-operation
-               signal-condition!)
-         (only (fibers operations)
-               wrap-operation choice-operation perform-operation)
+         (only (srfi srfi-26)
+               cut)
          (only (srfi srfi-45)
                delay force))
   (begin
@@ -150,250 +146,230 @@
                    ((%make (losable-lost-and-found server)) server
                     destination options #false #false message-queue 0)))))
 
-    (define* (connect config #:key (connected values) (disconnected values)
-                     (spawn spawn-fiber))
+    (define empty-bbtree (make-bbtree <))
+
+    (define* (connect config #:key connected disconnected spawn #:rest r)
       "Asynchronuously connect to the CADET service, using the configuration
 @var{config}, returning a CADET server object."
-      (define server (%make-server))
-      (spawn-procedure spawn config
-                      (server-terminal-condition server)
-                      (server-control-channel server)
-                      connected disconnected spawn
-                      (losable-lost-and-found server)
-                      ;; integers cannot be compares with eq?,
-                      ;; but they can be with eqv?
-                      (make-eqv-hashtable))
-      server)
-
-    ;; TODO: reduce duplication with (gnu gnunet dht client)
-    (define (spawn-procedure spawn . rest)
-      (spawn (lambda () (apply reconnect rest))))
+      (apply spawn-server-loop (%make-server)
+            #:make-message-handlers make-message-handlers
+            #:control-message-handler control-message-handler
+            #:service-name "cadet"
+            #:configuration config
+            #:initial-extra-loop-arguments
+            (list empty-bbtree %minimum-local-channel-id) r))
 
-    ;; channel-number->channel-hash-map:
-    ;;   A hash map from channel numbers to their corresponding
+    ;; channel-number->channel-map:
+    ;;   A 'bbtree' from channel numbers to their corresponding
     ;;   <channel> object, or nothing if the control loop
     ;;   has not processes 'open-channel!' yet or if the channel
     ;;   has been closed.
     ;;
     ;;   TODO: GC problems, split in external and internal parts
-    (define (reconnect config terminal-condition control-channel
-                      connected disconnected spawn
-                      lost-and-found
-                      channel-number->channel-hash-map)
-      (define loop-operation
-       (choice-operation
-        (get-operation control-channel)
-        (wrap-operation (collect-lost-and-found-operation lost-and-found)
-                        (lambda (lost) (cons 'lost lost)))))
-      (define handlers
-       (message-handlers
-        (message-handler
-         (type (symbol-value message-type msg:cadet:local:data))
-         ((interpose exp) exp)
-         ((well-formed? slice) #true)
-         ((handle! slice)
-          (let^ ((! cadet-data-length (sizeof /:msg:cadet:local:data '()))
-                 (! header (slice-slice slice 0 cadet-data-length))
-                 (! tail (slice-slice slice cadet-data-length))
-                 (! channel-number
-                    (read% /:msg:cadet:local:data '(channel-number) header))
-                 (! channel
-                    (maybe-ask* terminal-condition control-channel 'channel
-                                channel-number))
-                 (? (not channel)
-                    ???))
-                ;; TODO: while the message is being processed, other messages
-                ;; cannot be accepted -- document this limitation.
-                (inject-message! (channel-message-queue channel) tail))))
-        (message-handler
-         (type (symbol-value message-type msg:cadet:local:acknowledgement))
-         ((interpose exp) exp)
-         ((well-formed? slice)
-          (= (slice-length slice) (sizeof /:msg:cadet:local:acknowledgement '())))
-         ((handle! slice)
-          ;; The slice needs to be read here (and not in 'control'), as it might
-          ;; later be reused for something different.
-          (let ((channel-number (analyse-local-acknowledgement slice)))
-            (maybe-send-control-message!* terminal-condition control-channel
-                                          'acknowledgement
-                                          channel-number))))))
-      (define error-handler
-       (make-error-handler connected disconnected terminal-condition
-                           control-channel))
-      (define mq (connect/fibers config "cadet" handlers error-handler
-                                #:spawn spawn))
-      (define (k/reconnect!)
-       (reconnect config terminal-condition control-channel connected
-                  disconnected spawn lost-and-found
-                  channel-number->channel-hash-map))
-      (define (control next-free-channel-number)
-       "The main event loop."
-       (control* next-free-channel-number
-                 (perform-operation loop-operation)))
-      (define (close-if-possible! channel)
-       ;; Pre-conditions:
-       ;;  * the channel is open
-       ;;  * and a close has been requested
-       ;;
-       ;; TODO: untested.
-       (when (= (message-queue-length (channel-message-queue channel)) 0)
-         (send-message! mq
-                        (construct-local-channel-destroy
-                         (channel-channel-number channel)))
-         ;; We don't need the envelope.
-         (values)))
-      (define (control* next-free-channel-number message)
-       (define (continue)
-         (control next-free-channel-number))
-       (define (continue* message)
-         (control* next-free-channel-number message))
-       ;; TODO: what about closed channels?
-       (define (send-channel-stuff! channel)
-         ;; Send messages one-by-one, keeping in mind that we might not be able
-         ;; to send all messages to the service at once, only 'channel-allow-send'
-         ;; messages can be sent and this decreases by sending messages.
-         ;;
-         ;; TODO: use priority information, somehow when cancelling a message
-         ;; cancel the corresponding message to be sent to the CADET service when
-         ;; there is still time, zero-copy networking.
-         (let/ec
-          stop
-          (define (stop-if-exhausted)
-            ;; The mutation 'replace > by >=' is caught by
-            ;; "data is not sent before an acknowledgement"
-            ;; in form of a hang.
-            (if (> (channel-allow-send channel) 0)
-                ;; (unless ...) and (when ...) can return *unspecified*,
-                ;; but (gnu gnunet mq) expects no return values. Detected
-                ;; by the "data is properly sent in response to acknowledgements, in-order"
-                ;; test.
-                (values)
-                (stop)))
-          ;; Tested by ‘data is properly sent in response to acknowledgements, in-order’
-          ;; -- it catches the mutation 'replace 1 by zero' (as a hang)
-          (define (decrement!)
-            (set-channel-allow-send! channel
-                                     (- (channel-allow-send channel) 1)))
-          ;; It is important to check that a message can be sent before
-          ;; send! is called, otherwise the message will be removed from
-          ;; the message queue and be forgotten without being ever sent.
+
+    (define (make-message-handlers loop . _)
+      (message-handlers
+       (message-handler
+       (type (symbol-value message-type msg:cadet:local:data))
+       ((interpose exp) exp)
+       ((well-formed? slice) #true)
+       ((handle! slice)
+        (let^ ((! cadet-data-length (sizeof /:msg:cadet:local:data '()))
+               (! header (slice-slice slice 0 cadet-data-length))
+               (! tail (slice-slice slice cadet-data-length))
+               (! channel-number
+                  (read% /:msg:cadet:local:data '(channel-number) header))
+               (! channel
+                  (maybe-ask* (loop:terminal-condition loop)
+                              (loop:control-channel loop) 'channel
+                              channel-number))
+               (? (not channel)
+                  ???))
+              ;; TODO: while the message is being processed, other messages
+              ;; cannot be accepted -- document this limitation.
+              (inject-message! (channel-message-queue channel) tail))))
+       (message-handler
+       (type (symbol-value message-type msg:cadet:local:acknowledgement))
+       ((interpose exp) exp)
+       ((well-formed? slice)
+        (= (slice-length slice)
+           (sizeof /:msg:cadet:local:acknowledgement '())))
+       ((handle! slice)
+        ;; The slice needs to be read here (and not in 'control'), as it might
+        ;; later be reused for something different.
+        (let ((channel-number (analyse-local-acknowledgement slice)))
+          (maybe-send-control-message!*
+           (loop:terminal-condition loop) (loop:control-channel loop)
+           'acknowledgement channel-number))))))
+
+    (define (close-if-possible! message-queue channel)
+      ;; Pre-conditions:
+      ;;  * the channel is open
+      ;;  * and a close has been requested
+      ;;
+      ;; TODO: untested.
+      (when (= (message-queue-length (channel-message-queue channel)) 0)
+       (send-message! message-queue
+                      (construct-local-channel-destroy
+                       (channel-channel-number channel)))
+       ;; We don't need the envelope.
+       (values)))
+
+    ;; TODO: what about closed channels?
+    (define (send-channel-stuff! message-queue channel)
+      ;; Send messages one-by-one, keeping in mind that we might not be able
+      ;; to send all messages to the service at once, only 'channel-allow-send'
+      ;; messages can be sent and this decreases by sending messages.
+      ;;
+      ;; TODO: use priority information, somehow when cancelling a message
+      ;; cancel the corresponding message to be sent to the CADET service when
+      ;; there is still time, zero-copy networking.
+      (let/ec
+       stop
+       (define (stop-if-exhausted)
+        ;; The mutation 'replace > by >=' is caught by
+        ;; "data is not sent before an acknowledgement"
+        ;; in form of a hang.
+        (if (> (channel-allow-send channel) 0)
+            ;; (unless ...) and (when ...) can return *unspecified*,
+            ;; but (gnu gnunet mq) expects no return values. Detected
+            ;; by the "data is properly sent in response to acknowledgements,
+            ;; in-order" test.
+            (values)
+            (stop)))
+       ;; Tested by ‘data is properly sent in response to acknowledgements,
+       ;; in-order’ -- it catches the mutation 'replace 1 by zero' (as a hang)
+       (define (decrement!)
+        (set-channel-allow-send! channel (- (channel-allow-send channel) 1)))
+       ;; It is important to check that a message can be sent before
+       ;; send! is called, otherwise the message will be removed from
+       ;; the message queue and be forgotten without being ever sent.
+       ;;
+       ;; Tested by ‘data is not sent before an acknowledgement’ -- it catches
+       ;; the mutation 'remove this line' (as a hang).
+       (stop-if-exhausted)
+       (define (send! envelope)
+        (attempt-irrevocable-sent!
+         envelope
+         ((go message priority)
+          ;; The mutation ‘don't call send-message!’ is caught by
+          ;; ‘data is properly sent in response to acknowledgements, in-order’
+          ;; as a hang and an exception.
           ;;
-          ;; Tested by ‘data is not sent before an acknowledgement’ -- it catches
-          ;; the mutation 'remove this line' (as a hang).
-          (stop-if-exhausted)
-          (define (send! envelope)
-            (attempt-irrevocable-sent!
-             envelope
-             ((go message priority)
-              ;; The mutation ‘don't call send-message!’ is caught by
-              ;; ‘data is properly sent in response to acknowledgements, in-order’
-              ;; as a hang and an exception.
+          ;; The mutation 'swap send-message!' and 'decrement!' is uncaught,
+          ;; but theoretically harmless.
+          ;; TODO: maybe get rid of the message queue limit in (gnu gnunet mq)
+          (send-message! message-queue
+                         (construct-local-data
+                          (channel-channel-number channel) ; TODO: multiple channels is untested
+                          0 ;; TODO: relation between priority and priority-preference?
+                          message)) ; TODO: sending the _right_ message is untested
+          ;; The mutation ‘don't call decrement!' is caught by
+          ;; ‘data is properly sent in response to acknowledgements, in-order’,
+          ;; as a hang with an exception.
+          (decrement!))
+         ((cancelled) (values)) ; TODO: untested
+         ((already-sent) (error "tried to send an envelope twice (CADET)")))
+        ;; Exit once nothing can be sent anymore (TODO check if
+        ;; make-one-by-one-sender allows non-local exits).
+        ;;
+        ;; The mutation 'don't call it' is caught by
+        ;; ‘data is properly sent in response to acknowledgements, in-order’
+        ;; as a hang and an exception?
+        ;;
+        ;; The mutation 'duplicate it' is uncaught, but theoretically harmless
+        ;; albeit inefficient.
+        (stop-if-exhausted))
+       ((make-one-by-one-sender send!) (channel-message-queue channel)))
+      (when (channel-desire-close? channel)
+       (close-if-possible! message-queue channel)))
+
+  (define (control-message-handler message continue continue* message-queue loop
+                                  channel-number->channel-map
+                                  next-free-channel-number)
+      "The main event loop"
+      (define (k/reconnect! channel-number->channel-map)
+       (run-loop loop channel-number->channel-map next-free-channel-number))
+      (define (continue/no-change)
+       (continue loop channel-number->channel-map next-free-channel-number))
+      (define (continue/no-change* message)
+       (continue* message loop channel-number->channel-map
+                  next-free-channel-number))
+      (match message
+        (('open-channel! channel)
+        (set-channel-channel-number! channel next-free-channel-number)
+        (send-local-channel-create! message-queue channel)
+        (continue loop
+                  ;; Keep track of the new <channel> object; it will be
+                  ;; required later by 'acknowledgement'.
+                  (bbtree-set channel-number->channel-map
+                              next-free-channel-number channel)
+                  ;; TODO: handle overflow, and respect bounds
+                 (+ 1 next-free-channel-number)))
+       (('close-channel! channel)
+        ;; 'close-channel!' can only be sent after the <channel> object
+        ;; was returned by the procedure 'open-channel!', because only
+        ;; then the channel becomes available. This procedure (synchronuously)
+        ;; sends a 'open-channel!' message and messages are processed by
+        ;; the control loop in-order, so the channel has already been opened.
+        ;;
+        ;; The only remaining states are: the channel is open / the channel
+        ;; is closed.
+        (let^ ((! channel-number (channel-channel-number channel))
+               (? (channel-desire-close? channel)
+                  ;; It has already been requested to close to channel
+                  ;; (maybe it even has already been closed).  This is fine,
+                  ;; as 'close-channel!' is idempotent.  Nothing to do!
+                  ;; TODO: untested.
+                  (continue/no-change)))
+              (set-channel-desire-close? channel #true)
+              ;; This procedure will take care of actually closing the channel
+              ;; (if currently possible).  If it's not currently possible
+              ;; due to a lack of acknowledgements, then a future 'send-channel-stuff!'
+              ;; (in response to an 'acknowledgement' message) will take care of things.
               ;;
-              ;; The mutation 'swap send-message!' and 'decrement!' is uncaught,
-              ;; but theoretically harmless.
-              (send-message! mq ; TODO: maybe get rid of the message queue limit in (gnu gnunet mq)
-                             (construct-local-data
-                              (channel-channel-number channel) ; TODO: multiple channels is untested
-                              0 ;; TODO: relation between priority and priority-preference?
-                              message)) ; TODO: sending the _right_ message is untested
-              ;; The mutation ‘don't call decrement!' is caught by
-              ;; ‘data is properly sent in response to acknowledgements, in-order’,
-              ;; as a hang with an exception.
-              (decrement!))
-             ((cancelled) (values)) ; TODO: untested
-             ((already-sent) (error "tried to send an envelope twice (CADET)")))
-            ;; Exit once nothing can be sent anymore (TODO check if
-            ;; make-one-by-one-sender allows non-local exits).
-            ;;
-            ;; The mutation 'don't call it' is caught by
-            ;; ‘data is properly sent in response to acknowledgements, in-order’
-            ;; as a hang and an exception?
-            ;;
-            ;; The mutation 'duplicate it' is uncaught, but theoretically harmless
-            ;; albeit inefficient.
-            (stop-if-exhausted))
-          ((make-one-by-one-sender send!) (channel-message-queue channel)))
-         (when (channel-desire-close? channel)
-           (close-if-possible! channel)))
-       (match message
-         (('open-channel! channel)
-          (let* ((channel-number next-free-channel-number)
-                 ;; TODO: handle overflow, and respect bounds
-                 (next-free-channel-number (+ 1 next-free-channel-number)))
-            (set-channel-channel-number! channel channel-number)
-            ;; Keep track of the new <channel> object; it will be required
-            ;; later by 'acknowledgement'.
-            (hashtable-set! channel-number->channel-hash-map
-                            channel-number
-                            channel)
-            (send-local-channel-create! mq channel)
-            (control next-free-channel-number)))
-         (('close-channel! channel)
-          ;; 'close-channel!' can only be sent after the <channel> object
-          ;; was returned by the procedure 'open-channel!', because only
-          ;; then the channel becomes available. This procedure (synchronuously)
-          ;; sends a 'open-channel!' message and messages are processed by
-          ;; the control loop in-order, so the channel has already been opened.
-          ;;
-          ;; The only remaining states are: the channel is open / the channel
-          ;; is closed.
-          (let^ ((! channel-number (channel-channel-number channel))
-                 (? (channel-desire-close? channel)
-                    ;; It has already been requested to close to channel
-                    ;; (maybe it even has already been closed).  This is fine,
-                    ;; as 'close-channel!' is idempotent.  Nothing to do!
-                    ;; TODO: untested.
-                    (continue)))
-                (set-channel-desire-close? channel #true)
-                ;; This procedure will take care of actually closing the channel
-                ;; (if currently possible).  If it's not currently possible
-                ;; due to a lack of acknowledgements, then a future 'send-channel-stuff!'
-                ;; (in response to an 'acknowledgement' message) will take care of things.
-                ;;
-                ;; TODO: untested.  TODO: untested in case of reconnects.
-                (close-if-possible! channel)
-                (continue)))
-         (('resend-old-operations!)
-          ;; TODO: no operations and no channels are implemented yet,
-          ;; so for now nothing can be done.
-          (continue))
-         (('acknowledgement channel-number)
-          (let^ ((! channel (hashtable-ref channel-number->channel-hash-map
-                                           channel-number
-                                           #false))
-                 (? (not channel)
-                    ???))
-                ;; The service is allowing us to send another message;
-                ;; update the number of allowed messages.
-                (set-channel-allow-send!
-                 channel (+ 1 (channel-allow-send channel)))
-                ;; Actually send some message, if there are any to send.
-                (send-channel-stuff! channel)
-                (continue)))
-         (('send-channel-stuff! message-queue channel)
-          ;; Tell the service to send the messages over CADET.
-          (send-channel-stuff! channel)
-          (continue))
-         ;; Respond to a query of the msg:cadet:local:data message handler.
-         (('channel answer-box channel-number)
-          (answer answer-box
-                  (hashtable-ref channel-number->channel-hash-map
-                                 channel-number #false))
-          (continue))
-         (('lost . lost)
-          (let loop ((lost lost))
-            (match lost
-              (() (continue))
-              ((object . rest)
-               (match object
-                 ((? channel? lost)
-                  TODO
-                  (loop rest))
-                 ((? server:cadet? lost)
-                  (continue* '(disconnect!))))))))
-         (rest (handle-control-message! rest mq terminal-condition k/reconnect!))))
-      ;; Start the main event loop.
-      (control %minimum-local-channel-id))
+              ;; TODO: untested.  TODO: untested in case of reconnects.
+              (close-if-possible! message-queue channel)
+              (continue/no-change)))
+       (('resend-old-operations!)
+        ;; TODO: no operations and no channels are implemented yet,
+        ;; so for now nothing can be done.
+        (continue/no-change))
+       (('acknowledgement channel-number)
+        ;; TODO: failure case
+        (let ((channel
+               (bbtree-ref channel-number->channel-map channel-number)))
+          ;; The service is allowing us to send another message;
+          ;; update the number of allowed messages.
+          (set-channel-allow-send! channel (+ 1 (channel-allow-send channel)))
+          ;; Actually send some message, if there are any to send.
+          (send-channel-stuff! message-queue channel)
+          (continue/no-change)))
+       (('send-channel-stuff! message-queue/channel channel)
+        ;; Tell the service to send the messages over CADET.
+        (send-channel-stuff! message-queue channel)
+        (continue/no-change))
+       ;; Respond to a query of the msg:cadet:local:data message handler.
+       (('channel answer-box channel-number)
+        (answer answer-box
+                (bbtree-ref channel-number->channel-map channel-number
+                            (lambda () #false)))
+        (continue/no-change))
+       (('lost . lost)
+        (let loop ((lost lost))
+          (match lost
+            (() (continue/no-change))
+            ((object . rest)
+             (match object
+               ((? channel? lost)
+                TODO
+                (loop rest))
+               ((? server:cadet? lost)
+                (continue/no-change* '(disconnect!))))))))
+       (rest
+        (handle-control-message!
+         rest message-queue (loop:terminal-condition loop)
+         (cut k/reconnect! channel-number->channel-map)))))
 
     (define-record-type (<cadet-address> make-cadet-address cadet-address?)
       (fields (immutable peer cadet-address-peer)
diff --git a/gnu/gnunet/dht/client.scm b/gnu/gnunet/dht/client.scm
index f2088ac..cb54f4d 100644
--- a/gnu/gnunet/dht/client.scm
+++ b/gnu/gnunet/dht/client.scm
@@ -77,36 +77,29 @@
          (gnu gnunet hashcode struct)
          (gnu gnunet mq)
          (gnu gnunet mq handler)
-         (gnu gnunet mq-impl stream)
          (gnu gnunet mq envelope)
          (only (gnu gnunet server)
                maybe-send-control-message! maybe-send-control-message!*
-               make-error-handler maybe-ask* answer
+               maybe-ask* answer
                <server> server-terminal-condition server-control-channel
-               make-disconnect! handle-control-message!)
+               make-disconnect! handle-control-message!
+               loop:terminal-condition loop:control-channel
+               run-loop spawn-server-loop)
          (only (guile)
                pk define-syntax-rule define* lambda* error
-               make-hash-table hashq-set! hashq-remove! hashv-set! hashv-ref
-               hashv-remove! hash-clear! hash-map->list
                ->bool and=>)
          (only (ice-9 atomic)
-               make-atomic-box atomic-box-ref atomic-box-set!)
+               make-atomic-box)
          (only (ice-9 match)
                match)
          (only (ice-9 weak-vector)
                weak-vector weak-vector-ref weak-vector?)
+         (only (pfds bbtrees)
+               bbtree-size bbtree-fold bbtree-set bbtree-contains?
+               bbtree-delete make-bbtree bbtree-ref)
          (only (gnu extractor enum)
                symbol-value)
-         (only (fibers)
-               spawn-fiber)
-         (only (fibers conditions)
-               make-condition signal-condition! wait-operation wait)
-         (only (fibers operations)
-               perform-operation choice-operation wrap-operation)
-         (only (fibers channels)
-               put-operation get-operation put-message)
          (only (gnu gnunet concurrency lost-and-found)
-               make-lost-and-found collect-lost-and-found-operation
                losable-lost-and-found)
          (gnu gnunet dht struct)
          (only (gnu gnunet message protocols)
@@ -121,11 +114,11 @@
          (only (gnu gnunet utils cut-syntax)
                cut-syntax)
          (only (rnrs base)
-               and >= = quote * / + - define begin ... let*
+               and < >= = quote * / + - define begin ... let*
                quote case else values apply let cond if > eq?
                <= expt assert exact? integer? lambda for-each
                not expt min max div-and-mod positive? define-syntax
-               vector cons)
+               vector cons append list)
          (only (rnrs control)
                unless when)
          (only (rnrs records syntactic)
@@ -133,7 +126,9 @@
          (only (rnrs conditions)
                &error condition make-who-condition define-condition-type)
          (only (rnrs exceptions)
-               raise))
+               raise)
+         (only (srfi srfi-26)
+               cut))
   (begin
     ;; The minimal and maximal replication levels the DHT service allows.
     ;; While the service won't reject replication levels outside this range,
@@ -737,25 +732,19 @@ message header is assumed to be correct."
       (make-disconnect! 'distributed-hash-table ; for error messages
                        server:dht?))
 
-    (define* (connect config #:key (connected values) (disconnected values)
-                     (spawn spawn-fiber))
+    (define* (connect config #:key connected disconnected spawn #:rest r)
       "Connect to the DHT service, using the configuration @var{config}.  The
 connection is made asynchronuously; the optional thunk @var{connected} is called
 when the connection has been made.  The connection can break; the optional thunk
 @var{disconnected} is called when it does. If the connection breaks, the client
 code automatically tries to reconnect, so @var{connected} can be called after
 @var{disconnected}.  This procedure returns a DHT server object."
-      (define old-id->operation-map (make-hash-table))
-      (define server (make-server))
-      ;; We could do @code{(spawn (lambda () (reconnect ...)))} here instead,
-      ;; but that causes ‘(DHT) garbage collectable’ to fail.
-      (spawn-procedure spawn (server-terminal-condition server) config
-                      old-id->operation-map (server-control-channel server)
-                      (losable-lost-and-found server) #:connected connected
-                      #:disconnected disconnected #:spawn spawn)
-      server)
-    (define (spawn-procedure spawn . rest)
-      (spawn (lambda () (apply reconnect rest))))
+      (apply spawn-server-loop (make-server)
+            #:make-message-handlers make-message-handlers
+            #:control-message-handler control-message-handler
+            #:configuration config
+            #:service-name "dht"
+            #:initial-extra-loop-arguments (list empty-bbtree empty-bbtree) r))
 
     ;; TODO: put in new module?
     (define (make-weak-reference to)
@@ -768,183 +757,156 @@ code automatically tries to reconnect, so @var{connected} can be called after
          (weak-vector-ref reference 0)
          reference))
 
-    (define* (reconnect terminal-condition config
-                       old-id->operation-map control-channel lost-and-found
-                       #:key (spawn spawn-fiber)
-                       connected disconnected
-                       #:rest rest)
-      ;; The 'id->operation-map' holds get operations that have
-      ;; been communicated to the service.  The 'old-id->operation-map'
-      ;; is used for reconnecting and holds get operations that need
-      ;; to be communicated to the service again.  'old-id->operation-map'
-      ;; only shrinks, while 'id->operation-map' can both grow and shrink.
-      ;;
-      ;; To avoid races, 'id->operation-map' and 'old-id->operation-map'
-      ;; are only accessed from 'control'.
-      ;;
-      ;; To allow cancelling operations when they become unreachable, operations
-      ;; are wrapped in a weak reference (unless linger? is #true).  Otherwise,
-      ;; they won't ever become unreachable.  Keep in mind that, at least in
-      ;; Guile 3.0.7, weak references are broken when the object is returned
-      ;; from the guardian (and probably earlier) -- this seems to be a
-      ;; difficult to fix bug.
-      ;;
-      ;; This code is written to support both the correct and incorrect behaviour
-      ;; of guardians+weak vectors.
-      (define id->operation-map (make-hash-table))
+    (define* (make-message-handlers loop _1 _2)
       (define (request-search-result-iterator unique-id)
        "Ask @code{control} what is the iterator for the get operation with
 unique id @var{unique-id}.  If there is no such get operation, or the get
 operation is cancelled, return @code{#false} instead."
-       ;; It is possible to look at id->operation-map directly instead,
-       ;; but hash tables are thread-unsafe.
        ;; TODO: is the 'terminal-condition' case needed?
-       (maybe-ask* terminal-condition control-channel
-                   'request-search-result-iterator
-                   unique-id))
-      (define handlers
-       (message-handlers
-        (message-handler
-         (type (symbol-value message-type msg:dht:monitor:get))
-         ((interpose exp) exp)
-         ((well-formed? slice)
-          ;; The C implementation verifies that 'get-path-length' at most
-          ;; (- (expt 2 16) 1), but this seems only to prevent integer overflow,
-          ;; which cannot happen in Scheme due to the use of bignums.
-          ;;
-          ;; This message does _not_ have a payload, so use = instead of >=.
-          (well-formed?/path-length slice /:msg:dht:monitor:get-response
-                                    (get-path-length) =))
-         ((handle! slice) ???))
-        (message-handler
-         (type (symbol-value message-type msg:dht:monitor:get-response))
-         ((interpose exp) exp)
-         ((well-formed? slice)
-          ;; Payload follows, hence >= instead of =.
-          (well-formed?/path-length slice /:msg:dht:monitor:get-response
-                                    (get-path-length put-path-length) >=))
-         ((handle! slice) ???))
-        (message-handler
-         (type (symbol-value message-type msg:dht:monitor:put))
-         ((interpose exp) exp)
-         ((well-formed? slice)
-          ;; Payload follows, hence >= instead of =.
-          (well-formed?/path-length slice /:msg:dht:monitor:put
-                                    (put-path-length) >=))
-         ((handle! slice) ???))
-        (message-handler
-         (type (symbol-value message-type msg:dht:client:result))
-         ((interpose exp) exp)
-         ((well-formed? slice)
-          ;; Actual data follows, hence >= instead of =.
-          (well-formed?/path-length slice /:msg:dht:client:result
-                                    (get-path-length put-path-length) >=))
-         ((handle! slice)
-          ;; The DHT service found some data we were looking for.
-          (let^ ((<-- (search-result unique-id)
-                      ;; TODO: maybe verify the type and key?
-                      (analyse-client-result slice))
-                 (! handle (request-search-result-iterator unique-id))
-                 (? (not handle)
-                    ;; Perhaps the search object became unreachable;
-                    ;; 'process-stop-search' (see next commit) will be
-                    ;; called soon to inform the DHT service.
-                    (values))
-                 (? (get? handle)
-                    ;; TODO might not be true once monitoring operations
-                    ;; are supported.
-                    ((get:iterator handle) search-result)))
-                ;; TODO: wrong type (maybe a put handle?).
-                TODO-error-reporting/2)))))
-      (define error-handler
-       (make-error-handler connected disconnected terminal-condition
-                           control-channel))
-      (define mq (connect/fibers config "dht" handlers error-handler))
-      (define (process-stop-search get)
-       ;; TODO: tests!
-       ;; TODO: cancel outstanding messages to the DHT services for this
-       ;; get operation (including the request to start searching), if
-       ;; any.
-       (hashv-remove! old-id->operation-map (get:unique-id get))
-       (when (hashv-ref id->operation-map (get:unique-id get))
-         (hashv-remove! id->operation-map (get:unique-id get))
-         (send-stop-get! mq get)))
+       (maybe-ask* (loop:terminal-condition loop) (loop:control-channel loop)
+                   'request-search-result-iterator unique-id))
+      (message-handlers
+       (message-handler
+       (type (symbol-value message-type msg:dht:monitor:get))
+       ((interpose exp) exp)
+       ((well-formed? slice)
+        ;; The C implementation verifies that 'get-path-length' at most
+        ;; (- (expt 2 16) 1), but this seems only to prevent integer overflow,
+        ;; which cannot happen in Scheme due to the use of bignums.
+        ;;
+        ;; This message does _not_ have a payload, so use = instead of >=.
+        (well-formed?/path-length slice /:msg:dht:monitor:get-response
+                                  (get-path-length) =))
+       ((handle! slice) ???))
+       (message-handler
+       (type (symbol-value message-type msg:dht:monitor:get-response))
+       ((interpose exp) exp)
+       ((well-formed? slice)
+        ;; Payload follows, hence >= instead of =.
+        (well-formed?/path-length slice /:msg:dht:monitor:get-response
+                                  (get-path-length put-path-length) >=))
+       ((handle! slice) ???))
+       (message-handler
+       (type (symbol-value message-type msg:dht:monitor:put))
+       ((interpose exp) exp)
+       ((well-formed? slice)
+        ;; Payload follows, hence >= instead of =.
+        (well-formed?/path-length slice /:msg:dht:monitor:put
+                                  (put-path-length) >=))
+       ((handle! slice) ???))
+       (message-handler
+       (type (symbol-value message-type msg:dht:client:result))
+       ((interpose exp) exp)
+       ((well-formed? slice)
+        ;; Actual data follows, hence >= instead of =.
+        (well-formed?/path-length slice /:msg:dht:client:result
+                                  (get-path-length put-path-length) >=))
+       ((handle! slice)
+        ;; The DHT service found some data we were looking for.
+        (let^ ((<-- (search-result unique-id)
+                    ;; TODO: maybe verify the type and key?
+                    (analyse-client-result slice))
+               (! handle (request-search-result-iterator unique-id))
+               (? (not handle)
+                  ;; Perhaps the search object became unreachable;
+                  ;; 'process-stop-search' (see next commit) will be
+                  ;; called soon to inform the DHT service.
+                  (values))
+               (? (get? handle)
+                  ;; TODO might not be true once monitoring operations
+                  ;; are supported.
+                  ((get:iterator handle) search-result)))
+              ;; TODO: wrong type (maybe a put handle?).
+              TODO-error-reporting/2)))))
+
+    (define (process-stop-search old-id->operation-map id->operation-map
+                                message-queue get)
+      ;; TODO: tests!
+      ;; TODO: cancel outstanding messages to the DHT services for this
+      ;; get operation (including the request to start searching), if
+      ;; any.
+      (let^ ((! old-id->operation-map
+               (bbtree-delete old-id->operation-map (get:unique-id get)))
+            (? (not (bbtree-contains? id->operation-map (get:unique-id get)))
+               (values old-id->operation-map id->operation-map))
+            (! id->operation-map
+               (bbtree-delete id->operation-map (get:unique-id get))))
+           (send-stop-get! message-queue get)
+           (values old-id->operation-map id->operation-map)))
+
+    (define (control-message-handler message continue continue* message-queue
+                                    loop old-id->operation-map
+                                    id->operation-map)
+      (define (continue/no-change)
+       (continue loop old-id->operation-map id->operation-map))
       (define (k/reconnect!)
-       (apply reconnect terminal-condition config id->operation-map
-              control-channel lost-and-found rest))
-      (define loop-operation
-       (choice-operation
-        (get-operation control-channel)
-        (wrap-operation (collect-lost-and-found-operation lost-and-found)
-                        (lambda (lost) (cons 'lost lost)))))
-      (define (control)
-       "The main event loop."
-       (control* (perform-operation loop-operation)))
-      (define (control* message)
-       (match message
-         (('start-get! get)
-          ;; Register the new get operation, such that we remember
-          ;; where to send responses to.
-          (hashv-set! id->operation-map (get:unique-id get)
-                      ((if (get:linger? get)
-                          make-strong-reference
-                          make-weak-reference) get))
+       ;; Self-check to make sure no information will be lost.
+       (assert (= (bbtree-size old-id->operation-map) 0))
+       (run-loop loop id->operation-map empty-bbtree))
+      (match message
+        (('start-get! get)
+        ;; Register the new get operation, such that we remember
+        ;; where to send responses to.
+        (let ((id->operation-map
+               (bbtree-set id->operation-map (get:unique-id get)
+                           ((if (get:linger? get)
+                                make-strong-reference
+                                make-weak-reference) get))))
           ;; (Asynchronuously) send the GET message.
-          (send-get! mq get)
+          (send-get! message-queue get)
           ;; Continue!
-          (control))
-         (('stop-search! get)
-          (process-stop-search get)
-          ;; Continue!
-          (control))
-         (('put! put)
-          ;; Send the put operation to the DHT service.
-          (send-message! mq (put:message put))
-          ;; Continue!
-          (control))
-         ;; Send by @code{request-search-result-iterator}.
-         (('request-search-result-iterator answer-box unique-id)
-          (answer answer-box
-                  (and=> (hashv-ref id->operation-map unique-id)
-                         dereference))
-          ;; Continue!
-          (control))
-         (('resend-old-operations!)
-          ;; Restart old operations.  Only get operations need to be submitted
-          ;; again.
-          ;;
-          ;; TODO: restarting monitoring operations
-          (for-each (lambda (reference)
-                      (define get (dereference reference))
-                      ;; If the (weak) reference is broken, that means the
-                      ;; operation is unreachable, so then there is no point
-                      ;; to resending the get operation.
-                      (when get
-                        (hashv-set! id->operation-map (get:unique-id get)
-                                    reference)
-                        (send-get! mq get)))
-                    ;; XXX: @code{hash-for-each} forms a continuation barrier,
-                    ;; so turn the hash table into a list before iterating.
-                    (hash-map->list (lambda (x reference) reference)
-                                    old-id->operation-map))
-          ;; Free some memory.
-          (hash-clear! old-id->operation-map)
-          ;; Continue!
-          (control))
-         ;; Some handles became unreachable and can be cancelled.
-         (('lost . lost)
-          (let loop ((lost lost))
-            (match lost
-              ;; Continue!
-              (() (control))
-              ((object . rest)
-               (match object
-                 ((? get? get)
-                  (process-stop-search get)
-                  (loop rest))
-                 ((? server:dht? server)
-                  (control* '(disconnect!))))))))
-         (rest (handle-control-message!
-                rest mq terminal-condition k/reconnect!))))
-      ;; Start the main event loop.
-      (control))))
+          (continue loop old-id->operation-map id->operation-map)))
+       (('stop-search! get)
+        (let^ ((<-- (old-id->operation-map id->operation-map)
+                    (process-stop-search old-id->operation-map
+                                         id->operation-map message-queue get)))
+              (continue loop old-id->operation-map id->operation-map)))
+       (('put! put)
+        ;; Send the put operation to the DHT service.
+        (send-message! message-queue (put:message put))
+        (continue/no-change))
+       ;; Send by @code{request-search-result-iterator}.
+       (('request-search-result-iterator answer-box unique-id)
+        (answer answer-box
+                (and=> (bbtree-ref id->operation-map unique-id) dereference))
+        (continue/no-change))
+       (('resend-old-operations!)
+        ;; Restart old operations.  Only get operations need to be submitted
+        ;; again.
+        ;;
+        ;; TODO: restarting monitoring operations
+        (continue loop empty-bbtree
+                  (bbtree-fold
+                   (lambda (id reference id->operation-map)
+                     (let^ ((! get (dereference reference))
+                            ;; If the (weak) reference is broken, that means
+                            ;; the operation is unreachable, so then there is
+                            ;; no point to resending the get operation.
+                            (? (not get) id->operation-map)
+                            (! id->operation-map
+                               (bbtree-set id->operation-map id reference)))
+                           (send-get! message-queue get)
+                           id->operation-map))
+                   id->operation-map old-id->operation-map)))
+       ;; Some handles became unreachable and can be cancelled.
+       (('lost . lost)
+        (let next ((lost lost) (old-id->operation-map old-id->operation-map)
+                   (id->operation-map id->operation-map))
+          (match lost
+            (() (continue loop old-id->operation-map id->operation-map))
+            ((object . rest)
+             (match object
+               ((? get? get)
+                (let^ ((<-- (old-id->operation-map id->operation-map)
+                            (process-stop-search old-id->operation-map
+                                                 id->operation-map
+                                                 message-queue get)))
+                      (next rest old-id->operation-map id->operation-map)))
+               ((? server:dht? server)
+                (continue* '(disconnect!) loop old-id->operation-map
+                           id->operation-map)))))))
+       (rest (handle-control-message!
+              rest message-queue (loop:terminal-condition loop)
+              k/reconnect!))))
+
+    (define empty-bbtree (make-bbtree <))))
diff --git a/gnu/gnunet/nse/client.scm b/gnu/gnunet/nse/client.scm
index 9487497..fda698f 100644
--- a/gnu/gnunet/nse/client.scm
+++ b/gnu/gnunet/nse/client.scm
@@ -36,10 +36,8 @@
          disconnect!
          estimate)
   (import (only (rnrs base)
-               begin define quote lambda case values expt = else apply
+               begin define quote lambda values expt = apply
                and >= let or nan?)
-         (only (rnrs control)
-               when unless)
          (only (rnrs records syntactic)
                define-record-type)
           (only (ice-9 atomic)
@@ -48,20 +46,12 @@
                match)
           (only (fibers)
                spawn-fiber)
-         (only (fibers conditions)
-               make-condition wait wait-operation signal-condition!)
-         (only (fibers operations)
-               choice-operation perform-operation wrap-operation)
-         (only (fibers channels)
-               get-operation)
          (only (gnu extractor enum)
                symbol-value value->index)
          (only (guile)
-               define* const)
+               lambda* define*)
          (only (gnu gnunet concurrency lost-and-found)
-               make-lost-and-found <losable>
-               losable-lost-and-found
-               collect-lost-and-found-operation)
+               losable-lost-and-found)
          (only (gnu gnunet util struct)
                /:message-header)
          (only (gnu gnunet utils bv-slice)
@@ -72,18 +62,12 @@
                message-handler
                message-handlers)
           (only (gnu gnunet mq)
-               send-message! close-queue!)
-          (only (gnu gnunet mq-impl stream)
-               connect/fibers)
-         (only (gnu gnunet mq error-reporting)
-               report-error)
+               send-message!)
           (gnu gnunet message protocols)
          (only (gnu gnunet server)
                <server> make-disconnect!
-               server-terminal-condition
-               server-control-channel
-               make-error-handler
-               handle-control-message!)
+               handle-control-message!
+               <loop> spawn-server-loop run-loop loop:terminal-condition)
           (only (gnu gnunet nse struct)
                /:msg:nse:estimate))
   (begin
@@ -135,77 +119,72 @@ timestamp."
     (define disconnect!
       (make-disconnect! 'network-size server:nse?))
 
+    (define-record-type (<loop:nse> make-loop:nse loop:nse?)
+      (parent <loop>)
+      (fields (immutable updated loop:updated)
+             (immutable estimate/box loop:estimate/box))
+      (protocol
+       (lambda (%make)
+        (lambda* (#:key (updated values) estimate/box #:allow-other-keys
+                  #:rest r)
+          ((apply %make r) updated estimate/box)))))
+
     ;; See 'connect'.  TODO: gc test fails
-    (define* (reconnect terminal-condition config
-                       control-channel lost-and-found
-                       estimate/box
-                       #:key
-                       updated connected disconnected spawn #:rest rest)
-      (define (handle-estimate! estimate-slice)
-       (define estimate
-         (%make-estimate
-          (read% /:msg:nse:estimate '(size-estimate) estimate-slice)
-          (read% /:msg:nse:estimate '(std-deviation) estimate-slice)
-          (read% /:msg:nse:estimate '(timestamp) estimate-slice)))
-       (atomic-box-set! estimate/box estimate)
-       (updated estimate))
-      (define handlers
-       (message-handlers
-        (message-handler
-         (type (symbol-value message-type msg:nse:estimate))
-         ((interpose code) code)
-         ((well-formed? slice)
-          (and (= (slice-length slice)
-                  (sizeof /:msg:nse:estimate '()))
-               ;; XXX: there is no test verifying these two expressions
-               ;; are present
-               (>= (read% /:msg:nse:estimate '(size-estimate) slice) 0)
-               ;; See <https://bugs.gnunet.org/view.php?id=7021#c18399> for
-               ;; situations in which the deviation can be infinite or NaN.
-               (let ((stddev
-                      (read% /:msg:nse:estimate '(std-deviation) slice)))
-                 (or (>= stddev 0)
-                     (nan? stddev)))))
-         ((handle! slice) (handle-estimate! slice)))))
-      (define (send-start!)
-       ;; The service only starts sending estimates once
-       ;; /:msg:nse:start is sent.
-       (define s (make-slice/read-write (sizeof /:message-header '())))
-       (set%! /:message-header '(size) s (sizeof /:message-header '()))
-       (set%! /:message-header '(type) s
-              (value->index (symbol-value message-type msg:nse:start)))
-       (send-message! mq s))
-      (define error-handler
-       (make-error-handler connected disconnected terminal-condition
-                           control-channel))
-      (define mq (connect/fibers config "nse" handlers error-handler
-                                #:spawn spawn))
+    (define* (handle-estimate! estimate-slice estimate/box updated)
+      (define estimate
+       (%make-estimate
+        (read% /:msg:nse:estimate '(size-estimate) estimate-slice)
+        (read% /:msg:nse:estimate '(std-deviation) estimate-slice)
+        (read% /:msg:nse:estimate '(timestamp) estimate-slice)))
+      (atomic-box-set! estimate/box estimate)
+      (updated estimate))
+
+    (define (make-message-handlers loop)
+      (message-handlers
+       (message-handler
+       (type (symbol-value message-type msg:nse:estimate))
+       ((interpose code) code)
+       ((well-formed? slice)
+        (and (= (slice-length slice)
+                (sizeof /:msg:nse:estimate '()))
+             ;; XXX: there is no test verifying these two expressions
+             ;; are present
+             (>= (read% /:msg:nse:estimate '(size-estimate) slice) 0)
+             ;; See <https://bugs.gnunet.org/view.php?id=7021#c18399> for
+             ;; situations in which the deviation can be infinite or NaN.
+             (let ((stddev (read% /:msg:nse:estimate '(std-deviation) slice)))
+               (or (>= stddev 0) (nan? stddev)))))
+       ((handle! slice) (handle-estimate! slice (loop:estimate/box loop)
+                                          (loop:updated loop))))))
+
+    (define (send-start! message-queue)
+      ;; The service only starts sending estimates once
+      ;; /:msg:nse:start is sent.
+      (define s (make-slice/read-write (sizeof /:message-header '())))
+      (set%! /:message-header '(size) s (sizeof /:message-header '()))
+      (set%! /:message-header '(type) s
+            (value->index (symbol-value message-type msg:nse:start)))
+      (send-message! message-queue s))
+
+    (define (control-message-handler message continue continue* message-queue
+                                    loop)
       (define (k/reconnect!)
-       (apply reconnect terminal-condition config control-channel lost-and-found estimate/box rest))
-      (define loop-operation
-       (choice-operation
-        (get-operation control-channel)
-        (wrap-operation (collect-lost-and-found-operation lost-and-found)
-                        (lambda (ourself) 'lost)))) ; it will only be performed once, so no need to recompute it
-      (define (control)
-       "The main event loop."
-       (control* (perform-operation loop-operation)))
-      (define (control* message)
-       (match message
-         (('resend-old-operations!)
-          (send-start!)
-          (control)) ; continue
-         ('lost
-          ;; We lost ourselves, that means the server became unreachable.
-          ;; The presence of this line is tested by the "garbage collectable"
-          ;; test.
-          (control* '(disconnect!)))
-         (rest (handle-control-message! message mq terminal-condition k/reconnect!))))
-      ;; Start main the event loop.
-      (control))
-
-    (define* (connect config #:key (updated values) (connected values)
-                     (disconnected values) (spawn spawn-fiber))
+       (run-loop loop))
+      (match message
+        (('resend-old-operations!)
+        (send-start! message-queue)
+        (continue loop))
+       (('lost . _)
+        ;; We lost ourselves, that means the server became unreachable.
+        ;; The presence of this line is tested by the "garbage collectable"
+        ;; test.
+        (continue* '(disconnect!) loop))
+       (rest
+        (handle-control-message! message message-queue
+                                 (loop:terminal-condition loop) k/reconnect!))))
+
+    (define* (connect config #:key updated connected disconnected spawn
+                     #:rest r)
       "Connect to the NSE service in the background.
 
 When connected, the thunk @var{connected} is called and estimates
@@ -218,15 +197,9 @@ shortly after calling @var{disconnected}.
 
 The procedures @var{updated}, @var{connected} and @var{disconnected} are optional."
       (define server (%make-server))
-      (spawn-procedure spawn (server-terminal-condition server) config
-                      (server-control-channel server)
-                      (losable-lost-and-found server)
-                      (server-estimate/box server)
-                      #:updated updated
-                      #:connected connected
-                      #:disconnected disconnected
-                      #:spawn spawn)
-      server)
-
-    (define (spawn-procedure spawn . rest)
-      (spawn (lambda () (apply reconnect rest))))))
+      (apply spawn-server-loop server #:make-loop make-loop:nse
+            #:make-message-handlers make-message-handlers
+            #:control-message-handler control-message-handler
+            #:service-name "nse"
+            #:configuration config
+            #:estimate/box (server-estimate/box server) r))))
diff --git a/gnu/gnunet/server.scm b/gnu/gnunet/server.scm
index e1dd031..85e28c5 100644
--- a/gnu/gnunet/server.scm
+++ b/gnu/gnunet/server.scm
@@ -21,30 +21,44 @@
 (define-library (gnu gnunet server)
   (export maybe-send-control-message!* maybe-send-control-message!
          maybe-ask* answer
-         make-error-handler
+         make-error-handler make-error-handler*/loop
          <server> server-terminal-condition server-control-channel
          make-disconnect!
-         handle-control-message!)
+         handle-control-message!
+         <loop> make-loop
+         loop:connected loop:disconnected loop:terminal-condition
+         loop:control-channel loop:configuration loop:service-name
+         loop:spawner loop:lost-and-found run-loop spawn-server-loop)
   (import (only (rnrs base)
-               begin define case else apply values quote lambda
-               if error list let and)
+               begin define cons case else apply values quote lambda
+               if error list let and append assert string? procedure?
+               list? symbol?)
          (only (rnrs records syntactic)
                define-record-type)
+         (only (fibers)
+               spawn-fiber)
          (only (fibers conditions)
-               make-condition wait-operation signal-condition!)
+               make-condition wait-operation signal-condition! condition?)
          (only (fibers channels)
-               make-channel put-operation put-message get-message)
+               make-channel put-operation get-operation put-message
+               get-message channel?)
+         (only (gnu gnunet config db)
+               configuration?)
          (only (fibers operations)
                choice-operation perform-operation wrap-operation)
          (only (gnu gnunet concurrency lost-and-found)
                make-lost-and-found collect-lost-and-found-operation
-               losable-lost-and-found)
+               losable-lost-and-found lost-and-found?)
          (only (gnu gnunet mq)
-               close-queue!)
+               message-queue? close-queue!)
          (only (gnu gnunet mq error-reporting)
                report-error)
+          (only (gnu gnunet mq-impl stream)
+               connect/fibers)
          (only (ice-9 match)
-               match))
+               match)
+         (only (guile)
+               lambda* define*))
   (begin
     ;; Define them here to avoid creating these objects multiple times.
     (define thunk-false (lambda () #false))
@@ -58,6 +72,8 @@ This sends a @var{message} to @var{control-channel} or waits for
 @var{terminal-condition} to be signalled, whichever happens first.
 If the message is sent, @code{#true} is returned.  Otherwise, if
 @var{terminal-condition} was signalled, return @code{#false} instead."
+      (assert (condition? terminal-condition))
+      (assert (channel? control-channel))
       (perform-operation
        (choice-operation
        ;; Nothing to do when the <server> is permanently disconnected,
@@ -122,6 +138,11 @@ The type of @var{answer-box} is an implementation detail."
           (values))))
       error-handler)
 
+    (define (make-error-handler*/loop loop . _)
+      (make-error-handler
+       (loop:connected loop) (loop:disconnected loop)
+       (loop:terminal-condition loop) (loop:control-channel loop)))
+
     (define-record-type (<server> %make-server server?)
       (parent <losable>) ; for automatic fibers disposal when the <server> is unreachable
       ;; terminal-condition: a disconnect has been requested.
@@ -133,27 +154,62 @@ The type of @var{answer-box} is an implementation detail."
                     (make-condition)
                     (make-channel))))))
 
-    (define (primitive-disconnect! server)
-      "Asynchronuously disconnect from the service and stop reconnecting,
-even if not connected.  This is an idempotent operation.  This is an
-asynchronuous request; it won't be fulfilled immediately.
-
-This maybe-sends @code{disconnect!} to the control channel."
-      (maybe-send-control-message! server 'disconnect!))
-
     (define (make-disconnect! name type?)
       ;; for backtrace purposes, 'lambda' is not used here.
       (define (disconnect! server)
        "Asynchronuously disconnect from the service and stop reconnecting,
 even if not connected.  This is an idempotent operation.  This is an
-asynchronuous request; it won't be fulfilled immediately."
+asynchronuous request; it won't be fulfilled immediately.  More technically,
+this maybe-sends @code{disconnect!} to the control channel."
        (if (type? server)
-           (primitive-disconnect! server)
+           (maybe-send-control-message! server 'disconnect!)
            (error 'disconnect! ; TODO: test
                   "wrong server object type"
                   (list name type? server))))
+      (assert (symbol? name)) ; XXX not sure which
+      (assert (procedure? type?))
       disconnect!)
 
+    
+    ;; Originally, lots of keyword arguments were passed, but having a single
+    ;; structure with all the persistent state is more convenient.
+    (define-record-type (<loop> make-loop loop?)
+      (fields (immutable service-name loop:service-name)
+             (immutable control-message-handler loop:control-message-handler)
+             (immutable make-message-handlers loop:message-handlers-maker)
+             (immutable make-error-handler loop:error-handler*-maker)
+             (immutable configuration loop:configuration)
+             (immutable spawn loop:spawner) ; like spawn-fiber
+             ;; string (e.g. "dht", "cadet", ...)
+             (immutable terminal-condition loop:terminal-condition) ; condition
+             (immutable control-channel loop:control-channel) ; <channel>
+             (immutable lost-and-found loop:lost-and-found)
+             (immutable connected loop:connected)
+             (immutable disconnected loop:disconnected))
+      (protocol
+       (lambda (%make)
+        (lambda* (#:key service-name control-message-handler
+                  make-message-handlers
+                  (make-error-handler* make-error-handler*/loop)
+                  configuration (spawn spawn-fiber) terminal-condition
+                  control-channel lost-and-found (connected values)
+                  (disconnected values)
+                  #:allow-other-keys)
+         (assert (string? service-name))
+         (assert (procedure? control-message-handler))
+         (assert (procedure? make-message-handlers))
+         (assert (configuration? configuration))
+         (assert (procedure? spawn))
+         (assert (condition? terminal-condition))
+         (assert (channel? control-channel))
+         (assert (lost-and-found? lost-and-found))
+         (assert (procedure? connected))
+         (assert (procedure? disconnected))
+         (%make service-name control-message-handler
+                make-message-handlers make-error-handler* configuration
+                spawn terminal-condition control-channel
+                lost-and-found connected disconnected)))))
+
     (define (handle-control-message! message mq terminal-condition k/reconnect!)
       "The following messages are handled:
 
@@ -164,6 +220,9 @@ asynchronuous request; it won't be fulfilled immediately."
 @item reconnect!, by calling the thunk @var{k/reconnect} in tail position
 
 TODO: maybe 'lost'"
+      (assert (message-queue? mq))
+      (assert (condition? terminal-condition))
+      (assert (procedure? k/reconnect!))
       (match message
         (('oops! key . arguments)
         ;; Some unknown error, report it (report-error) and close
@@ -184,4 +243,51 @@ TODO: maybe 'lost'"
         (values))
        (('reconnect!)
         ;; Restart the loop with a new message queue.
-        (k/reconnect!))))))
+        (k/reconnect!))))
+
+    ;; TODO: document, check types
+    ;; state: <loop>
+    (define (run-loop state . rest)
+      (assert (loop? state))
+      (define handlers (apply (loop:message-handlers-maker state) state rest))
+      (define error-handler
+       (apply (loop:error-handler*-maker state) state rest))
+      (define message-queue
+       (connect/fibers (loop:configuration state) (loop:service-name state)
+                       handlers error-handler #:spawn (loop:spawner state)))
+      (define loop-operation
+       (choice-operation
+        (get-operation (loop:control-channel state))
+        (wrap-operation
+         ;; TODO: wasn't it required to recreate this operation each
+         ;; time something was found?
+         (collect-lost-and-found-operation (loop:lost-and-found state))
+         (lambda (lost) (cons 'lost lost)))))
+      (define (continue* message state . rest)
+       ;; Let @var{control-message-handler} handle the message.
+       ;; It can decide to continue with @var{continue} or @var{continue*},
+       ;; in continuation-passing style.
+       (apply (loop:control-message-handler state) message continue continue*
+              message-queue state rest))
+      (define (continue state . rest)
+       "The main event loop."
+       (apply continue* (perform-operation loop-operation) state rest))
+      (apply continue state rest))
+
+    (define* (spawn-server-loop server #:key (make-loop make-loop)
+                               (initial-extra-loop-arguments '())
+                               (spawn spawn-fiber) #:allow-other-keys #:rest r)
+      (assert (server? server))
+      (assert (procedure? make-loop))
+      (assert (list? initial-extra-loop-arguments))
+      (assert (procedure? spawn))
+      "[TODO] and return @var{server}"
+      (define loop-arguments
+       (append (list #:terminal-condition (server-terminal-condition server)
+                     #:control-channel (server-control-channel server)
+                     #:lost-and-found (losable-lost-and-found server))
+               r))
+      (spawn (lambda ()
+              (apply run-loop (apply make-loop loop-arguments)
+                     initial-extra-loop-arguments)))
+      server)))

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.


