From 3f2306aaec36a6c7b7dca49e15bca03baf73a7fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20F=2E=20Wittenberger?= Date: Fri, 19 Feb 2016 22:26:49 +0100 Subject: [PATCH] Some refactoring. * some predicates whose sketchy implementation may soon change given descriptive names * `unblock-threads-for-timeout!` moved out of `schedule` - the former handles the timeout queue, the latter the scheduling policy * `thread-unblock!` to become the "clean unblock" procedure * `##sys#thread-clear-blocking-state!` to handle all blocking reasons at a central spot --- scheduler.scm | 180 +++++++++++++++++++++++++++++++++------------------------- srfi-18.scm | 20 ++----- 2 files changed, 110 insertions(+), 90 deletions(-) diff --git a/scheduler.scm b/scheduler.scm index 245f400..feaac28 100644 --- a/scheduler.scm +++ b/scheduler.scm @@ -32,7 +32,7 @@ ##sys#update-thread-state-buffer ##sys#restore-thread-state-buffer pending-queue ##sys#unblock-threads-for-i/o fdset-set fdset-test create-fdset stderr - ##sys#clear-i/o-state-for-thread! ##sys#abandon-mutexes) + ##sys#thread-clear-blocking-state! ##sys#abandon-mutexes) (not inline ##sys#interrupt-hook) (unsafe) (foreign-declare #< " tmo2 " (now: " now ")") - (if (not (equal? tmo1 tmo2)) (dbg "chicken would loose timeout " tmo1 " since " tto " has " tmo2)) - (if (equal? tmo1 tmo2) ;XXX why do we check this? - (if (fp>= now tmo1) ; timeout reached? - (begin - (##sys#setislot tto 13 #t) ; mark as being unblocked by timeout - (##sys#clear-i/o-state-for-thread! tto) - (##sys#thread-basic-unblock! tto) - (loop (cdr lst)) ) - (set! ##sys#timeout-list lst) ) - (loop (cdr lst)) ) ) ) ) - (if (and (null? ##sys#fd-list) (ready-queue-empty?)) - (if (null? ##sys#timeout-list) - (##sys#signal-hook #:runtime-error "deadlock") - ;; Sleep for the number of milliseconds of next thread - ;; to wake up.
- (let ((tmo1 (caar ##sys#timeout-list))) - (##core#inline - "C_msleep" - (fxmax - 0 - (##core#inline "C_quickflonumtruncate" (fp- tmo1 now)))) ))))) + (unless (##sys#timeout-list-empty?) + (##sys#unblock-threads-for-timeout!)) (update-pending-queue!) @@ -314,6 +304,42 @@ dunno what to do (set! ##sys#timeout-list r)) (loop r l)))))) +(define-inline (##sys#thread-basic-unblock! t) + (dbg "unblocking: " t) + (##sys#add-to-ready-queue t) ) + +(define (##sys#unblock-threads-for-timeout!) + (dbg "timeout queue " ##sys#timeout-list) + (let ((now (##core#inline_allocate ("C_a_i_current_milliseconds" 4) #f))) + (let loop ((lst ##sys#timeout-list)) + (if (null? lst) + (set! ##sys#timeout-list '()) + (let* ([tmo1 (caar lst)] ; timeout of thread on list + [tto (cdar lst)] ; thread on list + [tmo2 (##sys#slot tto 4)] ) ; timeout value stored in thread + (dbg "timeout: " tto " -> " tmo2 " (now: " now ")") + (if (not (equal? tmo1 tmo2)) (dbg "chicken would loose timeout " tmo1 " since " tto " has " tmo2)) + (if (equal? tmo1 tmo2) ;XXX why do we check this? + (if (fp>= now tmo1) ; timeout reached? + (begin + (##sys#setislot tto 13 #t) ; mark as being unblocked by timeout + (##sys#thread-clear-blocking-state! tto) + (##sys#thread-basic-unblock! tto) + (loop (cdr lst)) ) + (set! ##sys#timeout-list lst) ) + (loop (cdr lst)) ) ) ) ) + (if (and (##sys#fd-list-empty?) (ready-queue-empty?)) + (if (##sys#timeout-list-empty?) + (##sys#signal-hook #:runtime-error "deadlock") + ;; Sleep for the number of milliseconds of next thread + ;; to wake up. + (let ((tmo1 (caar ##sys#timeout-list))) + (##core#inline + "C_msleep" + (fxmax + 0 + (##core#inline "C_quickflonumtruncate" (fp- tmo1 now)))) ))))) + (define (##sys#thread-block-for-timeout! t tm) (dbg t " blocks for timeout " tm) (unless (flonum? tm) ; to catch old code that uses fixnum timeouts @@ -343,6 +369,7 @@ dunno what to do (define (##sys#abandon-mutexes thread) (let ((ms (##sys#slot thread 8))) (unless (null? 
ms) + (dbg thread " abandons " (map mutex-name ms)) ;; This may be worth an optional runtime warning (##sys#for-each (lambda (m) (##sys#setislot m 2 #f) @@ -361,17 +388,9 @@ dunno what to do (define (##sys#thread-kill! t s) (dbg "killing: " t " -> " s ", recipients: " (##sys#slot t 12)) (##sys#abandon-mutexes t) - (let ((blocked (##sys#slot t 11))) - (cond - ((##sys#structure? blocked 'condition-variable) - (##sys#setslot blocked 2 (##sys#delq t (##sys#slot blocked 2)))) - ((##sys#structure? blocked 'thread) - (##sys#setslot blocked 12 (##sys#delq t (##sys#slot blocked 12))))) ) (##sys#remove-from-timeout-list t) - (##sys#clear-i/o-state-for-thread! t) + (##sys#thread-clear-blocking-state! t) (##sys#setslot t 3 s) - (##sys#setislot t 4 #f) - (##sys#setislot t 11 #f) (##sys#setislot t 8 '()) (let ((rs (##sys#slot t 12))) (unless (null? rs) @@ -379,15 +398,9 @@ dunno what to do (lambda (t2) (dbg " checking: " t2 " (" (##sys#slot t2 3) ") -> " (##sys#slot t2 11)) (when (eq? (##sys#slot t2 11) t) - (##sys#thread-basic-unblock! t2) ) ) - rs) ) ) - (##sys#setislot t 12 '()) ) - -(define (##sys#thread-basic-unblock! t) - (dbg "unblocking: " t) - (##sys#setislot t 11 #f) ; (FD . RWFLAGS) | # | # - (##sys#setislot t 4 #f) - (add-to-ready-queue t) ) + (##sys#thread-unblock! t2) ) ) + rs) + (##sys#setislot t 12 '()) ) ) ) (define (##sys#default-exception-handler arg) (let ([ct ##sys#current-thread]) @@ -471,7 +484,7 @@ dunno what to do (define (##sys#unblock-threads-for-i/o) (dbg "fd-list: " ##sys#fd-list) (create-fdset) - (let* ((to? (pair? ##sys#timeout-list)) + (let* ((to? (not (##sys#timeout-list-empty?))) (rq? (ready-queue-not-empty?)) (tmo (if (and to? (not rq?)) ; no thread was unblocked by timeout, so wait (let* ((tmo1 (caar ##sys#timeout-list)) @@ -527,26 +540,6 @@ dunno what to do (cons a (loop n (add1 pos) (cdr lst))) ) ) ) ) ) ] ))) ) -;;; Clear I/O state for unblocked thread - -(define (##sys#clear-i/o-state-for-thread! t) - (when (pair? 
(##sys#slot t 11)) - (let ((fd (car (##sys#slot t 11)))) - (set! ##sys#fd-list - (let loop ((lst ##sys#fd-list)) - (if (null? lst) - '() - (let* ((a (car lst)) - (fd2 (car a)) ) - (if (eq? fd fd2) - (let ((ts (##sys#delq t (cdr a)))) ; remove from fd-list entry - (cond ((null? ts) (cdr lst)) - (else - (##sys#setslot a 1 ts) ; fd-list entry is list with t removed - lst) ) ) - (cons a (loop (cdr lst))))))))))) - - ;;; Get list of all threads that are ready or waiting for timeout or waiting for I/O: ; ; (contributed by Joerg Wittenberger) @@ -593,21 +586,56 @@ dunno what to do (set! ##sys#fd-list (##sys#slot vec 2)) (set! ##sys#timeout-list (##sys#slot vec 3)) ) +;;; Clear blocking queues + +(define (##sys#thread-clear-blocking-state! t) + (let ([blocked (##sys#slot t 11)]) +(dbg "clear-blocking " t " from " blocked) + (cond + ((pair? blocked) + (let* ((fd (##sys#slot blocked 0)) + (entry (fd-list-lookup ##sys#fd-list fd))) + (if entry + (let ((ts (##sys#delq t (int-priority-queue-value entry)))) ; remove from fd-list entry + (cond ((null? ts) + ;;(pp `(CLEAR FD: ,fd ,t) ##sys#standard-error) + (fdset-delfd fd) + (##sys#fd-list-clear-entry! entry)) ; no more threads waiting for this fd + (else + (int-priority-queue-value-set! entry ts)) ) ) + (begin +#| + (define stderr ##sys#standard-error) + (display "##sys#thread-clear-blocking-state! thread " stderr) + (display t stderr) + (display " blocked on unregistered fd " stderr) + (display fd stderr) + (newline stderr) +|# + (fdset-clear fd))))) + ((##sys#structure? blocked 'condition-variable) + (##sys#setslot blocked 2 (##sys#delq t (##sys#slot blocked 2)))) + ((##sys#structure? blocked 'mutex) + (##sys#setslot blocked 3 (##sys#delq t (##sys#slot blocked 3)))) + ((##sys#structure? blocked 'thread) + (##sys#setslot blocked 12 (##sys#delq t (##sys#slot blocked 12))))) + (##sys#setislot t 11 #f))) ;;; Unblock thread cleanly: (define (##sys#thread-unblock! t) - (when (or (eq? 'blocked (##sys#slot t 3)) - (eq? 
'sleeping (##sys#slot t 3))) + (when (or (eq? 'blocked (##sys#slot t 3)) (eq? 'sleeping (##sys#slot t 3))) (##sys#remove-from-timeout-list t) - (##sys#clear-i/o-state-for-thread! t) + (##sys#thread-clear-blocking-state! t) (##sys#thread-basic-unblock! t) ) ) ;;; Kill all threads in fd-, io- and timeout-lists and assign one thread as the ; new primordial one. Overrides "##sys#kill-all-threads" in library.scm. -(set! ##sys#kill-other-threads +;; FIXME: this is temporarily broken (does not exist in askemos's branch). + +#;(set! ##sys#kill-other-threads (let ((exit exit)) (lambda (thunk) (let ((primordial ##sys#current-thread)) diff --git a/srfi-18.scm b/srfi-18.scm index 5111531..4128068 100644 --- a/srfi-18.scm +++ b/srfi-18.scm @@ -412,7 +412,7 @@ [t0s (##sys#slot t0 3)] ) (##sys#setslot cvar 2 (##sys#slot ts 1)) (when (or (eq? t0s 'blocked) (eq? t0s 'sleeping)) - (##sys#thread-basic-unblock! t0) ) ) ) ) ) ;; TBD + (##sys#thread-unblock! t0)) ) ) ) ) (define (condition-variable-broadcast! cvar) (##sys#check-structure cvar 'condition-variable 'condition-variable-broadcast!) @@ -421,34 +421,26 @@ (lambda (ti) (let ([tis (##sys#slot ti 3)]) (when (or (eq? tis 'blocked) (eq? tis 'sleeping)) - (##sys#thread-basic-unblock! ti) ) ) ) ;; TBD + (##sys#thread-unblock! ti) ) ) ) (##sys#slot cvar 2) ) (##sys#setislot cvar 2 '()) ) ;;; Change continuation of thread to signal an exception: -(define (thread-signal! thread exn) ;; TBD +(define (thread-signal! thread exn) (##sys#check-structure thread 'thread 'thread-signal!) (dbg "signal " thread exn) (if (eq? thread ##sys#current-thread) (##sys#signal exn) - (let ([old (##sys#slot thread 1)] - [blocked (##sys#slot thread 11)]) - (cond - ((##sys#structure? blocked 'condition-variable) - (##sys#setslot blocked 2 (##sys#delq thread (##sys#slot blocked 2)))) - ((##sys#structure? blocked 'mutex) - (##sys#setslot blocked 3 (##sys#delq thread (##sys#slot blocked 3)))) - ((##sys#structure? 
blocked 'thread) - (##sys#setslot blocked 12 (##sys#delq thread (##sys#slot blocked 12))))) + (let ((old (##sys#slot thread 1))) (##sys#setslot thread 1 (lambda () (##sys#signal exn) (old) ) ) - (##sys#setslot thread 3 'blocked) - (##sys#thread-unblock! thread) ) ) ) + (##sys#setslot thread 3 'blocked) ;; FIXME: why require this in ##sys#thread-unblock! ? + (##sys#thread-unblock! thread)) ) ) ;;; Don't block in the repl: (by Chris Double) -- 2.6.2