[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
02/05: Add some utilities to use PostgreSQL/Squee through a channel
From: Christopher Baines
Subject: 02/05: Add some utilities to use PostgreSQL/Squee through a channel
Date: Thu, 1 Oct 2020 14:16:44 -0400 (EDT)
cbaines pushed a commit to branch master
in repository data-service.
commit 614f9888a58fbd7b2e708cbbf0262f3eb42d2d49
Author: Christopher Baines <mail@cbaines.net>
AuthorDate: Thu Oct 1 19:13:30 2020 +0100
Add some utilities to use PostgreSQL/Squee through a channel
To allow for some concurrency.
---
guix-data-service/database.scm | 102 +++++++++++++++++++++++++++++++++++++++++
1 file changed, 102 insertions(+)
diff --git a/guix-data-service/database.scm b/guix-data-service/database.scm
index df4daac..8298b93 100644
--- a/guix-data-service/database.scm
+++ b/guix-data-service/database.scm
@@ -18,9 +18,19 @@
(define-module (guix-data-service database)
#:use-module (system foreign)
#:use-module (ice-9 match)
+ #:use-module (ice-9 threads)
#:use-module (squee)
+ #:use-module (fibers)
+ #:use-module (fibers channels)
+ #:use-module (fibers conditions)
#:use-module (guix-data-service config)
#:export (with-postgresql-connection
+
+ make-postgresql-connection-channel
+ close-postgresql-connection-channel
+ exec-query/through-channel
+ with-postgresql-transaction/through-channel
+
with-postgresql-transaction
check-test-database!
@@ -61,6 +71,98 @@
(lambda (key . args)
(pg-conn-finish conn)))))
+(define* (make-postgresql-connection-channel name
+                                             #:key
+                                             (statement-timeout #f)
+                                             (threads 4))
+  "Start THREADS worker threads and return the channel they take requests
+from.  Each worker calls with-postgresql-connection with NAME and
+STATEMENT-TIMEOUT, then loops reading messages from the channel:
+
+  (REPLY F ALWAYS-ROLLBACK?)  call (F conn) inside
+                              with-postgresql-transaction, passing the
+                              boolean ALWAYS-ROLLBACK? through
+  (REPLY . ARGS)              apply exec-query to the connection and ARGS
+  anything else               leave the loop, ending that worker
+
+REPLY is a channel.  The worker puts on it either the list of values the
+request returned, or the pair (worker-thread-error . EXN) if it raised EXN."
+  (define (run-request thunk)
+    ;; Call THUNK and return its values as a list.  If it raises, the inner
+    ;; handler (installed without #:unwind?) prints the exception and a
+    ;; backtrace to the error port and re-raises; the outer handler then
+    ;; turns the exception into a (worker-thread-error . EXN) pair so the
+    ;; worker loop keeps running and the requester gets a reply.
+    (with-exception-handler
+        (lambda (exn)
+          (cons 'worker-thread-error exn))
+      (lambda ()
+        (with-exception-handler
+            (lambda (exn)
+              (simple-format
+               (current-error-port)
+               "postgresql connection thread: exception: ~A\n"
+               exn)
+              (backtrace)
+              (raise-exception exn))
+          (lambda ()
+            (call-with-values thunk
+              (lambda vals vals)))))
+      #:unwind? #t))
+
+  ;; NOTE(review): presumably this clears the current fiber so that the
+  ;; threads started below do not inherit it -- confirm against the Fibers
+  ;; internals, since current-fiber is a private binding.
+  (parameterize (((@@ (fibers internal) current-fiber) #f))
+    (let ((channel (make-channel)))
+      (for-each
+       (lambda _
+         (call-with-new-thread
+          (lambda ()
+            (with-postgresql-connection
+             name
+             (lambda (conn)
+               (let loop ()
+                 (match (get-message channel)
+                   ;; Transaction request.  The flag must be forwarded,
+                   ;; otherwise #:always-rollback? #t from the caller is
+                   ;; silently dropped and the transaction is handled as
+                   ;; a normal one.
+                   (((? channel? reply) f (? boolean? always-rollback?))
+                    (put-message
+                     reply
+                     (run-request
+                      (lambda ()
+                        (with-postgresql-transaction
+                         conn
+                         (lambda (conn)
+                           (f conn))
+                         #:always-rollback? always-rollback?))))
+                    (loop))
+                   ;; Plain query request: ARGS are the exec-query
+                   ;; arguments that follow the connection.
+                   (((? channel? reply) . (? list? args))
+                    (put-message
+                     reply
+                     (run-request
+                      (lambda ()
+                        (apply exec-query conn args))))
+                    (loop))
+                   ;; Any other message (for example #f) stops this worker.
+                   (_ #f))))
+             #:statement-timeout statement-timeout))))
+       (iota threads))
+      channel)))
+
+(define (close-postgresql-connection-channel channel)
+  "Put #f on CHANNEL.  A worker reading from the channel treats any message
+that is not a request as the signal to leave its loop.
+
+NOTE(review): a channel message is received by a single reader, so one call
+presumably stops only one of the worker threads -- confirm whether callers
+need to call this once per thread."
+  (let ((stop-message #f))
+    (put-message channel stop-message)))
+
+(define (exec-query/through-channel channel . args)
+  "Ask a worker listening on CHANNEL to apply exec-query to its connection
+and ARGS, and wait for the reply.  Return the values the worker sent back;
+if it replied with (worker-thread-error . EXN), raise EXN in the caller."
+  (define reply-channel (make-channel))
+
+  ;; The request is the reply channel followed by the exec-query arguments.
+  (put-message channel (cons reply-channel args))
+  (let ((response (get-message reply-channel)))
+    (if (and (pair? response)
+             (eq? 'worker-thread-error (car response)))
+        (raise-exception (cdr response))
+        (apply values response))))
+
+(define* (with-postgresql-transaction/through-channel channel
+                                                      f
+                                                      #:key always-rollback?)
+  "Ask a worker listening on CHANNEL to call (F conn) inside
+with-postgresql-transaction, and wait for the reply.  ALWAYS-ROLLBACK? is
+sent with the request as a boolean; any true value is sent as #t.  Return
+the values the worker sent back; if it replied with
+(worker-thread-error . EXN), raise EXN in the caller."
+  (let ((reply (make-channel)))
+    ;; The worker only treats a message as a transaction request when its
+    ;; third element satisfies boolean?; otherwise it takes the message for
+    ;; exec-query arguments.  Coerce any truthy value to #t so the request
+    ;; is always routed to the transaction branch.
+    (put-message channel (list reply f (and always-rollback? #t)))
+    (match (get-message reply)
+      (('worker-thread-error . exn)
+       (raise-exception exn))
+      (result
+       (apply values result)))))
+
(define* (with-postgresql-transaction conn f
#:key always-rollback?)
(exec-query conn "BEGIN;")