From 7e7117cd73d0ec33de2ea7068d8ce8dcff209156 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20F=2E=20Wittenberger?= Date: Mon, 15 Feb 2016 13:50:13 +0100 Subject: [PATCH] Modified scheduler policy to prefer work load over i/o with adjustable "priority" to reduce latency. --- scheduler.scm | 172 ++++++++++++++++++++++++++++++++-------------------------- 1 file changed, 96 insertions(+), 76 deletions(-) diff --git a/scheduler.scm b/scheduler.scm index 17a7318..8a4a313 100644 --- a/scheduler.scm +++ b/scheduler.scm @@ -30,7 +30,7 @@ (disable-interrupts) (hide ready-queue-head ready-queue-tail ##sys#timeout-list ##sys#update-thread-state-buffer ##sys#restore-thread-state-buffer - remove-from-ready-queue ##sys#unblock-threads-for-i/o + pending-queue ##sys#unblock-threads-for-i/o fdset-set fdset-test create-fdset stderr ##sys#clear-i/o-state-for-thread! ##sys#abandon-mutexes) (not inline ##sys#interrupt-hook) @@ -148,68 +148,93 @@ EOF (syntax-rules () ((_ msg) (##core#inline "C_halt" msg)))) +(define pending-queue '()) + +(define-inline (remove-from-pending-queue) + (and (pair? pending-queue) + (let ((h (car pending-queue))) + (set! pending-queue (cdr pending-queue)) + h))) + +(define-inline (update-pending-queue!) + (set! pending-queue ready-queue-head) + (set! ready-queue-head '()) + (set! ready-queue-tail '())) + +(define ##sys#load-priority 1000) +(define ##sys#schedule-turn 1) + (define (##sys#schedule) - (define (switch thread) - (dbg "switching to " thread) - (set! ##sys#current-thread thread) - (##sys#setslot thread 3 'running) - (##sys#restore-thread-state-buffer thread) - ;;XXX WRONG!
this sets the t/i-period ("quantum") for the _next_ thread - (##core#inline "C_set_initial_timer_interrupt_period" (##sys#slot thread 9)) - ((##sys#slot thread 1)) ) - (let* ([ct ##sys#current-thread] - [cts (##sys#slot ct 3)] ) - (dbg "==================== scheduling, current: " ct ", ready: " ready-queue-head) + (let* ((ct ##sys#current-thread) + (cts (##sys#slot ct 3)) ) + (dbg "scheduling, current: " ct ", pending: " pending-queue ", ready: " ready-queue-head) (##sys#update-thread-state-buffer ct) ;; Put current thread on ready-queue: (when (or (eq? cts 'running) (eq? cts 'ready)) ; should ct really be 'ready? - normally not. (##sys#setislot ct 13 #f) ; clear timeout-unblock flag (##sys#add-to-ready-queue ct) ) - (let loop1 () - ;; Unblock threads waiting for timeout: - (unless (null? ##sys#timeout-list) - (let ((now (##core#inline_allocate ("C_a_i_current_milliseconds" 4) #f))) - (let loop ((lst ##sys#timeout-list)) - (if (null? lst) - (set! ##sys#timeout-list '()) - (let* ([tmo1 (caar lst)] ; timeout of thread on list - [tto (cdar lst)] ; thread on list - [tmo2 (##sys#slot tto 4)] ) ; timeout value stored in thread - (dbg "timeout: " tto " -> " tmo2 " (now: " now ")") - (if (equal? tmo1 tmo2) ;XXX why do we check this? - (if (fp>= now tmo1) ; timeout reached? - (begin - (##sys#setislot tto 13 #t) ; mark as being unblocked by timeout - (##sys#clear-i/o-state-for-thread! tto) - (##sys#thread-basic-unblock! tto) - (loop (cdr lst)) ) - (begin - (set! ##sys#timeout-list lst) - ;; If there are no threads blocking on a select call (fd-list) - ;; but there are threads in the timeout list then sleep for - ;; the number of milliseconds of next thread to wake up. - (when (and (null? ready-queue-head) - (null? ##sys#fd-list) - (pair? 
##sys#timeout-list)) - (let ((tmo1 (caar ##sys#timeout-list))) - (##core#inline - "C_msleep" - (fxmax - 0 - (##core#inline "C_quickflonumtruncate" (fp- tmo1 now)))) ) ) ) ) - (loop (cdr lst)) ) ) ) ) ) ) - ;; Unblock threads blocked by I/O: - (unless (null? ##sys#fd-list) - (##sys#unblock-threads-for-i/o) ) - ;; Fetch and activate next ready thread: - (let loop2 () - (let ([nt (remove-from-ready-queue)]) - (cond [(not nt) - (if (and (null? ##sys#timeout-list) (null? ##sys#fd-list)) - (panic "deadlock") - (loop1) ) ] - [(eq? (##sys#slot nt 3) 'ready) (switch nt)] - [else (loop2)] ) ) ) ) ) ) + + ;; Fetch and activate next ready thread: + (let loop ((nt (remove-from-pending-queue))) + (cond + ((not nt) + ;; Unblock threads blocked by I/O: + (unless (null? ##sys#fd-list) + (let ((turn (##core#inline "C_fixnum_modulo" ##sys#schedule-turn ##sys#load-priority))) + (if (eq? turn 0) + (begin + (set! ##sys#schedule-turn 1) + (##sys#unblock-threads-for-i/o)) + (set! ##sys#schedule-turn (add1 turn))))) + + ;; Unblock threads waiting for timeout: + (unless (null? ##sys#timeout-list) + (dbg "timeout queue " ##sys#timeout-list) + (let ((now (##core#inline_allocate ("C_a_i_current_milliseconds" 4) #f))) + (let loop ((lst ##sys#timeout-list)) + (if (null? lst) + (set! ##sys#timeout-list '()) + (let* ([tmo1 (caar lst)] ; timeout of thread on list + [tto (cdar lst)] ; thread on list + [tmo2 (##sys#slot tto 4)] ) ; timeout value stored in thread + (dbg "timeout: " tto " -> " tmo2 " (now: " now ")") + (if (not (equal? tmo1 tmo2)) (dbg "chicken would loose timeout " tmo1 " since " tto " has " tmo2)) + (if (equal? tmo1 tmo2) ;XXX why do we check this? + (if (fp>= now tmo1) ; timeout reached? + (begin + (##sys#setislot tto 13 #t) ; mark as being unblocked by timeout + (##sys#clear-i/o-state-for-thread! tto) + (##sys#thread-basic-unblock! tto) + (loop (cdr lst)) ) + (set! ##sys#timeout-list lst) ) + (loop (cdr lst)) ) ) ) ) + (if (and (null? ##sys#fd-list) (null? 
ready-queue-head)) + (if (null? ##sys#timeout-list) + (##sys#signal-hook #:runtime-error "deadlock") + ;; Sleep for the number of milliseconds of next thread + ;; to wake up. + (let ((tmo1 (caar ##sys#timeout-list))) + (##core#inline + "C_msleep" + (fxmax + 0 + (##core#inline "C_quickflonumtruncate" (fp- tmo1 now)))) ))))) + + (update-pending-queue!) + + ;; Nothing to do this turn? Force "turn" to be 0 and do i/o next time. + (if (null? pending-queue) (set! ##sys#schedule-turn ##sys#load-priority)) + + (loop (remove-from-pending-queue)) ) + ((eq? (##sys#slot nt 3) 'ready) + (dbg "switching to " nt) + (set! ##sys#current-thread nt) + (##sys#setslot nt 3 'running) + (##sys#restore-thread-state-buffer nt) + ;;XXX WRONG! this sets the t/i-period ("quantum") for the _next_ thread + (##core#inline "C_set_initial_timer_interrupt_period" (##sys#slot nt 9)) + ((##sys#slot nt 1))) + (else (loop (remove-from-pending-queue))) ) ) )) (define ready-queue-head '()) (define ready-queue-tail '()) @@ -224,14 +249,6 @@ EOF (else (set-cdr! ready-queue-tail new-pair)) ) (set! ready-queue-tail new-pair) ) ) -(define (remove-from-ready-queue) - (let ((first-pair ready-queue-head)) - (and (not (null? first-pair)) - (let ((first-cdr (cdr first-pair))) - (set! ready-queue-head first-cdr) - (when (eq? '() first-cdr) (set! ready-queue-tail '())) - (car first-pair) ) ) ) ) - (define (##sys#update-thread-state-buffer thread) (let ([buf (##sys#slot thread 5)]) (##sys#setslot buf 0 ##sys#dynamic-winds) @@ -514,20 +531,23 @@ EOF (cns (lambda (queue arg val init) (cons val init))) (init '())) - (let loop ((l ready-queue-head) (i init)) + (let loop ((l pending-queue) (i init)) (if (pair? l) - (loop (cdr l) (cns 'ready #f (car l) i)) - (let loop ((l ##sys#fd-list) (i i)) + (loop (cdr l) (cns 'ready #f (car l) i)) ;; maybe signal as 'pending? + (let loop ((l ready-queue-head) (i i)) (if (pair? l) - (loop (cdr l) - (let ((fd (caar l))) - (let loop ((l (cdar l))) - (if (null?
l) i - (cns 'i/o fd (car l) (loop (cdr l))))))) - (let loop ((l ##sys#timeout-list) (i i)) + (loop (cdr l) (cns 'ready #f (car l) i)) + (let loop ((l ##sys#fd-list) (i i)) (if (pair? l) - (loop (cdr l) (cns 'timeout (caar l) (cdar l) i)) - i))))))) + (loop (cdr l) + (let ((fd (caar l))) + (let loop ((l (cdar l))) + (if (null? l) i + (cns 'i/o fd (car l) (loop (cdr l))))))) + (let loop ((l ##sys#timeout-list) (i i)) + (if (pair? l) + (loop (cdr l) (cns 'timeout (caar l) (cdar l) i)) + i))))))))) ;;; Remove all waiting threads from the relevant queues with the exception of the current thread: -- 2.6.2