gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[gnunet-scheme] 130/324: mq: New module, replacing message-io.


From: gnunet
Subject: [gnunet-scheme] 130/324: mq: New module, replacing message-io.
Date: Tue, 21 Sep 2021 13:22:50 +0200

This is an automated email from the git hooks/post-receive script.

maxime-devos pushed a commit to branch master
in repository gnunet-scheme.

commit ab8f8d42a81db198d2014e29f5319d1912bbdab8
Author: Maxime Devos <maximedevos@telenet.be>
AuthorDate: Sun May 30 22:53:49 2021 +0200

    mq: New module, replacing message-io.
    
    The old module was rather inconvenient in usage.
    Some TODOs: message cancellation, message handlers,
    closing queues, error handling, fixing a guile-fibers
    bug ...
    
    * gnu/gnunet/mq.scm: New module.
    * gnu/gnunet/mq/envelope.scm: Export bind-atomic-boxen
      for (gnu gnunet mq), pending a move into a separate module.
    * tests/mq.scm: Test the new module.
      The first test is based on a test from upstream.
---
 Makefile.am                |   3 +-
 gnu/gnunet/mq.scm          | 232 ++++++++++++++++++++
 gnu/gnunet/mq/envelope.scm |   4 +-
 tests/mq.scm               | 520 +++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 757 insertions(+), 2 deletions(-)

diff --git a/Makefile.am b/Makefile.am
index 8b15711..4ab6680 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -41,7 +41,7 @@ modules = \
   gnu/gnunet/mq/handler.scm \
   gnu/gnunet/mq/prio-prefs.scm \
   gnu/gnunet/mq/prio-prefs2.scm \
-  gnu/gnunet/mq/message-io.scm \
+  gnu/gnunet/mq.scm \
   \
   gnu/gnunet/utils/bv-slice.scm \
   gnu/gnunet/utils/hat-let.scm \
@@ -98,6 +98,7 @@ SCM_LOG_DRIVER = \
 SCM_TESTS = \
   tests/envelope.scm \
   tests/message-handler.scm \
+  tests/mq.scm \
   tests/update.scm \
   tests/message-io.scm \
   tests/bv-slice.scm \
diff --git a/gnu/gnunet/mq.scm b/gnu/gnunet/mq.scm
new file mode 100644
index 0000000..5884f0e
--- /dev/null
+++ b/gnu/gnunet/mq.scm
@@ -0,0 +1,232 @@
+;; This file is part of GNUnet.
+;; Copyright (C) 2012-2019 GNUnet e.V.
+;; Copyright (C) 2021 Maxime Devos (<maximedevos@telenet.be>)
+;;
+;; GNUnet is free software: you can redistribute it and/or modify it
+;; under the terms of the GNU Affero General Public License as published
+;; by the Free Software Foundation, either version 3 of the License,
+;; or (at your option) any later version.
+;;
+;; GNUnet is distributed in the hope that it will be useful, but
+;; WITHOUT ANY WARRANTY; without even the implied warranty of
+;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+;; Affero General Public License for more details.
+;;
+;; You should have received a copy of the GNU Affero General Public License
+;; along with this program.  If not, see <http://www.gnu.org/licenses/>.
+;;
+;; SPDX-License-Identifier: AGPL-3.0-or-later
+
+;; Author: Florian Dold
+;; Author: Maxime Devos
+;; C file: util/mq.c
+;; Scheme module: (gnu gnunet mq)
+;;
+;; A message queue for GNUnet messages.
+;; Messages are made of bytes. In particular,
+;; messages must be prefixed by a /:message-header.
+;;
+;; TODO cancelling, injecting messages, message handlers ...
+;; These are not implemented yet or untested, or need
+;; more documentation!
+(define-library (gnu gnunet mq)
+  (export <message-queue> make-message-queue message-queue?
+         make-one-by-one-sender
+         inject-message! send-message!
+         message-queue-length
+         try-send-again!)
+  (import (gnu gnunet mq handler)
+         (gnu gnunet mq envelope)
+         (gnu gnunet utils hat-let)
+         (only (gnu gnunet utils bv-slice)
+               slice? slice-slice)
+         (only (gnu gnunet util struct)
+               /:message-header)
+         (only (gnu gnunet netstruct syntactic)
+               sizeof read%)
+         (only (guile) define* exact-integer?)
+         (only (ice-9 atomic)
+               make-atomic-box atomic-box-ref
+               atomic-box-compare-and-swap!)
+         (only (rnrs base)
+               lambda assert let begin define
+               procedure? eq? >= = <= < if quote
+               values and let*)
+         (only (rnrs control)
+               when unless)
+         (only (rnrs conditions)
+               define-condition-type &warning
+               make-who-condition condition)
+         (only (rnrs exceptions)
+               raise-continuable)
+         (only (rnrs records syntactic) define-record-type)
+         (only (srfi srfi-8) receive)
+         (prefix (only (pfds queues)
+                       make-queue dequeue enqueue queue-length
+                       queue-empty?)
+                 #{pfds:}#))
+  (begin
+    (define-record-type (<message-queue> make-message-queue message-queue?)
+      (fields (immutable handlers message-queue-handler)
+             (immutable error-handler message-queue-error-handler)
+             ;; Atomic box of a queue of messages to send
+             ;; (as @code{<envelope>} objects).
+             (immutable messages/box message-queue-messages/box)
+             ;; A procedure for actually sending the messages.
+             ;; It accepts a single argument, the message queue.
+             ;;
+             ;; It is run each time a message has been
+             ;; enqueued. It is not obligated to send the messages
+             ;; right now, though it probably should send them
+             ;; soonish. It can be run at any time, with
+             ;; @code{try-send-again!}.
+             (immutable sender message-queue-sender))
+      (protocol
+       (lambda (%make)
+        (lambda (handlers error-handler sender)
+          "Make a message queue with message handlers @var{handlers}.
+
+The message handlers are expected to handle bytevector slices
+that start with a @code{/:message-header}.
+
+XXX: @var{error-handler} is stored in the queue; its use is not documented yet.
+
+Messages are sent with @var{sender}. It can be created with
+@code{make-one-by-one-sender}."
+          ;; Predicate does not exist yet ...
+          #;(assert (message-handlers? handlers))
+          #;(assert (message-handler? error-handler))
+          (%make handlers error-handler
+                 (make-atomic-box (pfds:make-queue))
+                 sender)))))
+
+    (define (make-one-by-one-sender proc)
+      "Make a message sender, sending messages one-by-one with @var{proc}.
+
+The procedure @var{proc} must accept a single argument,
+the message envelope to send. This procedure should
+use @code{attempt-irrevocable-sent!} when it feels ready.
+It must not return any values currently.
+
+The message does not need to be sent directly.
+However, remember that unless the priority allows otherwise,
+messages must be sent in-order (TODO really received in-order?)."
+      (assert (procedure? proc))
+      (lambda (mq)
+       (assert (message-queue? mq))
+       (%%bind-atomic-boxen
+        ((queue (message-queue-messages/box mq) swap!))
+        ;; First extract an envelope ...
+        (let spin ((old queue))
+          ;; ... unless there isn't anything to remove anymore.
+          ;; This check cannot be moved outside the (let spin ...),
+          ;; as message senders may be called at any time
+          ;; (even if there are no messages!). Also, in case of
+          ;; concurrency, the queue may become empty after a spin
+          ;; iteration.
+          (unless (pfds:queue-empty? old)
+            (receive (envelope new) (pfds:dequeue old)
+              (when (eq? old (swap! old new))
+                ;; We extracted a message. Now do something
+                ;; with it!
+                ;;
+                ;; Make sure @var{proc} does not return
+                ;; any values, as we may want to assign
+                ;; meaning to return values later.
+                (receive ()
+                    ;; Process the message.
+                    (proc envelope)
+                  'nothing))
+              ;; Either an envelope was handed to @var{proc} and the
+              ;; remaining messages are processed now, or someone
+              ;; modified the message queue before us and we retry.
+              ;;
+              ;; TODO: if someone else modified the message queue,
+              ;; does that mean we don't have to anymore?
+              (spin queue)))))))
+
+    (define (inject-message! mq message)
+      "Call the message handler that was registered
+for the type of the message @var{message} in the message queue @var{mq}
+with the message @var{message}. In case the message is malformed
+(according to the message handler), inject a @code{&malformed-message}
+error instead.
+
+This procedure is intended to be used by the implementation
+of message queues."
+      ;; Slice off the complete header; the type field is read from it.
+      (let* ((header (slice-slice message 0 (sizeof /:message-header '())))
+            (handler (message-handler-for
+                      (message-queue-handler mq)
+                      (read% /:message-header '(type) header))))
+       (if (verify-message? handler message)
+           ;; TODO: maybe catch out-of-memory and stack overflow errors here ...
+           (handle-message! handler message)
+           ;; XXX: inject-error! is not defined in this module -- TODO confirm.
+           (inject-error! handler message))))
+
+    (define (message-queue-length mq)
+      "Return the number of messages currently in the message queue @var{mq}."
+      (let ((queue (atomic-box-ref (message-queue-messages/box mq))))
+       (pfds:queue-length queue)))
+
+    (define-condition-type &overly-full-queue-warning &warning
+      make-overly-full-queue-warning overly-full-queue-warning?
+      (current-length  overly-full-queue-length)            ; queue length seen
+      (suspicious-when overly-full-queue-suspicious-when))  ; threshold reported
+
+    (define %suspicious-length 10000) ; reported as the suspicious-when field
+
+    (define* (send-message! mq message #:key (priority 0)
+                           (notify-sent! values))
+      "Send a message with the given message queue. A continuable
+@code{&warning} may be raised, e.g. a @code{&overly-full-queue-warning}
+in case the queue is suspiciously long. The message queue implementation
+can raise errors of its own as well, as usual.
+
+@var{priority} is a numeric priority-preference value for @var{message},
+from @code{(gnu gnunet mq prio-prefs)}. By default, @var{message} will be
+sent as reliable background traffic (@code{prio:background}).
+
+The in-order sending of ordered messages (when requested by @var{priority})
+is only guaranteed when supported by the message queue implementation
+and when @code{try-send-again!} and @code{send-message!} are not being
+used concurrently on the same message queue.
+
+When the message has been irrevocably sent, the thunk @var{notify-sent!}
+will be called."
+      (define (cancel!)
+       (assert (and #f "cancel! not yet implemented")))
+      (assert (and (slice? message)
+                  (exact-integer? priority)
+                  (<= 0 priority) (< priority 512)))
+      (%%bind-atomic-boxen
+       ((queue (message-queue-messages/box mq) swap-queue!))
+       ;; Add the message to the queue. Also remember the
+       ;; length of the new queue; we'll need it later.
+       (let* ((envelope (make-envelope cancel!
+                                      message
+                                      #:priority priority
+                                      #:notify-sent! notify-sent!))
+             (queue-length
+              (let spin ((old queue))
+                (let ((new (pfds:enqueue old envelope)))
+                  (if (eq? old (swap-queue! old new))
+                      (pfds:queue-length new)
+                      (spin queue))))))
+        ;; Like the C implementation, warn when the queue has at least
+        ;; @code{%suspicious-length} entries, as this may indicate a bug
+        ;; (in the scheduler, in the queue implementation, ...).
+        (when (>= queue-length %suspicious-length)
+          (raise-continuable
+           (condition (make-overly-full-queue-warning
+                       queue-length %suspicious-length)
+                      ;; TODO: consider
+                      ;; (@ (gnu gnunet mq) send!) here and elsewhere.
+                      (make-who-condition 'send!))))
+        (try-send-again! mq))))
+
+    (define (try-send-again! mq)
+      "Run the sender of the queue @var{mq}, to send messages not yet sent.
+This is expected to be called from the message queue implementation."
+      (let ((sender (message-queue-sender mq))) (sender mq)))))
diff --git a/gnu/gnunet/mq/envelope.scm b/gnu/gnunet/mq/envelope.scm
index e0c94a2..76d91ea 100644
--- a/gnu/gnunet/mq/envelope.scm
+++ b/gnu/gnunet/mq/envelope.scm
@@ -26,7 +26,9 @@
 ;; so no type checks there.
 (define-library (gnu gnunet mq envelope)
   (export <envelope> make-envelope envelope?
-         attempt-cancel! attempt-irrevocable-sent!)
+         attempt-cancel! attempt-irrevocable-sent!
+         ;; TODO find a better place
+         (rename (bind-atomic-boxen %%bind-atomic-boxen)))
   (import (gnu gnunet utils hat-let)
          (only (guile) define* lambda* exact-integer?)
          (only (ice-9 match) match)
diff --git a/tests/mq.scm b/tests/mq.scm
new file mode 100644
index 0000000..bc05f9d
--- /dev/null
+++ b/tests/mq.scm
@@ -0,0 +1,520 @@
+;; This file is part of GNUnet.
+;; Copyright (C) 2012, 2018 GNUnet e.V.
+;; Copyright (C) 2021 Maxime Devos
+;;
+;; GNUnet is free software: you can redistribute it and/or modify it
+;; under the terms of the GNU Affero General Public License as published
+;; by the Free Software Foundation, either version 3 of the License,
+;; or (at your option) any later version.
+;;
+;; GNUnet is distributed in the hope that it will be useful, but
+;; WITHOUT ANY WARRANTY; without even the implied warranty of
+;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+;; Affero General Public License for more details.
+;;
+;; You should have received a copy of the GNU Affero General Public License
+;; along with this program.  If not, see <http://www.gnu.org/licenses/>.
+;;
+;; SPDX-License-Identifier: AGPL-3.0-or-later
+
+;; Author: Florian Dold
+;; Author: Christian Grothoff
+;; Author: Maxime Devos
+
+(define-module (tests mq))
+
+(use-modules (ice-9 control)
+            (fibers conditions)
+            (fibers)
+            (srfi srfi-26)
+            (srfi srfi-43)
+            (srfi srfi-64)
+            (srfi srfi-111)
+            ((rnrs base) #:select (assert mod))
+            ((rnrs arithmetic bitwise)
+             #:select (bitwise-ior))
+            (gnu gnunet netstruct syntactic)
+            ((gnu gnunet netstruct procedural)
+             #:select (u32/big))
+            (gnu gnunet mq prio-prefs)
+            (gnu gnunet mq prio-prefs2)
+            (gnu gnunet util struct)
+            (gnu gnunet utils bv-slice)
+            ((gnu extractor enum)
+             #:select (symbol-value value->index))
+            (gnu gnunet message protocols)
+            (gnu gnunet mq)
+            (gnu gnunet mq envelope)
+            (gnu gnunet mq handler))
+
+;; The client code sends the numbers 0 to
+;; NUM_TRANSMISSIONS-1 over the message queue.
+;; The notify-sent callback verifies whether
+;; messages were sent in-order. The fake
+;; ‘sender’ procedure verifies whether it received
+;; the messages in order.
+;;
+;; Note that in more realistic situations, some
+;; queueing can happen! A very special case
+;; is being tested here.
+
+(define NUM_TRANSMISSIONS 100)
+
+(eval-when (expand load eval)
+  (define-type /:msg:our-test:dummy
+    (structure/packed
+     (synopsis "A test message, containing an index")
+     (documentation
+      "The first time, a message with index 0 is sent.
+Then each time the index is increased.")
+     (field (header /:message-header))
+     (field (index u32/big)))))
+
+(define (index->dummy i)
+  (let ((s (make-slice/read-write
+           (sizeof /:msg:our-test:dummy '()))))
+    (set%! /:msg:our-test:dummy '(header type) s
+          (value->index (symbol-value message-type msg:util:dummy)))
+    (set%! /:msg:our-test:dummy '(header size) s
+          (sizeof /:msg:our-test:dummy '()))
+    (set%! /:msg:our-test:dummy '(index) s i)
+    s))
+
+(define (dummy->index s)
+  (read% /:msg:our-test:dummy '(index) s))
+
+(define (client mq notify-sent-box sent-box)
+  (define (see i) ; notify-sent callback: checks that index I is the next one
+    (if (= i (unbox notify-sent-box))
+       (set-box! notify-sent-box (+ 1 i))
+       (error "messages were sent out-of-order (index: ~a) (notify-sent: ~a) (sent: ~a)"
+              i
+              (unbox notify-sent-box)
+              (unbox sent-box))))
+  (do ((i 0 (+ 1 i))) ; send the indices 0 ... NUM_TRANSMISSIONS-1 over MQ
+      ((>= i NUM_TRANSMISSIONS))
+    (send-message! mq (index->dummy i)
+                  #:notify-sent! (cut see i))))
+
+(define (send-proc notify-sent-box sent-box envelope)
+  (attempt-irrevocable-sent!
+   envelope
+   ((go message priority)
+    (let ((index (dummy->index message)))
+      (unless (= (+ index 1) (unbox notify-sent-box))
+       (error "messages are being sent out-of-order or with queueing (index: ~a) (notify-sent: ~a) (sent: ~a)"
+              index
+              (unbox notify-sent-box)
+              (unbox sent-box)))
+      (unless (= index (unbox sent-box))
+       (error "dunno (index: ~a) (notify-sent: ~a) (sent: ~a)"
+              index
+              (unbox notify-sent-box)
+              (unbox sent-box)))
+      (set-box! sent-box (+ 1 index)) ; remember that INDEX has been sent
+      (values)))
+   ((cancelled)
+    (error "how did this cancelling happen?"))
+   ((already-sent)
+    (error "forgot to remove envelope from queue"))))
+
+(define no-handlers (message-handlers))
+(define (no-error-handler . what) ; the tests never expect an error
+  (error "where did this error come from?"))
+
+(test-equal "in-order, no queuing"
+  (list NUM_TRANSMISSIONS NUM_TRANSMISSIONS)
+  (let* ((notify-sent-box (box 0))
+        (sent-box (box 0))
+        (mq (make-message-queue no-handlers
+                                no-error-handler
+                                (make-one-by-one-sender
+                                  (cut send-proc notify-sent-box sent-box <>)))))
+    (client mq notify-sent-box sent-box)
+    (list (unbox notify-sent-box) (unbox sent-box))))
+
+
+
+;; Simulate buffering, by only ‘truly’ sending after each three messages.
+;; This does _not_ test the queueing code! See the next test for that.
+;; Make sure messages aren't lost, and that they are still sent in-order!
+;;
+;; (Assuming the sender is well-implemented. A buggy sender could send
+;; things out-of-order.)
+
+(define (send-proc2 notify-sent-box sent-box mod-box stashed envelope)
+  (let ((first-free (vector-index not stashed))
+       (expected-filled (unbox mod-box)))
+    (unless (= (or first-free 0) expected-filled)
+      (error "did we lose a message?"))
+    (set-box! mod-box (mod (+ 1 expected-filled) (vector-length stashed)))
+    (if (not first-free)
+       (begin
+         (vector-map!
+          (lambda (i envelope)
+            (send-proc notify-sent-box sent-box envelope)
+            #f)
+          stashed)
+         (vector-set! stashed 0 envelope))
+       ;; @var{stashed} is not yet full; send the
+       ;; envelope later!
+       (vector-set! stashed first-free envelope))
+    (values)))
+
+(define (expected-sent n k)
+  (let* ((rest (mod n k))
+        (held-back (if (zero? rest) k rest)))
+    ;; Everything except the messages still held back counts as sent.
+    (- n held-back)))
+
+(define k 3)
+
+(test-equal "in-order, some buffering"
+  (map (cut expected-sent <> k) ; K is also the size of the stash below
+       (list NUM_TRANSMISSIONS NUM_TRANSMISSIONS))
+  (let* ((notify-sent-box (box 0))
+        (sent-box (box 0))
+        (mod-box (box 0))
+        (stashed (make-vector k #f))
+        (mq (make-message-queue no-handlers
+                                no-error-handler
+                                (make-one-by-one-sender
+                                  (cut send-proc2 notify-sent-box sent-box mod-box stashed <>)))))
+    (client mq notify-sent-box sent-box)
+    (list (unbox notify-sent-box) (unbox sent-box))))
+
+
+
+;; Test the queueing code by only flushing
+;; the queue every N messages. Also check,
+;; using flushing-allowed?, that sending
+;; only happens when we expect it to happen.
+
+(define flushing-allowed?
+  (make-parameter #f))
+
+(define (send-proc/check notify-sent-box sent-box envelope)
+  (assert (flushing-allowed?))
+  (send-proc notify-sent-box sent-box envelope))
+
+(define (make-every-n proc k)
+  "Make a sender using @var{proc} every @var{k}
+invocations, and at other times doing nothing."
+  ;; Should theoretically be an atomic, but the test is singly-threaded,
+  ;; so don't bother.
+  (define n-mod-k 0)
+  (lambda (mq)
+    (assert (not (flushing-allowed?)))
+    (set! n-mod-k (+ 1 n-mod-k))
+    (when (>= n-mod-k k)
+      (set! n-mod-k 0)
+      (parameterize ((flushing-allowed? #t))
+       (proc mq)))))
+
+(test-equal "in-order, some queueing"
+  (map (cut expected-sent <> 3)
+       (list NUM_TRANSMISSIONS NUM_TRANSMISSIONS))
+  (let* ((notify-sent-box (box 0))
+        (sent-box (box 0))
+        (mq (make-message-queue no-handlers
+                                no-error-handler
+                                (make-every-n
+                                 (make-one-by-one-sender
+                                  (cut send-proc/check notify-sent-box sent-box <>))
+                                 3)))) ; flush the queue every third invocation
+    (client mq notify-sent-box sent-box)
+    (list (unbox notify-sent-box) (unbox sent-box))))
+
+
+
+;; Test that concurrency interacts well with queueing.
+;;
+;; The situation we consider, is a number
+;; of different threads concurrently sending messages.
+;; The test verifies whether all messages were, in fact, sent.
+;;
+;; To make things complicated, some queueing is introduced.
+;; The sender will only proceed each time the current thread
+;; has tried to send @var{k/thread} messages, and the sender
+;; will only try to send at most @code{(+ k/thread e)}, where
+;; @var{e} is a random number from @var{e/min} to @var{e/max}.
+
+;; The tests detect the following potential problems in the code
+;; by crashing (but not always, so you may need to re-run a few
+;; times, three times tends to be enough in practice for me):
+;;
+;;  * Replacing 'old' with 'queue' in
+;;    unless (pfds:queue-empty? old)
+;;  * Replacing 'old' with 'queue' in
+;;    receive (envelope new) (pfds:dequeue old)
+;;  * Replacing the first 'old' with 'queue' in
+;;    (eq? old (swap! old new)), in 'make-one-by-one-sender'
+;;  * Replacing the second 'old' with 'queue' in
+;;    (eq? old (swap! old new)), in 'make-one-by-one-sender'
+;;  * Replacing 'old' by 'queue' in
+;;    (pfds:enqueue old envelope)
+;;    (only detected infrequently, odds 1 to 7 or so)
+;;  * Replacing the first 'old' by 'queue' in
+;;    (eq? old (swap-queue! old new))
+;;    in 'send-message!'
+;;  * Replacing the second 'old' by 'queue' in
+;;    (eq? old (swap-queue! old new))
+;;    in 'send-message!'
+;;
+;; The following problems cause a hang when testing:
+;;  * Replacing 'queue' by 'old' in (spin queue)
+;;    in 'make-one-by-one-sender'
+;;  * Replacing 'queue' by 'old' in (spin queue)
+;;    in 'send-message!'.
+;;
+;; The following problems cause a hang in a preceding
+;; test:
+;;
+;;  * Replacing the first 'old' by 'new' in
+;;    (eq? old (swap-queue! old new))
+;;    in 'send-message!'
+;;  * Replacing 'queue' by 'old' in
+;;    (spin queue)
+;;    in 'send-message!'
+;;  * Replacing 'queue' by 'new' in
+;;    (spin queue)
+;;    in 'send-message!'
+;;
+;; Some potential problems currently remain undetected:
+;;  * Replacing the 'new' by 'queue' in
+;;    (pfds:queue-length new)
+;;
+;;    However, it is only for printing a warning
+;;    when the queue is rather full. Being slightly
+;;    off in queue length shouldn't be a problem
+;;    there, as the 'maximum reasonable bound'
+;;    is just a wild guess and not some exact
+;;    cut-off.
+
+(define random/thread
+  (fluid->parameter (make-unbound-fluid)))
+(define k/thread 12)
+(define e/min 2)
+(define e/max 7)
+(define N_MESSAGES 1000)
+(define N_THREAD 40)
+
+;; List of (thread-index . message-index)
+;; received by current thread.
+(define received/thread
+  (fluid->parameter (make-unbound-fluid)))
+(define i/thread
+  (fluid->parameter (make-unbound-fluid)))
+
+;; The sending is happening concurrently,
+;; so in-order delivery cannot be guaranteed.
+;; Thus, requesting in-order delivery seems
+;; silly.
+(define prio
+  (bitwise-ior
+   (prio->integer 'prio:background)
+   (value->index (symbol-value priority-preference
+                              pref:out-of-order))))
+
+(eval-when (expand load eval)
+  (define-type /:msg:our-test:concurrency
+    (structure/packed
+     (synopsis "A test message, containing an thread and message index")
+     (documentation
+      "The first time, a message with index 0 is sent.
+Then each time the index is increased.")
+     (field (header /:message-header))
+     (field (index u32/big))
+     (field (thread u32/big)))))
+
+(define (make-thread-message thread-index i)
+  (let ((s (make-slice/read-write
+           (sizeof /:msg:our-test:concurrency '()))))
+    (set%! /:msg:our-test:concurrency '(header type) s
+          (value->index (symbol-value message-type msg:util:dummy)))
+    (set%! /:msg:our-test:concurrency '(header size) s
+          (sizeof /:msg:our-test:concurrency '()))
+    (set%! /:msg:our-test:concurrency '(index) s i)
+    (set%! /:msg:our-test:concurrency '(thread) s thread-index)
+    s))
+
+(define (decode-thread-message s)
+  (cons (read% /:msg:our-test:concurrency '(thread) s)
+       (read% /:msg:our-test:concurrency '(index) s)))
+
+
+(define (make-every-n/thread proc k)
+  "Make a sender using @var{proc} every @var{k}
+invocations, and at other times doing nothing.
+@code{i/thread} is used for state."
+  (lambda (mq)
+    (assert (not (flushing-allowed?)))
+    (i/thread (+ 1 (i/thread)))
+    (when (>= (i/thread) k)
+      (i/thread 0)
+      (parameterize ((flushing-allowed? #t))
+       (proc mq)))))
+
+(define (thread mq thread-index)
+  (pk 'j thread-index)
+  (parameterize ((received/thread '())
+                (i/thread 0)
+                (random/thread
+                 (seed->random-state thread-index)))
+    (do ((i 0 (+ 1 i)))
+       ((>= i N_MESSAGES))
+      (send-message! mq (make-thread-message thread-index i)
+                    #:priority prio))
+    ;; If you try to debug "the draining bug bites you",
+    ;; notice that while there is (j 0) ... (j (- N_THREAD 1))
+    ;; in the output, some (i index) are missing.
+    ;;
+    ;; A bug in guile-fibers?
+    (pk 'i thread-index)
+    ;; See "the draining bug bites you".
+    ;; This assertion never happens FWIW.
+    (assert (or (pair? (received/thread))
+               (null? (received/thread))))
+    (received/thread)))
+
+(define (make-restricted-sender how-many make-sender inner-proc)
+  "Make a sender that, when called, tries to send @code{(how-many)}
+messages, using @var{make-sender} and @var{inner-proc}."
+  (define escape-thunk
+    (fluid->parameter (make-unbound-fluid)))
+  (define count
+    (fluid->parameter (make-unbound-fluid)))
+  (define max-count
+    (fluid->parameter (make-unbound-fluid)))
+  (define (count!)
+    (count (+ 1 (count)))
+    (when (= (count) (max-count))
+      (count 0)
+      ((escape-thunk))))
+  (lambda (mq)
+    (let/ec ec
+      (parameterize ((max-count (how-many))
+                    (count 0)
+                    (escape-thunk ec))
+       ((make-sender
+         (lambda (envelope)
+           (inner-proc envelope)
+           ;; Check 'count' AFTER some things
+           ;; have been sent! Otherwise, the
+           ;; message is lost.
+           (count!)
+           (values)))
+        mq)))))
+
+;; After all threads have exited, we'll ‘drain’ out
+;; the left-overs.
+(define drain? (make-parameter #f))
+
+(define (make-sender/choice y? x y)
+  "When @code{(y?)}, send with @code{y}. Else, send
+with @code{x}."
+  (lambda (mq)
+    (if (y?)
+       (y mq)
+       (x mq))))
+
+(define (inner-send envelope)
+  (attempt-irrevocable-sent!
+   envelope
+   ((go message priority)
+    (received/thread (cons (decode-thread-message message)
+                          (received/thread)))
+    (values))
+   ((cancelled) (error "what/cancelled"))
+   ((already-sent) (error "what/already-sent"))))
+
+(define sender/thread
+  (make-sender/choice
+   drain?
+   (make-every-n/thread
+    (make-restricted-sender
+     (lambda ()
+       (+ k/thread e/min
+         (random (- e/max e/min -1) (random/thread))))
+     make-one-by-one-sender
+     inner-send)
+    k/thread)
+   (make-one-by-one-sender inner-send)))
+
+(define (results->array per-thread-sent)
+  ;; A bit array of messages the send function has
+  ;; seen.
+  (define a (make-typed-array 'b #f N_MESSAGES N_THREAD))
+  (define (visit-message message)
+    (define thread-index (car message))
+    (define message-index (cdr message))
+    (array-set! a #t message-index thread-index))
+  (define (visit-per-thread _ messages)
+    ;; TODO: once, I got a
+    ;; (wrong-type-arg "for-each" "Not a list: ~S" (#<unspecified>) #f)
+    ;; _after_ (pk 'n-leftovers').
+    ;;
+    ;; After some debugging, it appears run-fibers with #:drain? #t
+    ;; sometimes forgets about a fiber.
+    (unless (or (null? messages) (pair? messages))
+      (pk 'the-sporadic-bug _ messages)
+      (backtrace)
+      (error "the draining bug bites you (_: ~s) (messages: ~s)"
+            _ messages))
+    (for-each visit-message messages))
+  (vector-for-each visit-per-thread per-thread-sent)
+  a)
+
+(define (array-missing a)
+  (define missing '())
+  (array-index-map! a
+                   (lambda (i j)
+                     (define found (array-ref a i j))
+                     (unless found
+                       (set! missing `((,i . ,j) . ,missing)))
+                     found))
+  missing)
+
+;; But possibly out-of-order!
+(test-equal "nothing lost when sending concurrently"
+  '()
+  (let* ((mq (make-message-queue no-handlers
+                                no-error-handler
+                                sender/thread))
+        (thread-indices (iota N_THREAD))
+        ;; The ‘drained-out’ messages are put
+        ;; at index N_THREAD.
+        (results (make-vector (+ 1 N_THREAD)))
+        (ready? (make-condition)))
+    (run-fibers
+     (lambda ()
+       (define (run! thread-index)
+        (spawn-fiber
+         (lambda ()
+           (wait ready?)
+           (vector-set! results thread-index
+                        (thread mq thread-index)))))
+       (for-each run! thread-indices)
+       ;; Try to start every thread at the same time!
+       (signal-condition! ready?))
+     #:drain? #t
+     ;; No need
+     #:install-suspendable-ports? #f
+     ;; More interrupts --> more switches
+     ;; --> more test coverage. At least,
+     ;; that's the idea. Not really tested.
+     #:hz 700)
+    ;; Drain the left-overs.
+    (parameterize ((drain? #t)
+                  (received/thread '()))
+      (try-send-again! mq)
+      (pk 'n-leftovers (length (received/thread)))
+      (vector-set! results N_THREAD (received/thread)))
+    (array-missing (results->array results))))
+
+;; See "the draining bug bites you"
+;; in results->array.
+(test-expect-fail 1)
+(test-eq "is the draining bug fixed?"
+  'yes
+  'no)

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]