|
23 | 23 | (sem (bt2:make-semaphore) :type bt2:semaphore)
|
24 | 24 | (thread nil :type (or null bt2:thread)))
|
25 | 25 |
|
26 |
| -(defmethod resolve-actor ((actor symbol)) |
27 |
| - (gethash actor *registry*)) |
| 26 | +(defstruct dispatcher |
| 27 | + (name nil :type (or nil symbol keyword)) |
| 28 | + (workers nil :type list) |
| 29 | + (lock (bt2:make-lock) :type bt2:lock)) |
| 30 | + |
| 31 | +(defgeneric resolve-actor (actor) |
| 32 | + (:method ((actor dispatcher)) |
| 33 | + (reduce #'min |
| 34 | + (dispatcher-workers actor) |
| 35 | + :initial-value -1 |
| 36 | + :key (lambda (actor) (q:qsize (actor-queue actor))))) |
| 37 | + (:method ((actor symbol)) |
| 38 | + (gethash actor *registry*))) |
28 | 39 |
|
29 | 40 | (defmethod process-message ((actor actor) msg)
|
30 | 41 | (let ((*self* actor))
|
@@ -109,16 +120,23 @@ is the code to handle messages."
|
109 | 120 | (defun ,behav ,args
|
110 | 121 | ,@(mapcar (lambda (pair) `(declare (special ,(car pair)))) state)
|
111 | 122 | ,@body)
|
112 |
| - (defun ,name (&key name) |
| 123 | + (defun ,name (&key name workers) |
113 | 124 | (when (gethash name *registry*)
|
114 | 125 | (error "Actor named ~a already exists." name))
|
115 |
| - (let ((actor (make-actor :behav ',behav :name name))) |
116 |
| - (setf (actor-thread actor) |
117 |
| - (bt2:make-thread |
118 |
| - (lambda () |
119 |
| - (let ,state |
120 |
| - ,@(mapcar (lambda (pair) `(declare (special ,(car pair)))) state) |
121 |
| - (run-actor actor))))) |
122 |
| - (if name |
123 |
| - (setf (gethash name *registry*) actor) |
124 |
| - actor)))))) |
| 126 | + (if workers |
| 127 | + (let ((dispatcher (make-dispatcher :name name))) |
| 128 | + (dotimes (i workers) |
| 129 | + (push (,name) (dispatcher-workers dispatcher))) |
| 130 | + (if name |
| 131 | + (setf (gethash name *registry*) dispatcher) |
| 132 | + dispatcher)) |
| 133 | + (let ((actor (make-actor :behav ',behav :name name))) |
| 134 | + (setf (actor-thread actor) |
| 135 | + (bt2:make-thread |
| 136 | + (lambda () |
| 137 | + (let ,state |
| 138 | + ,@(mapcar (lambda (pair) `(declare (special ,(car pair)))) state) |
| 139 | + (run-actor actor))))) |
| 140 | + (if name |
| 141 | + (setf (gethash name *registry*) actor) |
| 142 | + actor))))))) |
0 commit comments