From: gnunet
Subject: [gnunet-scheme] 130/324: mq: New module, replacing message-io.
Date: Tue, 21 Sep 2021 13:22:50 +0200
This is an automated email from the git hooks/post-receive script.
maxime-devos pushed a commit to branch master
in repository gnunet-scheme.
commit ab8f8d42a81db198d2014e29f5319d1912bbdab8
Author: Maxime Devos <maximedevos@telenet.be>
AuthorDate: Sun May 30 22:53:49 2021 +0200
mq: New module, replacing message-io.
The old module was rather inconvenient in usage.
Some TODOs: message cancellation, message handlers,
closing queues, error handling, fixing a guile-fibers
bug ...
* gnu/gnunet/mq.scm: New module.
* gnu/gnunet/mq/envelope.scm: Export bind-atomic-boxen
for (gnu gnunet mq), pending a move into a separate module.
* tests/mq.scm: Test the new module.
The first test is based on a test from upstream.
---
Makefile.am | 3 +-
gnu/gnunet/mq.scm | 232 ++++++++++++++++++++
gnu/gnunet/mq/envelope.scm | 4 +-
tests/mq.scm | 520 +++++++++++++++++++++++++++++++++++++++++++++
4 files changed, 757 insertions(+), 2 deletions(-)
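Both send-message! and make-one-by-one-sender in this patch update the
queue with a compare-and-swap retry loop on an atomic box. The following
is a minimal sketch of that pattern using only (ice-9 atomic). It rests on
two assumptions that are not in the patch: a plain list stands in for the
(pfds queues) queue, and `enqueue!' is a hypothetical helper name. On a
failed swap the sketch retries from the value the swap returned, whereas
the patch re-reads the box through %%bind-atomic-boxen.

```scheme
;; Sketch only: a list stands in for the (pfds queues) queue, and
;; `enqueue!' is a hypothetical name, not part of (gnu gnunet mq).
(use-modules (ice-9 atomic))

(define (enqueue! box item)
  "Atomically add ITEM to the list in BOX; return the new length."
  (let spin ((old (atomic-box-ref box)))
    (let* ((new (cons item old))
           ;; compare-and-swap returns the value that was in BOX
           ;; before the call; the swap happened iff that is OLD.
           (seen (atomic-box-compare-and-swap! box old new)))
      (if (eq? old seen)
          (length new)
          ;; Someone else changed BOX first; retry from what we saw.
          (spin seen)))))

(define box (make-atomic-box '()))
(enqueue! box 'a)   ; the box now holds (a)
(enqueue! box 'b)   ; the box now holds (b a)
```

The length is computed from the value that was actually swapped in, which
is why a concurrent writer cannot make the returned count refer to a queue
that never existed.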
diff --git a/Makefile.am b/Makefile.am
index 8b15711..4ab6680 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -41,7 +41,7 @@ modules = \
gnu/gnunet/mq/handler.scm \
gnu/gnunet/mq/prio-prefs.scm \
gnu/gnunet/mq/prio-prefs2.scm \
- gnu/gnunet/mq/message-io.scm \
+ gnu/gnunet/mq.scm \
\
gnu/gnunet/utils/bv-slice.scm \
gnu/gnunet/utils/hat-let.scm \
@@ -98,6 +98,7 @@ SCM_LOG_DRIVER = \
SCM_TESTS = \
tests/envelope.scm \
tests/message-handler.scm \
+ tests/mq.scm \
tests/update.scm \
tests/message-io.scm \
tests/bv-slice.scm \
diff --git a/gnu/gnunet/mq.scm b/gnu/gnunet/mq.scm
new file mode 100644
index 0000000..5884f0e
--- /dev/null
+++ b/gnu/gnunet/mq.scm
@@ -0,0 +1,232 @@
+;; This file is part of GNUnet.
+;; Copyright (C) 2012-2019 GNUnet e.V.
+;; Copyright (C) 2021 Maxime Devos (<maximedevos@telenet.be>)
+;;
+;; GNUnet is free software: you can redistribute it and/or modify it
+;; under the terms of the GNU Affero General Public License as published
+;; by the Free Software Foundation, either version 3 of the License,
+;; or (at your option) any later version.
+;;
+;; GNUnet is distributed in the hope that it will be useful, but
+;; WITHOUT ANY WARRANTY; without even the implied warranty of
+;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+;; Affero General Public License for more details.
+;;
+;; You should have received a copy of the GNU Affero General Public License
+;; along with this program. If not, see <http://www.gnu.org/licenses/>.
+;;
+;; SPDX-License-Identifier: AGPL-3.0-or-later
+
+;; Author: Florian Dold
+;; Author: Maxime Devos
+;; C file: util/mq.c
+;; Scheme module: (gnu gnunet mq)
+;;
+;; A message queue for GNUnet messages.
+;; Messages are made of bytes. In particular,
+;; messages must be prefixed by a /:message-header.
+;;
+;; TODO cancelling, injecting messages, message handlers ...
+;; These are not implemented yet or untested, or need
+;; more documentation!
+(define-library (gnu gnunet mq)
+ (export <message-queue> make-message-queue message-queue?
+ make-one-by-one-sender
+ inject-message! send-message!
+ message-queue-length
+ try-send-again!)
+ (import (gnu gnunet mq handler)
+ (gnu gnunet mq envelope)
+ (gnu gnunet utils hat-let)
+ (only (gnu gnunet utils bv-slice)
+ slice? slice-slice)
+ (only (gnu gnunet util struct)
+ /:message-header)
+ (only (gnu gnunet netstruct syntactic)
+ sizeof read%)
+ (only (guile) define* exact-integer?)
+ (only (ice-9 atomic)
+ make-atomic-box atomic-box-ref
+ atomic-box-compare-and-swap!)
+ (only (rnrs base)
+ lambda assert let begin define
+ procedure? eq? >= = <= < if quote
+ values and let*)
+ (only (rnrs control)
+ when unless)
+ (only (rnrs conditions)
+ define-condition-type &warning
+ make-who-condition condition)
+ (only (rnrs exceptions)
+ raise-continuable)
+ (only (rnrs records syntactic) define-record-type)
+ (only (srfi srfi-8) receive)
+ (prefix (only (pfds queues)
+ make-queue dequeue enqueue queue-length
+ queue-empty?)
+ #{pfds:}#))
+ (begin
+ (define-record-type (<message-queue> make-message-queue message-queue?)
+ (fields (immutable handlers message-queue-handlers)
+ (immutable error-handler message-queue-error-handler)
+ ;; Atomic box of a queue of messages to send
+ ;; (as @code{<envelope>} objects).
+ (immutable messages/box message-queue-messages/box)
+ ;; A procedure for actually sending the messages.
+ ;; It accepts a single argument, the message queue.
+ ;;
+ ;; It is run each time a message has been
+ ;; enqueued. It is not obligated to send the messages
+ ;; right now, though it probably should send them
+ ;; soonish. It can be run at any time, with
+ ;; @code{try-send-again!}.
+ (immutable sender message-queue-sender))
+ (protocol
+ (lambda (%make)
+ (lambda (handlers error-handler sender)
+ "Make a message queue with message handlers @var{handlers}.
+
+The message handlers are expected to handle bytevector slices
+that start with a @code{/:message-header}.
+
+XXX and error handler @var{error-handler}
+
+Messages are sent with @var{sender}. It can be created with
+@code{make-one-by-one-sender}."
+ ;; Predicate does not exist yet ...
+ #;(assert (message-handlers? handlers))
+ #;(assert (message-handler? error-handler))
+ (%make handlers error-handler
+ (make-atomic-box (pfds:make-queue))
+ sender)))))
+
+ (define (make-one-by-one-sender proc)
+ "Make a message sender, sending messages one-by-one with @var{proc}.
+
+The procedure @var{proc} must accept a single argument,
+the message envelope to send. This procedure should
+use @code{attempt-irrevocable-sent!} when it feels ready.
+It must not return any values currently.
+
+The message does not need to be sent directly.
+However, remember that unless the priority allows otherwise,
+messages must be sent in-order (TODO really received in-order?)."
+ (assert (procedure? proc))
+ (lambda (mq)
+ (assert (message-queue? mq))
+ (%%bind-atomic-boxen
+ ((queue (message-queue-messages/box mq) swap!))
+ ;; First extract an envelope ...
+ (let spin ((old queue))
+ ;; ... unless there isn't anything to remove anymore.
+ ;; This check cannot be moved outside the (let spin ...),
+ ;; as message senders may be called at any time
+ ;; (even if there are no messages!). Also, in case of
+ ;; concurrency, the queue may become empty after a spin
+ ;; iteration.
+ (unless (pfds:queue-empty? old)
+ (receive (envelope new) (pfds:dequeue old)
+ (when (eq? old (swap! old new))
+ ;; We extracted a message. Now do something
+ ;; with it!
+ ;;
+ ;; Make sure @var{proc} does not return
+ ;; any values, as we may want to assign
+ ;; meaning to return values later.
+ (receive ()
+ ;; Process the message.
+ (proc envelope)
+ 'nothing))
+ ;; Process the remaining messages
+ ;; / Someone modified the message queue
+ ;; before us; retry.
+ ;;
+ ;; TODO: if someone else modified the message queue,
+ ;; does that mean we don't have to anymore?
+ (spin queue)))))))
+
+ (define (inject-message! mq message)
+ "Call the message handler that was registered
+for the type of the message @var{message} in the message queue @var{mq}
+with the message @var{message}. In case the message is malformed
+(according to the message handler), inject a @code{&malformed-message}
+error instead.
+
+This procedure is intended to be used by the implementation
+of message queues."
+ (let* ((header (slice-slice message 0 (sizeof /:message-header '(type))))
+ (handler (message-handler-for
+ (message-queue-handlers mq)
+ (read% /:message-header '(type) header))))
+ (if (verify-message? handler message)
+ ;; TODO: maybe a good place to catch out-of-memory
+ ;; and stack overflow errors ...
+ (handle-message! handler message)
+ ;; XXX
+ (inject-error! handler message))))
+
+ (define (message-queue-length mq)
+ "How many messages are currently in the message queue @var{mq}?"
+ (pfds:queue-length
+ (atomic-box-ref (message-queue-messages/box mq))))
+
+ (define-condition-type &overly-full-queue-warning &warning
+ make-overly-full-queue-warning overly-full-queue-warning?
+ (current-length overly-full-queue-length)
+ (suspicious-when overly-full-queue-suspicious-when))
+
+ (define %suspicious-length 10000)
+
+ (define* (send-message! mq message #:key (priority 0)
+ (notify-sent! values))
+ "Send a message with the given message queue. A continuable
+@code{&warning} may be raised, e.g. a @code{&overly-full-queue-warning}
+in case the queue is suspiciously long. The message queue implementation
+can raise errors of its own as well, as usual.
+
+@var{priority} is a numeric priority-preference value for @var{message},
+from @code{(gnu gnunet mq prio-prefs)}. By default, @var{message} will be
+sent as reliable background traffic (@code{prio:background}).
+
+The in-order sending of ordered messages (when requested by @var{priority})
+is only guaranteed when supported by the message queue implementation
+and when @code{try-send-again!} and @code{send-message!} are not being
+used concurrently on the same message queue.
+
+When the message has been irrevocably sent, the thunk @var{notify-sent!}
+will be called."
+ (define (cancel!)
+ (assert (and #f "cancel! not yet implemented")))
+ (assert (and (slice? message)
+ (exact-integer? priority)
+ (<= 0 priority) (< priority 512)))
+ (%%bind-atomic-boxen
+ ((queue (message-queue-messages/box mq) swap-queue!))
+ ;; Add the message to the queue. Also remember the
+ ;; length of the new queue; we'll need it later.
+ (let* ((envelope (make-envelope cancel!
+ message
+ #:priority priority
+ #:notify-sent! notify-sent!))
+ (queue-length
+ (let spin ((old queue))
+ (let ((new (pfds:enqueue old envelope)))
+ (if (eq? old (swap-queue! old new))
+ (pfds:queue-length new)
+ (spin queue))))))
+ ;; The C implementation emits a warning if the queue has
+ ;; many entries, as this may indicate a bug (in the scheduler,
+ ;; in the queue implementation, ...). This seems a good idea.
+ (when (>= queue-length %suspicious-length)
+ (raise-continuable
+ (condition (make-overly-full-queue-warning
+ queue-length %suspicious-length)
+ ;; TODO: consider
+ ;; (@ (gnu gnunet mq) send-message!) here and elsewhere.
+ (make-who-condition 'send-message!))))
+ (try-send-again! mq))))
+
+ (define (try-send-again! mq)
+ "Try to send messages in the queue @var{mq} that were not yet sent.
+This is expected to be called from the message queue implementation."
+ ((message-queue-sender mq) mq))))
diff --git a/gnu/gnunet/mq/envelope.scm b/gnu/gnunet/mq/envelope.scm
index e0c94a2..76d91ea 100644
--- a/gnu/gnunet/mq/envelope.scm
+++ b/gnu/gnunet/mq/envelope.scm
@@ -26,7 +26,9 @@
;; so no type checks there.
(define-library (gnu gnunet mq envelope)
(export <envelope> make-envelope envelope?
- attempt-cancel! attempt-irrevocable-sent!)
+ attempt-cancel! attempt-irrevocable-sent!
+ ;; TODO find a better place
+ (rename (bind-atomic-boxen %%bind-atomic-boxen)))
(import (gnu gnunet utils hat-let)
(only (guile) define* lambda* exact-integer?)
(only (ice-9 match) match)
diff --git a/tests/mq.scm b/tests/mq.scm
new file mode 100644
index 0000000..bc05f9d
--- /dev/null
+++ b/tests/mq.scm
@@ -0,0 +1,520 @@
+;; This file is part of GNUnet.
+;; Copyright (C) 2012, 2018 GNUnet e.V.
+;; Copyright (C) 2021 Maxime Devos
+;;
+;; GNUnet is free software: you can redistribute it and/or modify it
+;; under the terms of the GNU Affero General Public License as published
+;; by the Free Software Foundation, either version 3 of the License,
+;; or (at your option) any later version.
+;;
+;; GNUnet is distributed in the hope that it will be useful, but
+;; WITHOUT ANY WARRANTY; without even the implied warranty of
+;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+;; Affero General Public License for more details.
+;;
+;; You should have received a copy of the GNU Affero General Public License
+;; along with this program. If not, see <http://www.gnu.org/licenses/>.
+;;
+;; SPDX-License-Identifier: AGPL-3.0-or-later
+
+;; Author: Florian Dold
+;; Author: Christian Grothoff
+;; Author: Maxime Devos
+
+(define-module (tests mq))
+
+(use-modules (ice-9 control)
+ (fibers conditions)
+ (fibers)
+ (srfi srfi-26)
+ (srfi srfi-43)
+ (srfi srfi-64)
+ (srfi srfi-111)
+ ((rnrs base) #:select (assert mod))
+ ((rnrs arithmetic bitwise)
+ #:select (bitwise-ior))
+ (gnu gnunet netstruct syntactic)
+ ((gnu gnunet netstruct procedural)
+ #:select (u32/big))
+ (gnu gnunet mq prio-prefs)
+ (gnu gnunet mq prio-prefs2)
+ (gnu gnunet util struct)
+ (gnu gnunet utils bv-slice)
+ ((gnu extractor enum)
+ #:select (symbol-value value->index))
+ (gnu gnunet message protocols)
+ (gnu gnunet mq)
+ (gnu gnunet mq envelope)
+ (gnu gnunet mq handler))
+
+;; The client code sends the numbers 0 to
+;; NUM_TRANSMISSIONS-1 over the message queue.
+;; The notify-sent callback verifies whether
+;; messages were sent in-order. The fake
+;; ‘sender’ procedure verifies whether it received
+;; the messages in order.
+;;
+;; Note that in more realistic situations, some
+;; queueing can happen! A very special case
+;; is being tested here.
+
+(define NUM_TRANSMISSIONS 100)
+
+(eval-when (expand load eval)
+ (define-type /:msg:our-test:dummy
+ (structure/packed
+ (synopsis "A test message, containing an index")
+ (documentation
+ "The first time, a message with index 0 is sent.
+Then each time the index is increased.")
+ (field (header /:message-header))
+ (field (index u32/big)))))
+
+(define (index->dummy i)
+ (let ((s (make-slice/read-write
+ (sizeof /:msg:our-test:dummy '()))))
+ (set%! /:msg:our-test:dummy '(header type) s
+ (value->index (symbol-value message-type msg:util:dummy)))
+ (set%! /:msg:our-test:dummy '(header size) s
+ (sizeof /:msg:our-test:dummy '()))
+ (set%! /:msg:our-test:dummy '(index) s i)
+ s))
+
+(define (dummy->index s)
+ (read% /:msg:our-test:dummy '(index) s))
+
+(define (client mq notify-sent-box sent-box)
+ (define (see i)
+ (if (= i (unbox notify-sent-box))
+ (set-box! notify-sent-box (+ 1 i))
+ (error "messages were sent out-of-order (index: ~a) (notify-sent: ~a) (sent: ~a)"
+ i
+ (unbox notify-sent-box)
+ (unbox sent-box))))
+ (do ((i 0 (+ 1 i)))
+ ((>= i NUM_TRANSMISSIONS))
+ (send-message! mq (index->dummy i)
+ #:notify-sent! (cut see i))))
+
+(define (send-proc notify-sent-box sent-box envelope)
+ (attempt-irrevocable-sent!
+ envelope
+ ((go message priority)
+ (let ((index (dummy->index message)))
+ (unless (= (+ index 1) (unbox notify-sent-box))
+ (error "messages are being sent out-of-order or with queueing (index: ~a) (notify-sent: ~a) (sent: ~a)"
+ index
+ (unbox notify-sent-box)
+ (unbox sent-box)))
+ (unless (= index (unbox sent-box))
+ (error "dunno (index: ~a) (notify-sent: ~a) (sent: ~a)"
+ index
+ (unbox notify-sent-box)
+ (unbox sent-box)))
+ (set-box! sent-box (+ 1 index))
+ (values)))
+ ((cancelled)
+ (error "how did this cancelling happen?"))
+ ((already-sent)
+ (error "forgot to remove envelope from queue"))))
+
+(define no-handlers (message-handlers))
+(define (no-error-handler . what)
+ (error "where did this error come from?"))
+
+(test-equal "in-order, no queuing"
+ (list NUM_TRANSMISSIONS NUM_TRANSMISSIONS)
+ (let* ((notify-sent-box (box 0))
+ (sent-box (box 0))
+ (mq (make-message-queue no-handlers
+ no-error-handler
+ (make-one-by-one-sender
+ (cut send-proc notify-sent-box sent-box <>)))))
+ (client mq notify-sent-box sent-box)
+ (list (unbox notify-sent-box) (unbox sent-box))))
+
+
+
+;; Simulate buffering, by only ‘truly’ sending after each three messages.
+;; This does _not_ test the queueing code! See the next test for that.
+;; Make sure messages aren't lost, and that they are still sent in-order!
+;;
+;; (Assuming the sender is well-implemented. A buggy sender could send
+;; things out-of-order.)
+
+(define (send-proc2 notify-sent-box sent-box mod-box stashed envelope)
+ (let ((first-free (vector-index not stashed))
+ (expected-filled (unbox mod-box)))
+ (unless (= (or first-free 0) expected-filled)
+ (error "did we lose a message?"))
+ (set-box! mod-box (mod (+ 1 expected-filled) (vector-length stashed)))
+ (if (not first-free)
+ (begin
+ (vector-map!
+ (lambda (i envelope)
+ (send-proc notify-sent-box sent-box envelope)
+ #f)
+ stashed)
+ (vector-set! stashed 0 envelope))
+ ;; @var{stashed} is not yet full; send the
+ ;; envelope later!
+ (vector-set! stashed first-free envelope))
+ (values)))
+
+(define (expected-sent n k)
+ (- n (let ((mod (mod n k)))
+ (if (= mod 0)
+ k
+ mod))))
+
+(define k 3)
+
+(test-equal "in-order, some buffering"
+ (map (cut expected-sent <> 3)
+ (list NUM_TRANSMISSIONS NUM_TRANSMISSIONS))
+ (let* ((notify-sent-box (box 0))
+ (sent-box (box 0))
+ (mod-box (box 0))
+ (stashed (make-vector k #f))
+ (mq (make-message-queue no-handlers
+ no-error-handler
+ (make-one-by-one-sender
+ (cut send-proc2 notify-sent-box sent-box mod-box stashed <>)))))
+ (client mq notify-sent-box sent-box)
+ (list (unbox notify-sent-box) (unbox sent-box))))
+
+
+
+;; Test the queueing code by only flushing
+;; the queue every N messages. Also check,
+;; using flushing-allowed?, that sending
+;; only happens when we expect it to happen.
+
+(define flushing-allowed?
+ (make-parameter #f))
+
+(define (send-proc/check notify-sent-box sent-box envelope)
+ (assert (flushing-allowed?))
+ (send-proc notify-sent-box sent-box envelope))
+
+(define (make-every-n proc k)
+ "Make a sender using @var{proc} every @var{k}
+invocations, and at other times doing nothing."
+ ;; Should theoretically be an atomic, but the test is single-threaded,
+ ;; so don't bother.
+ (define n-mod-k 0)
+ (lambda (mq)
+ (assert (not (flushing-allowed?)))
+ (set! n-mod-k (+ 1 n-mod-k))
+ (when (>= n-mod-k k)
+ (set! n-mod-k 0)
+ (parameterize ((flushing-allowed? #t))
+ (proc mq)))))
+
+(test-equal "in-order, some queueing"
+ (map (cut expected-sent <> 3)
+ (list NUM_TRANSMISSIONS NUM_TRANSMISSIONS))
+ (let* ((notify-sent-box (box 0))
+ (sent-box (box 0))
+ (mq (make-message-queue no-handlers
+ no-error-handler
+ (make-every-n
+ (make-one-by-one-sender
+ (cut send-proc/check notify-sent-box sent-box <>))
+ 3))))
+ (client mq notify-sent-box sent-box)
+ (list (unbox notify-sent-box) (unbox sent-box))))
+
+
+
+;; Test that concurrency interacts well with queueing.
+;;
+;; The situation we consider, is a number
+;; of different threads concurrently sending messages.
+;; The test verifies whether all messages were, in fact, sent.
+;;
+;; To make things complicated, some queueing is introduced.
+;; The sender will only proceed each time the current thread
+;; has tried to send @var{k/thread} messages, and the sender
+;; will only try to send at most @code{(+ k/thread e)} messages, where
+;; @var{e} is a random number from @var{e/min} to @var{e/max}.
+
+;; The tests detect the following potential problems in the code
+;; by crashing (but not always, so you may need to re-run a few
+;; times, three times tends to be enough in practice for me):
+;;
+;; * Replacing 'old' with 'queue' in
+;; unless (pfds:queue-empty? old)
+;; * Replacing 'old' with 'queue' in
+;; receive (envelope new) (pfds:dequeue old)
+;; * Replacing the first 'old' with 'queue' in
+;; (eq? old (swap! old new)), in 'make-one-by-one-sender'
+;; * Replacing the second 'old' with 'queue' in
+;; (eq? old (swap! old new)), in 'make-one-by-one-sender'
+;; * Replacing 'old' by 'queue' in
+;; (pfds:enqueue old envelope)
+;; (only detected infrequently, odds 1 to 7 or so)
+;; * Replacing the first 'old' by 'queue' in
+;; (eq? old (swap-queue! old new))
+;; in 'send-message!'
+;; * Replacing the second 'old' by 'queue' in
+;; (eq? old (swap-queue! old new))
+;; in 'send-message!'
+;;
+;; The following problems cause a hang when testing:
+;; * Replacing 'queue' by 'old' in (spin queue)
+;; in 'make-one-by-one-sender'
+;; * Replacing 'queue' by 'old' in (spin queue)
+;; in 'send-message!'.
+;;
+;; The following problems cause a hang in a preceding
+;; test:
+;;
+;; * Replacing the first 'old' by 'new' in
+;; (eq? old (swap-queue! old new))
+;; in 'send-message!'
+;; * Replacing 'queue' by 'old' in
+;; (spin queue)
+;; in 'send-message!'
+;; * Replacing 'queue' by 'new' in
+;; (spin queue)
+;; in 'send-message!'
+;;
+;; Some potential problems currently remain undetected:
+;; * Replacing the 'new' by 'queue' in
+;; (pfds:queue-length new)
+;;
+;; However, it is only for printing a warning
+;; when the queue is rather full. Being slightly
+;; off in queue length shouldn't be a problem
+;; there, as the 'maximum reasonable bound'
+;; is just a wild guess and not some exact
+;; cut-off.
+
+(define random/thread
+ (fluid->parameter (make-unbound-fluid)))
+(define k/thread 12)
+(define e/min 2)
+(define e/max 7)
+(define N_MESSAGES 1000)
+(define N_THREAD 40)
+
+;; List of (thread-index . message-index)
+;; received by current thread.
+(define received/thread
+ (fluid->parameter (make-unbound-fluid)))
+(define i/thread
+ (fluid->parameter (make-unbound-fluid)))
+
+;; The sending is happening concurrently,
+;; so in-order delivery cannot be guaranteed.
+;; Thus, requesting in-order delivery seems
+;; silly.
+(define prio
+ (bitwise-ior
+ (prio->integer 'prio:background)
+ (value->index (symbol-value priority-preference
+ pref:out-of-order))))
+
+(eval-when (expand load eval)
+ (define-type /:msg:our-test:concurrency
+ (structure/packed
+ (synopsis "A test message, containing a thread and message index")
+ (documentation
+ "The first time, a message with index 0 is sent.
+Then each time the index is increased.")
+ (field (header /:message-header))
+ (field (index u32/big))
+ (field (thread u32/big)))))
+
+(define (make-thread-message thread-index i)
+ (let ((s (make-slice/read-write
+ (sizeof /:msg:our-test:concurrency '()))))
+ (set%! /:msg:our-test:concurrency '(header type) s
+ (value->index (symbol-value message-type msg:util:dummy)))
+ (set%! /:msg:our-test:concurrency '(header size) s
+ (sizeof /:msg:our-test:concurrency '()))
+ (set%! /:msg:our-test:concurrency '(index) s i)
+ (set%! /:msg:our-test:concurrency '(thread) s thread-index)
+ s))
+
+(define (decode-thread-message s)
+ (cons (read% /:msg:our-test:concurrency '(thread) s)
+ (read% /:msg:our-test:concurrency '(index) s)))
+
+
+(define (make-every-n/thread proc k)
+ "Make a sender using @var{proc} every @var{k}
+invocations, and at other times doing nothing.
+@code{i/thread} is used for state."
+ (lambda (mq)
+ (assert (not (flushing-allowed?)))
+ (i/thread (+ 1 (i/thread)))
+ (when (>= (i/thread) k)
+ (i/thread 0)
+ (parameterize ((flushing-allowed? #t))
+ (proc mq)))))
+
+(define (thread mq thread-index)
+ (pk 'j thread-index)
+ (parameterize ((received/thread '())
+ (i/thread 0)
+ (random/thread
+ (seed->random-state thread-index)))
+ (do ((i 0 (+ 1 i)))
+ ((>= i N_MESSAGES))
+ (send-message! mq (make-thread-message thread-index i)
+ #:priority prio))
+ ;; If you try to debug "the draining bug bites you",
+ ;; notice that while there is (j 0) ... (j (- N_THREAD 1))
+ ;; in the output, some (i index) are missing.
+ ;;
+ ;; A bug in guile-fibers?
+ (pk 'i thread-index)
+ ;; See "the draining bug bites you".
+ ;; This assertion never fails FWIW.
+ (assert (or (pair? (received/thread))
+ (null? (received/thread))))
+ (received/thread)))
+
+(define (make-restricted-sender how-many make-sender inner-proc)
+ "Make a sender that, when called, tries to send @code{(how-many)}
+messages, using @var{make-sender} and @var{inner-proc}."
+ (define escape-thunk
+ (fluid->parameter (make-unbound-fluid)))
+ (define count
+ (fluid->parameter (make-unbound-fluid)))
+ (define max-count
+ (fluid->parameter (make-unbound-fluid)))
+ (define (count!)
+ (count (+ 1 (count)))
+ (when (= (count) (max-count))
+ (count 0)
+ ((escape-thunk))))
+ (lambda (mq)
+ (let/ec ec
+ (parameterize ((max-count (how-many))
+ (count 0)
+ (escape-thunk ec))
+ ((make-sender
+ (lambda (envelope)
+ (inner-proc envelope)
+ ;; Check 'count' AFTER some things
+ ;; have been sent! Otherwise, the
+ ;; message is lost.
+ (count!)
+ (values)))
+ mq)))))
+
+;; After all threads have exited, we'll ‘drain’ out
+;; the left-overs.
+(define drain? (make-parameter #f))
+
+(define (make-sender/choice y? x y)
+ "When @code{(y?)}, send with @code{y}. Else, send
+with @code{x}."
+ (lambda (mq)
+ (if (y?)
+ (y mq)
+ (x mq))))
+
+(define (inner-send envelope)
+ (attempt-irrevocable-sent!
+ envelope
+ ((go message priority)
+ (received/thread (cons (decode-thread-message message)
+ (received/thread)))
+ (values))
+ ((cancelled) (error "what/cancelled"))
+ ((already-sent) (error "what/already-sent"))))
+
+(define sender/thread
+ (make-sender/choice
+ drain?
+ (make-every-n/thread
+ (make-restricted-sender
+ (lambda ()
+ (+ k/thread e/min
+ (random (- e/max e/min -1) (random/thread))))
+ make-one-by-one-sender
+ inner-send)
+ k/thread)
+ (make-one-by-one-sender inner-send)))
+
+(define (results->array per-thread-sent)
+ ;; A bit array of messages the send function has
+ ;; seen.
+ (define a (make-typed-array 'b #f N_MESSAGES N_THREAD))
+ (define (visit-message message)
+ (define thread-index (car message))
+ (define message-index (cdr message))
+ (array-set! a #t message-index thread-index))
+ (define (visit-per-thread _ messages)
+ ;; TODO: once, I got a
+ ;; (wrong-type-arg "for-each" "Not a list: ~S" (#<unspecified>) #f)
+ ;; _after_ (pk 'n-leftovers').
+ ;;
+ ;; After some debugging, it appears run-fibers with #:drain? #t
+ ;; sometimes forgets about a fiber.
+ (unless (or (null? messages) (pair? messages))
+ (pk 'the-sporadic-bug _ messages)
+ (backtrace)
+ (error "the draining bug bites you (_: ~s) (messages: ~s)"
+ _ messages))
+ (for-each visit-message messages))
+ (vector-for-each visit-per-thread per-thread-sent)
+ a)
+
+(define (array-missing a)
+ (define missing '())
+ (array-index-map! a
+ (lambda (i j)
+ (define found (array-ref a i j))
+ (unless found
+ (set! missing `((,i . ,j) . ,missing)))
+ found))
+ missing)
+
+;; But possibly out-of-order!
+(test-equal "nothing lost when sending concurrently"
+ '()
+ (let* ((mq (make-message-queue no-handlers
+ no-error-handler
+ sender/thread))
+ (thread-indices (iota N_THREAD))
+ ;; The ‘drained-out’ messages are put
+ ;; at index N_THREAD.
+ (results (make-vector (+ 1 N_THREAD)))
+ (ready? (make-condition)))
+ (run-fibers
+ (lambda ()
+ (define (run! thread-index)
+ (spawn-fiber
+ (lambda ()
+ (wait ready?)
+ (vector-set! results thread-index
+ (thread mq thread-index)))))
+ (for-each run! thread-indices)
+ ;; Try to start every thread at the same time!
+ (signal-condition! ready?))
+ #:drain? #t
+ ;; No need
+ #:install-suspendable-ports? #f
+ ;; More interrupts --> more switches
+ ;; --> more test coverage. At least,
+ ;; that's the idea. Not really tested.
+ #:hz 700)
+ ;; Drain the left-overs.
+ (parameterize ((drain? #t)
+ (received/thread '()))
+ (try-send-again! mq)
+ (pk 'n-leftovers (length (received/thread)))
+ (vector-set! results N_THREAD (received/thread)))
+ (array-missing (results->array results))))
+
+;; See "the draining bug bites you"
+;; in results->array.
+(test-expect-fail 1)
+(test-eq "is the draining bug fixed?"
+ 'yes
+ 'no)
--
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.