gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[gnunet-scheme] 01/02: tests/distributed-hash-table: Test retrieval.


From: gnunet
Subject: [gnunet-scheme] 01/02: tests/distributed-hash-table: Test retrieval.
Date: Wed, 02 Feb 2022 13:12:06 +0100

This is an automated email from the git hooks/post-receive script.

maxime-devos pushed a commit to branch master
in repository gnunet-scheme.

commit abc4e2023c73e930522af4b8a90dfa3bd99a24c6
Author: Maxime Devos <maximedevos@telenet.be>
AuthorDate: Wed Feb 2 11:58:08 2022 +0000

    tests/distributed-hash-table: Test retrieval.
    
    * tests/distributed-hash-table.scm
      (client-get->query,insertion->result,simulate-dht-service):
      New procedures.
      (type:ping,type:pong): New variables.
      ("synchronuous ping-pong with multiple balls (no interruptions, no 
cancellation)"):
      New test.
---
 tests/distributed-hash-table.scm | 212 ++++++++++++++++++++++++++++++++++++++-
 1 file changed, 209 insertions(+), 3 deletions(-)

diff --git a/tests/distributed-hash-table.scm b/tests/distributed-hash-table.scm
index cc6b8b8..28de506 100644
--- a/tests/distributed-hash-table.scm
+++ b/tests/distributed-hash-table.scm
@@ -16,7 +16,8 @@
 ;;
 ;; SPDX-License-Identifier: AGPL-3.0-or-later
 (define-module (test-distributed-hash-table))
-(import (quickcheck)
+(import (ice-9 match)
+       (quickcheck)
        (quickcheck arbitrary)
        (quickcheck generator)
        (quickcheck property)
@@ -28,6 +29,7 @@
        (gnu gnunet hashcode struct)
        (gnu gnunet block)
        (gnu gnunet message protocols)
+       (gnu gnunet mq)
        (gnu gnunet mq handler)
        (gnu gnunet mq-impl stream)
        (gnu extractor enum)
@@ -38,6 +40,7 @@
        (srfi srfi-26)
        (srfi srfi-64)
        (fibers conditions)
+       (fibers channels)
        (tests utils))
 
 ;; Copied from tests/bv-slice.scm.
@@ -361,7 +364,7 @@
 ;;; Currently, the following operations are tested:
 ;;;
 ;;;  * [x] insertion (@code{put!})
-;;;  * [ ] retrieval (@code{start-get!})
+;;;  * [x] retrieval (@code{start-get!})
 ;;;  * [ ] monitoring
 ;;;
 ;;; In the following contexts:
@@ -382,7 +385,18 @@
   (pk 'e e)
   (error "no error handler"))
 
-;; TODO: options
+;; TODO: options, (gnu gnunet dht network)?
+(define (client-get->query message)
+  (let^ ((! type (read% /:msg:dht:client:get '(type) message))
+        (! key (select /:msg:dht:client:get '(key) message))
+        (! desired-replication-level
+           (read% /:msg:dht:client:get '(desired-replication-level) message))
+        (! unique-id (read% /:msg:dht:client:get '(unique-id) message))
+        (! query
+           (make-query type key #:desired-replication-level
+                       desired-replication-level)))
+       (values query unique-id)))
+
 (define (client-put->insertion slice)
   (let^ ((! header (slice-slice slice 0 (sizeof /:msg:dht:client:put '())))
         (! type (read% /:msg:dht:client:put '(type) header))
@@ -399,6 +413,122 @@
                              desired-replication-level)))
        insertion))
 
+(define (insertion->result insertion unique-id)
+  (let^ ((! datum (insertion->datum insertion))
+        (! value (datum-value datum))
+        (! size (+ (sizeof /:msg:dht:client:result '())
+                   (slice-length value)))
+        (! slice (make-slice/read-write size))
+        (! header (slice-slice slice 0 (sizeof /:msg:dht:client:result '())))
+        (! rest (slice-slice slice (sizeof /:msg:dht:client:result '()))))
+       (set%! /:msg:dht:client:result '(header type)
+              header
+              (value->index (symbol-value message-type msg:dht:client:result)))
+       (set%! /:msg:dht:client:result '(header size) header size)
+       (set%! /:msg:dht:client:result '(type) header (datum-type datum))
+       ;; TODO: get and put paths, options
+       (set%! /:msg:dht:client:result '(put-path-length) header 0)
+       (set%! /:msg:dht:client:result '(get-path-length) header 0)
+       (set%! /:msg:dht:client:result '(unique-id) header unique-id)
+       (set%! /:msg:dht:client:result '(expiration) header
+              (datum-expiration datum))
+       (slice-copy! (datum-key datum)
+                    (select /:msg:dht:client:result '(key) header))
+       (slice-copy! (datum-value datum) rest)
+       slice))
+
+;; TODO: would be nice to turn this in a real service
+;; (gnu gnunet dht service).
+(define (simulate-dht-service)
+  "Simulate a DHT service, remembering all insertions and ignoring expiration
+and replication.  Cancellation is not supported.  Only a single client is
+supported."
+  (define (slice->bv slice)
+    (define bv (make-bytevector (slice-length slice)))
+    (define bv/slice (bv-slice/read-write bv))
+    (slice-copy! slice bv/slice)
+    bv)
+  (define (query->key query)
+    (cons (query-type query) (slice->bv (query-key query))))
+  (define (insertion->key insertion)
+    (define datum (insertion->datum insertion))
+    (cons (datum-type datum) (slice->bv (datum-key datum))))
+  ;; Mapping from (numeric type + key bytevector)
+  ;;   --> (list of value . interested mq channels)
+  (define table (make-hash-table))
+  (define table-channel (make-channel))
+  (define (handle-table spawn-fiber)
+    (define (put-message/async channel message)
+      (assert (channel? channel))
+      (spawn-fiber
+       (lambda ()
+        (put-message channel message))))
+    (match (get-message table-channel)
+      (('start-get! query response-channel)
+       (let* ((key (query->key query))
+             (old (hash-ref table key '(() . ())))
+             (old-values (car old))
+             (channels (cdr old)))
+        ;; Send currently known values.
+        (for-each
+         (lambda (old-value)
+           (put-message/async response-channel old-value))
+         old-values)
+        ;; Send future values to the channel as well.
+        (hash-set! table key
+                   `(,old-values ,response-channel ,@channels))))
+      (('put! insertion)
+       (let* ((key (insertion->key insertion))
+             (old (hash-ref table key '(() . ())))
+             (old-values (car old))
+             (channels (cdr old))
+             (new-values (cons insertion old-values)))
+        ;; Send the new value.
+        (for-each
+         (lambda (response-channel)
+           (put-message/async response-channel insertion))
+         channels)
+        (hash-set! table key `(,new-values . ,channels)))))
+    (handle-table spawn-fiber))
+  (lambda (port spawn-fiber)
+    (spawn-fiber (lambda () (handle-table spawn-fiber)))
+    (let^ ((! mq #false) ; not yet defined
+          (! mq-defined (make-condition))
+          (! (simple-message-handler type* handle!*)
+             (message-handler
+              (type type*)
+              ((interpose foo) foo)
+              ((well-formed? s) #true)
+              ((handle! slice) (handle!* slice))))
+          (! (handle/put! message)
+             ""
+             (put-message table-channel
+                          `(put! ,(client-put->insertion message))))
+          (!^ (handle/start-get! message)
+              ""
+              ((! channel (make-channel))
+               (<-- (query unique-id) (client-get->query message)))
+              (put-message table-channel `(start-get! ,query ,channel))
+              (spawn-fiber
+               (lambda ()
+                 (let loop ()
+                   (define insertion (get-message channel))
+                   (wait mq-defined)
+                   (send-message! mq (insertion->result insertion unique-id))
+                   (loop))))
+              (values))
+          (! h (message-handlers
+                (simple-message-handler
+                 (symbol-value message-type msg:dht:client:put)
+                 handle/put!)
+                (simple-message-handler
+                 (symbol-value message-type msg:dht:client:get)
+                 handle/start-get!))))
+         (set! mq
+           (port->message-queue port h no-error-handler #:spawn spawn-fiber))
+         (signal-condition! mq-defined)
+         (values))))
+
 (test-equal "put! sends one message to service, after connecting"
   i
   (let^ ((! connected? #false)
@@ -433,4 +563,80 @@
           (assert message)
           (client-put->insertion message)))))
 
+;; Squat two message types for tests below.
+(define type:ping 7)
+(define type:pong 8)
+
+(test-assert "synchronuous ping-pong with multiple balls (no interruptions, no 
cancellation)"
+  (call-with-services/fibers
+   `(("dht" . ,(simulate-dht-service)))
+   (lambda (config spawn-fiber)
+     (define N_ROUNDS 50)
+     (define server
+       (connect config #:spawn spawn-fiber))
+     (define (round->key round)
+       (define key (make-slice/read-write (sizeof /hashcode:512 '())))
+       (slice-u64-set! key 0 round (endianness little))
+       key)
+     (define (make-a-insertion type round j)
+       (define key (round->key round))
+       (define value (make-slice/read-write 8))
+       (slice-u64-set! value 0 j (endianness little))
+       (datum->insertion (make-datum type key value)))
+     (define (make-a-query type round)
+       (define key (round->key round))
+       (make-query type key))
+     (define (n-responses-for-round round)
+       (+ 1 (mod round 8)))
+     (define (ping/pong type round)
+       ;; round: number (used as key)
+       ;; j: value
+       ;;
+       ;; Multiple values are inserted for the same key,
+       ;; to test iteration.
+       (let loop ((j 0))
+        (when (< j (n-responses-for-round round))
+          (put! server (make-a-insertion type round j))
+          (loop (+ 1 j)))))
+     (define (search-result->j type search-result)
+       (define datum (search-result->datum search-result))
+       (define value (datum-value datum))
+       (assert (= (slice-length value) 8)) ; u64
+       (assert (= type (datum-type datum)))
+       (slice-u64-ref value 0 (endianness little)))
+     (define (wait-for-values type round)
+       (define done (make-condition))
+       (define responses '())
+       (define (found search-result)
+        (set! responses
+              (cons (search-result->j type search-result) responses))
+        (define length/current (length responses))
+        (define length/expected (n-responses-for-round round))
+        (when (>= length/current length/expected)
+          ;; Duplicated responses might happen in practice, but should
+          ;; be avoided when feasible.
+          (assert (= length/current length/expected))
+          (assert (equal? (sort responses <) (iota length/expected)))
+          ;; TODO: cancel query
+          (signal-condition! done)))
+       (start-get! server (make-a-query type round) found)
+       (wait done))
+     (define* (ping/pong* this-type other-type round)
+       (when (< round N_ROUNDS)
+        (ping/pong this-type round)
+        (wait-for-values other-type round)
+        (ping/pong* this-type other-type (+ 1 round))))
+     (define (spawn-ping/pong* this-type other-type)
+       (define done (make-condition))
+       (spawn-fiber
+       (lambda ()
+         (ping/pong* this-type other-type 0)
+         (signal-condition! done)))
+       done)
+     (define ping (spawn-ping/pong* type:ping type:pong))
+     (define pong (spawn-ping/pong* type:pong type:ping))
+     (wait ping)
+     (wait pong)
+     #true)))
+
 (test-end)

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]