Skip to content

Commit

Permalink
Discarding executor. Simulation pass. Shared single threaded executors.
Browse files Browse the repository at this point in the history
  • Loading branch information
borodust committed Dec 4, 2016
1 parent 3cde972 commit dc66999
Show file tree
Hide file tree
Showing 21 changed files with 157 additions and 88 deletions.
2 changes: 1 addition & 1 deletion audio/buffer.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@


(define-destructor audio-buffer ((id id-of) (sys system-of))
(-> (sys)
(-> (sys :priority :low)
(al:delete-buffer id)))


Expand Down
1 change: 1 addition & 0 deletions cl-bodge.asd
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@
(:file "misc")
(:file "node")
(:file "scene")
(:file "simulation")
(:file "rendering")
(:file "transformations")
(:file "animation")
Expand Down
31 changes: 21 additions & 10 deletions engine/concurrency/dispatch.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,28 @@
*env*))


(defgeneric dispatch (dispatcher task &optional priority))
(defgeneric dispatch (dispatcher task &key priority))

(defmethod dispatch :around (dispatcher (fn function) &optional priority)
(call-next-method dispatcher
(lambda ()
(let ((*active-dispatcher* dispatcher))
(funcall fn)))
priority))
(defmethod dispatch :around (dispatcher (fn function) &key (priority :medium))
(flet ((wrapped ()
(let ((*active-dispatcher* dispatcher))
(funcall fn))))
(call-next-method dispatcher #'wrapped :priority priority)))


(defmacro within-new-thread-waiting (thread-name &body body)
(defmethod dispatch (dispatcher fn &key (priority :medium))
(declare (ignore priority))
nil)


(defmacro in-new-thread (thread-name &body body)
`(bt:make-thread
(lambda ()
(progn
,@body))
:name ,(format nil "~a" thread-name)))

(defmacro in-new-thread-waiting (thread-name &body body)
(with-gensyms (latch)
`(wait-with-latch (,latch)
(bt:make-thread
Expand Down Expand Up @@ -73,7 +84,7 @@
(package-name (symbol-package name)) name))


(defmacro -> (&environment env (dispatcher &optional (priority :medium)) &body body)
(defmacro -> (&environment env (dispatcher &rest keys) &body body)
(with-gensyms (fn r)
(let ((transformed (traverse-dispatch-body `(progn ,@body) env)))
`(flet ((,fn ()
Expand All @@ -85,4 +96,4 @@
(funcall #',*active-callback-name* ,r)))
transformed)
transformed)))
(dispatch ,dispatcher #',fn ,priority)))))
(dispatch ,dispatcher #',fn ,@keys)))))
38 changes: 24 additions & 14 deletions engine/concurrency/execution.lisp
Original file line number Diff line number Diff line change
@@ -1,32 +1,28 @@
(in-package :cl-bodge.concurrency)


(define-constant +default-queue-size+ 256)
(define-constant +default-queue-size+ 1024)


(define-constant +default-pool-size+ 4)


(defgeneric execute (executor task &optional priority))
(defgeneric execute (executor task &key priority))


(defclass simple-executor (disposable)
(defclass generic-executor (disposable)
((queue :initform nil :reader task-queue-of)))


(defmethod initialize-instance :after ((this simple-executor) &key queue-size)
(defmethod initialize-instance :after ((this generic-executor) &key queue-size)
(with-slots (queue) this
(setf queue (make-blocking-queue queue-size))))


(define-destructor simple-executor (queue)
(define-destructor generic-executor (queue)
(interrupt queue))


(definline make-simple-executor (&optional queue-size)
(make-instance 'simple-executor :queue-size queue-size))


(defun run (executor)
(block interruptible
(loop
Expand All @@ -41,15 +37,29 @@
(funcall (pop-from (task-queue-of executor))))))))


(defmethod execute ((this simple-executor) (task function) &optional (priority :medium))
(defclass blocking-executor (generic-executor) ())

(defmethod execute ((this blocking-executor) (task function) &key (priority :medium))
(put-into (task-queue-of this) task priority))

(definline make-blocking-executor (&optional queue-size)
(make-instance 'blocking-executor :queue-size queue-size))


(defclass discarding-executor (generic-executor) ())

(defmethod execute ((this discarding-executor) (task function) &key (priority :medium))
(try-put-replacing (task-queue-of this) task priority))

(definline make-discarding-executor (&optional queue-size)
(make-instance 'discarding-executor :queue-size queue-size))


;;;
;;;
;;;
(defclass single-threaded-executor (disposable)
((executor :initform (make-simple-executor +default-queue-size+))))
((executor :initform (make-discarding-executor +default-queue-size+))))


(defmethod initialize-instance :after ((this single-threaded-executor) &key special-variables)
Expand All @@ -68,9 +78,9 @@
(make-instance 'single-threaded-executor :special-variables special-variables))


(defmethod execute ((this single-threaded-executor) (task function) &optional (priority :medium))
(defmethod execute ((this single-threaded-executor) (task function) &key (priority :medium))
(with-slots (executor) this
(execute executor task priority)))
(execute executor task :priority priority)))

;;;
;;;
Expand All @@ -93,6 +103,6 @@
(make-instance 'pooled-executor :size size))


(defmethod execute ((this pooled-executor) (task function) &optional (priority :medium))
(defmethod execute ((this pooled-executor) (task function) &key (priority :medium))
(with-slots (pool) this
(push-to-pool pool task priority)))
16 changes: 8 additions & 8 deletions engine/concurrency/transform-dispatch.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@
(flet ((,return-funame (&optional ,r)
(declare (ignore ,r))
;; fixme : propagate priority
(dispatch ,d (lambda () ,(generate-calling-code cont nil)) :medium)))
(dispatch ,d (lambda () ,(generate-calling-code cont nil)) :priority :medium)))
,@(call-next-method this (make-function-name-null-continuation return-funame)))))))


Expand All @@ -217,7 +217,7 @@
`(let ((,d *active-dispatcher*))
(flet ((,return-funame (&optional ,r)
;; fixme : propagate priority
(dispatch ,d (lambda () ,(generate-calling-code cont r)) :medium)))
(dispatch ,d (lambda () ,(generate-calling-code cont r)) :priority :medium)))
,@(call-next-method this
(make-function-name-result-continuation return-funame)))))))

Expand Down Expand Up @@ -339,7 +339,7 @@
`(flet ((,cb (,r)
(declare (ignore ,r))
;; fixme : propagate priority
(dispatch ,d #'(lambda () ,(generate-calling-code cont nil)) :medium)))
(dispatch ,d #'(lambda () ,(generate-calling-code cont nil)) :priority :medium)))
,(macroexpand-1 (car forms) *env*))
`(let ((,n ,count))
(flet ((,cb (,r)
Expand All @@ -349,7 +349,7 @@
(decf ,n)
(when (= ,n 0)
,(generate-calling-code cont nil)))
:medium))) ; fixme : propagate priority
:priority :medium))) ; fixme : propagate priority
,@(loop for form/d in forms collect
(macroexpand-1 form/d *env*))))))))))

Expand All @@ -362,7 +362,7 @@
,(if (null (rest forms))
`(flet ((,cb (,r)
;; fixme : propagate priority
(dispatch ,d #'(lambda () ,(generate-calling-code cont r)) :medium)))
(dispatch ,d #'(lambda () ,(generate-calling-code cont r)) :priority :medium)))
,(macroexpand-1 (car forms) *env*))
`(let ((,n ,count)
(,results '()))
Expand All @@ -373,7 +373,7 @@
(push ,r ,results)
(when (= ,n 0)
,(generate-calling-code cont `(nreverse ,results))))
:medium))) ; fixme : propagate priority
:priority :medium))) ; fixme : propagate priority
,@(loop for form/d in forms collect
(macroexpand-1 form/d *env*))))))))))

Expand All @@ -394,7 +394,7 @@
(declare (ignore ,r))
;; fixme : propagate priority
(dispatch ,(active-dispatcher-name-for codegen)
#'(lambda () ,(generate-calling-code cont nil)) :medium))))
#'(lambda () ,(generate-calling-code cont nil)) :priority :medium))))

(defmethod cc-callback-gen ((codegen wait-for*-gen) (cont result-cont) cb epilogue)
(with-gensyms (r)
Expand All @@ -403,7 +403,7 @@
(setf ,r (car ,r)))
;; fixme : propagate priority
(dispatch ,(active-dispatcher-name-for codegen)
#'(lambda () ,(generate-calling-code cont r)) :medium))))
#'(lambda () ,(generate-calling-code cont r)) :priority :medium))))


(defmethod chain-callback-gen ((this wait-for*-gen) group name last-cb result-required-p)
Expand Down
42 changes: 29 additions & 13 deletions engine/engine.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
(defclass bodge-engine ()
((systems :initform nil)
(properties :initform '())
(shared-executor :initform nil)
(shared-executors :initform nil)
(shared-pool :initform nil)
(disabling-order :initform '())))

(defvar *engine* (make-instance 'bodge-engine))
Expand Down Expand Up @@ -71,11 +72,12 @@


(defun startup (properties-pathspec)
(within-new-thread-waiting "startup-worker"
(with-slots (systems properties disabling-order shared-executor) *engine*
(in-new-thread-waiting "startup-worker"
(with-slots (systems properties disabling-order shared-pool shared-executors) *engine*
(setf properties (%load-properties properties-pathspec)
shared-executor (make-pooled-executor
(property :engine-shared-executor-pool-size 4)))
shared-pool (make-pooled-executor
(property :engine-shared-pool-size 2))
shared-executors (list (make-single-threaded-executor)))
(let ((system-class-names
(property :systems (lambda ()
(error ":systems property should be defined")))))
Expand All @@ -84,31 +86,45 @@


(defun shutdown ()
(within-new-thread-waiting "shutdown-worker"
(with-slots (systems disabling-order shared-executor) *engine*
(in-new-thread-waiting "shutdown-worker"
(with-slots (systems disabling-order shared-pool shared-executors) *engine*
(loop for system-class in disabling-order do
(log:debug "Disabling ~a" system-class)
(disable (gethash system-class systems)))
(dispose shared-executor))))
(dispose shared-pool)
(dolist (ex shared-executors)
(dispose ex)))))


(defun acquire-executor (&rest args &key (single-threaded-p nil) (exclusive-p nil)
(special-variables nil))
(with-slots (shared-executor) *engine*
(with-slots (shared-pool shared-executors) *engine*
(cond
((and (not exclusive-p) (not single-threaded-p) (not special-variables))
shared-executor)
((or exclusive-p single-threaded-p)
shared-pool)
((and exclusive-p single-threaded-p)
(make-single-threaded-executor special-variables))
((and single-threaded-p (not exclusive-p) (not special-variables))
(first shared-executors))
(t (error "Cannot provide executor for combination of requirements: ~a" args)))))


(defun shared-executor-p (executor shared-executors)
(some (lambda (shared) (eq shared executor)) shared-executors))


(defun release-executor (executor)
(with-slots (shared-executor) *engine*
(unless (eq executor shared-executor)
(with-slots (shared-pool shared-executors) *engine*
(unless (or (shared-executor-p executor shared-executors)
(eq executor shared-pool))
(dispose executor))))


(defmethod dispatch ((this bodge-engine) (task function) &key priority)
(with-slots (shared-pool) this
(execute shared-pool task :priority priority)
t))

;;
(defgeneric system-of (obj))

Expand Down
3 changes: 2 additions & 1 deletion engine/packages.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
make-single-threaded-executor
make-pooled-executor

within-new-thread-waiting
in-new-thread
in-new-thread-waiting

->
dispatch
Expand Down
36 changes: 25 additions & 11 deletions engine/thread-bound-system.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

(defclass thread-bound-system (enableable generic-system)
((executor :initform nil :accessor %executor-of)
(context :initform nil)))
(context :initform nil :reader system-context-of)))


(defgeneric make-system-context (system)
Expand All @@ -20,32 +20,46 @@
(:method (context system) (declare (ignore context system))))


(defmethod dispatch ((this thread-bound-system) fn &optional (priority :medium))
(execute (%executor-of this) fn priority))
(defmethod dispatch ((this thread-bound-system) fn &key (priority :medium))
(unless (call-next-method)
(flet ((invoker ()
(log-errors
(let ((*system-context* (system-context-of this))
(*system* this))
(funcall fn)))))
(execute (%executor-of this) #'invoker :priority priority))
t))


(defgeneric acquire-system-executor (system)
(:method ((this thread-bound-system))
(acquire-executor :single-threaded-p t :exclusive-p t)))


(defgeneric release-system-executor (system executor)
(:method ((this thread-bound-system) executor)
(release-executor executor)))


(defmethod enable ((this thread-bound-system))
(call-next-method)
(setf (%executor-of this)
(acquire-executor :single-threaded-p t :exclusive-p t
:special-variables '(*system-context*
*system*)))
(setf (%executor-of this) (acquire-system-executor this))
(wait-with-latch (latch)
(execute (%executor-of this)
(lambda ()
(log-errors
(setf *system-context* (make-system-context this)
*system* this)
(with-slots (context) this
(setf context (make-system-context this)))
(open-latch latch))))))


(defmethod disable ((this thread-bound-system))
(wait-with-latch (latch)
(execute (%executor-of this)
(lambda ()
(destroy-system-context *system-context* this)
(destroy-system-context this (system-context-of this))
(open-latch latch))))
(release-executor (%executor-of this))
(release-system-executor this (%executor-of this))
(call-next-method))


Expand Down
2 changes: 1 addition & 1 deletion graphics/buffers.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@


(define-destructor buffer ((id id-of) (sys system-of))
(-> (sys)
(-> (sys :priority :low)
(gl:delete-buffers (list id))))


Expand Down
Loading

0 comments on commit dc66999

Please sign in to comment.