gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[gnunet-scheme] 142/324: mq: Implement envelope cancellation callback.


From: gnunet
Subject: [gnunet-scheme] 142/324: mq: Implement envelope cancellation callback.
Date: Tue, 21 Sep 2021 13:23:02 +0200

This is an automated email from the git hooks/post-receive script.

maxime-devos pushed a commit to branch master
in repository gnunet-scheme.

commit 6103fb386915a6ef4523cce90a2d917c0d74aa93
Author: Maxime Devos <maximedevos@telenet.be>
AuthorDate: Thu Jun 17 22:09:38 2021 +0200

    mq: Implement envelope cancellation callback.
    
    * gnu/gnunet/mq.scm
      (<message-queue>)[messages/box]: Replace by ...
      (<messages+garbage/box>)[messages+garbage/box]: ... this field.
      (make-message-queue): Initialise new field appropriately.
      (make-one-by-one-sender): Use new field. Decrement the garbage
      counter when a cancelled envelope is encountered. Ignore cancelled
      envelopes.
      (message-queue-length): Use new field.
      (%message-queue-garbagitude): New procedure, for the test suite.
      (send-message!)[cancel!]: Unstub. If the message queue still
      exists, consider removing cancelled envelopes from the queue.
      (send-message!): Use new field.
      (queue-filter): New procedure.
      (increment-garbage&maybe-cleanup): Delete cancelled envelopes
      from the queue, but not too often, for efficiency.
---
 gnu/gnunet/mq.scm | 154 +++++++++++++++++++++++++++++++++++++++++-------------
 1 file changed, 118 insertions(+), 36 deletions(-)

diff --git a/gnu/gnunet/mq.scm b/gnu/gnunet/mq.scm
index 89d5f6e..af2a9fd 100644
--- a/gnu/gnunet/mq.scm
+++ b/gnu/gnunet/mq.scm
@@ -26,7 +26,7 @@
 ;; Messages are made of bytes. In particular,
 ;; messages must be prefixed by a /:message-header.
 ;;
-;; TODO cancelling, injecting messages, message handlers ...
+;; TODO injecting messages, message handlers ...
 ;; These are not implemented yet or untested, or need
 ;; more documentation!
 (define-library (gnu gnunet mq)
@@ -34,6 +34,7 @@
          make-one-by-one-sender
          inject-message! send-message!
          message-queue-length
+         %message-queue-garbagitude
          try-send-again!
 
          &missing-header-error make-missing-header-error
@@ -59,13 +60,16 @@
          (only (gnu gnunet netstruct syntactic)
                sizeof read%)
          (only (guile) define* exact-integer?)
+         (only (ice-9 weak-vector)
+               weak-vector weak-vector-ref)
          (only (ice-9 atomic)
                make-atomic-box atomic-box-ref
                atomic-box-compare-and-swap!)
          (only (rnrs base)
                lambda assert let begin define
                procedure? eq? >= = <= < if quote
-               values and let* not)
+               values and let* not cons car cdr
+               cond + - > *)
          (only (rnrs control)
                when unless)
          (only (rnrs conditions)
@@ -74,19 +78,26 @@
          (only (rnrs exceptions)
                raise raise-continuable)
          (only (rnrs records syntactic) define-record-type)
+         (only (srfi srfi-1) filter)
          (only (srfi srfi-8) receive)
          (only (srfi srfi-39) make-parameter)
          (prefix (only (pfds queues)
                        make-queue dequeue enqueue queue-length
-                       queue-empty?)
+                       queue-empty? queue->list list->queue)
                  #{pfds:}#))
   (begin
     (define-record-type (<message-queue> make-message-queue message-queue?)
       (fields (immutable handlers message-queue-handlers)
              (immutable error-handler message-queue-error-handler)
-             ;; Atomic box of a queue of messages to send
-             ;; (as @code{<envelope>} objects).
-             (immutable messages/box message-queue-messages/box)
+             ;; Atomic box of a queue of messages to send (as @code{<envelope>}
+             ;; objects), together with an over-estimate of how many items in
+             ;; the queue are already cancelled, used as a heuristic for when
+             ;; optimising the message queue is required.
+             ;;
+             ;; It can occassionally be an under-estimate due to marking
+             ;; envelopes as cancelled and updating the estimate not being
+             ;; an atomic operation.
+             (immutable messages+garbage/box 
message-queue-messages+garbage/box)
              ;; A procedure for actually sending the messages.
              ;; It accepts a single argument, the message queue.
              ;;
@@ -115,7 +126,7 @@ Messages are sent with @var{sender}. It can be created with
           #;(assert (message-handlers? handlers))
           #;(assert (message-handler? error-handler))
           (%make handlers error-handler
-                 (make-atomic-box (pfds:make-queue))
+                 (make-atomic-box (cons (pfds:make-queue) 0))
                  sender)))))
 
     (define (make-one-by-one-sender proc)
@@ -133,35 +144,42 @@ messages must be sent in-order (TODO really received 
in-order?)."
       (lambda (mq)
        (assert (message-queue? mq))
        (%%bind-atomic-boxen
-        ((queue (message-queue-messages/box mq) swap!))
+        ((queue+garbage (message-queue-messages+garbage/box mq) swap!))
         ;; First extract an envelope ...
-        (let spin ((old queue))
+        (let spin ((old queue+garbage))
+          (define old-queue (car old))
+          (define old-garbage (cdr old))
+          (assert (<= 0 old-garbage))
           ;; ... unless there isn't anything to remove anymore.
           ;; This check cannot be moved outside the (let spin ...),
           ;; as message senders may be called at any time
           ;; (even if there are no messages!). Also, in case of
           ;; concurrency, the queue may become empty after a spin
           ;; iteration.
-          (unless (pfds:queue-empty? old)
-            (receive (envelope new) (pfds:dequeue old)
-              (when (eq? old (swap! old new))
-                ;; We extracted a message. Now do something
-                ;; with it!
-                ;;
-                ;; Make sure @var{proc} does not return
-                ;; any values, as we may want to assign
-                ;; meaning to return values later.
-                (receive ()
-                    ;; Process the message.
-                    (proc envelope)
-                  'nothing))
-              ;; Process the remaining messages
-              ;; / Someone modified the message queue
-              ;; before us; retry.
-              ;;
+          (unless (pfds:queue-empty? old-queue)
+            (receive (envelope new-queue) (pfds:dequeue old-queue)
+              (cond ((envelope-peek-cancelled? envelope)
+                     ;; There is no need to pass already cancelled
+                     ;; envelopes to @var{proc} (although passing them
+                     ;; anyway should be harmless), so remove them
+                     ;; from the queue. Also try to keep the estimate
+                     ;; accurate.
+                     (swap! old (cons new-queue (- old-garbage 1))))
+                    ;; We extracted a (not-yet-cancelled) envelope.
+                    ;; Now do something with it!
+                    ((eq? old (swap! old (cons new-queue old-garbage)))
+                     ;; Make sure @var{proc} does not return
+                     ;; any values, as we may want to assign
+                     ;; meaning to return values later.
+                     (receive ()
+                         ;; Process the message.
+                         (proc envelope)
+                       (values))))
+              ;; Process remaining messages (or retry in case there was
+              ;; a race and we lost it).
               ;; TODO: if someone else modified the message queue,
               ;; does that mean we don't have to anymore?
-              (spin queue)))))))
+              (spin queue+garbage)))))))
 
     (define (inject-message! mq message)
       "Call the message handler that was registered
@@ -215,7 +233,12 @@ of message queues."
     (define (message-queue-length mq)
       "How many messages are currently in the message queue @var{mq}?"
       (pfds:queue-length
-       (atomic-box-ref (message-queue-messages/box mq))))
+       (car (atomic-box-ref (message-queue-messages+garbage/box mq)))))
+
+    (define (%message-queue-garbagitude mq)
+      "Return the estimated amount of cancelled envelopes. This procedure
+is not part of the API and is only intended for the test suite."
+      (cdr (atomic-box-ref (message-queue-messages+garbage/box mq))))
 
     ;; TODO: should this be a subtype of the not-yet-existing
     ;; &malformed-message?
@@ -261,13 +284,19 @@ but in case of an exception (for example, an 
out-of-memory exception during
 the handling of a @code{&overly-full-queue-warning}), it is possible
 the envelope isn't returned even though it has been enqueued and it might
 perhaps be sent."
+      (define mq/weak
+       (let ((v (weak-vector mq)))
+         (lambda () (weak-vector-ref v 0))))
       (define (cancel!)
-       (assert (and #f "cancel! not yet implemented")))
+       (let ((mq (mq/weak)))
+         (if mq
+             (increment-garbage&maybe-cleanup mq)
+             (values))))
       (assert (and (slice? message)
                   (exact-integer? priority)
                   (<= 0 priority) (< priority 512)))
       (%%bind-atomic-boxen
-       ((queue (message-queue-messages/box mq) swap-queue!))
+       ((queue+garbage (message-queue-messages+garbage/box mq) swap!))
        ;; Add the message to the queue. Also remember the
        ;; length of the new queue; we'll need it later.
        (let* ((envelope (make-envelope cancel!
@@ -275,11 +304,13 @@ perhaps be sent."
                                       #:priority priority
                                       #:notify-sent! notify-sent!))
              (queue-length
-              (let spin ((old queue))
-                (let ((new (pfds:enqueue old envelope)))
-                  (if (eq? old (swap-queue! old new))
-                      (pfds:queue-length new)
-                      (spin queue))))))
+              (let spin ((old queue+garbage))
+                (let* ((old-queue (car old))
+                       (old-garbage (cdr old))
+                       (new-queue (pfds:enqueue old-queue envelope)))
+                  (if (eq? old (swap! old (cons new-queue old-garbage)))
+                      (pfds:queue-length new-queue)
+                      (spin queue+garbage))))))
         ;; The C implementation emits a warning if the queue has
         ;; many entries, as this may indicate a bug (in the scheduler,
         ;; in the queue implementation, ...). This seems a good idea.
@@ -297,4 +328,55 @@ perhaps be sent."
     (define (try-send-again! mq)
       "Try to send messages in the queue @var{mq} that were not yet sent.
 This is expected to be called from the message queue implementation."
-      ((message-queue-sender mq) mq))))
+      ((message-queue-sender mq) mq))
+
+    (define (queue-filter ? queue)
+      "Construct a queue, based on @var{queue}, restricted to elements
+satisfying the predicate @var{?}."
+      (pfds:list->queue (filter ? (pfds:queue->list queue))))
+
+    (define (increment-garbage&maybe-cleanup mq)
+      "Increment the garbage counter of @var{mq} and perhaps
+take out the trash (i.e., cancelled envelopes still in the queue),
+and if the trash is taken out, reset the garbage counter to zero,
+as an atomic operation."
+      (%%bind-atomic-boxen
+       ((queue+garbage (message-queue-messages+garbage/box mq) swap!))
+       (let loop ((old queue+garbage))
+        (let* ((old-queue (car old))
+               (old-queue-length (pfds:queue-length old-queue))
+               (old-garbage (cdr old))
+               (incremented-garbage (+ 1 old-garbage)))
+          (assert (<= 0 old-garbage))
+          ;; If the messages in the queue are largely
+          ;; garbage, throw the garbage out.  The procedure
+          ;; choses to throw the garbage out if the (estimated)
+          ;; ratio of garbage to the queue length is more than
+          ;; 3/4.
+          ;;
+          ;; There are no deep theoretical reasons for choosing
+          ;; the ratio 3/4=0.75, only that it is between 1/2 and
+          ;; 1. Choosing a ratio seemed less arbitrary than, say,
+          ;; only collect garbage if the garbage exceeds some
+          ;; fixed amount.
+          (if (> (* 4 incremented-garbage) (* 3 old-queue-length))
+              ;; It is time to collect garbage!
+              ;; Construct a new queue with all garbage removed.
+              (let ((filtered (queue-filter
+                               (lambda (i)
+                                 (not (envelope-peek-cancelled? i)))
+                               old-queue)))
+                ;; Try to write this new queue,
+                ;; resetting the garbage counter.
+                (if (eq? old (swap! old (cons filtered 0)))
+                    ;; All garbage has been thrown out! Done!
+                    (values)
+                    ;; We lost the race, try again!
+                    (loop queue+garbage)))
+              ;; Not yet time for garbage collection,
+              ;; just increment the garbage counter
+              (if (eq? old (swap! old (cons old-queue incremented-garbage)))
+                  ;; The garbage counter has been incremented! Done!
+                  (values)
+                  ;; We lost the race, try again!
+                  (loop queue+garbage)))))))))

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]