Skip to content

Commit 44cc774

Browse files
committed
dispatcher work
1 parent 51cb016 commit 44cc774

File tree

4 files changed

+55
-6
lines changed

4 files changed

+55
-6
lines changed

package.lisp

+2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
(defpackage #:thespis
22
(:use #:cl)
3+
(:local-nicknames
4+
(#:a #:alexandria))
35
(:export
46
;; thespis.lisp
57
#:actor ; STRUCT

test/dispatcher.lisp

+37
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
(fiasco:define-test-package #:thespis/test/dispatcher
2+
(:use #:thespis))
3+
(in-package #:thespis/test/dispatcher)
4+
5+
(defun ~= (a b)
6+
(>= 2 (abs (- a b))))
7+
8+
(deftest test-dispatcher-balancing ()
9+
"Make sure all the workers share."
10+
(define-actor counter ((c 0)) (increment)
11+
(sleep 0.025)
12+
(incf c increment))
13+
14+
(let ((actor (counter :workers 2)))
15+
(dotimes (i 40) (send actor 1))
16+
(close-actor actor)
17+
(let ((stores (join-actor actor)))
18+
(is (~= (first stores) (second stores)))))
19+
20+
(define-actor sleeper ((c 0)) (time)
21+
(sleep time)
22+
(incf c))
23+
24+
(let ((actor (sleeper :workers 2)))
25+
(dotimes (i 5)
26+
(send (first (thespis::dispatcher-workers actor)) 0.2))
27+
(dotimes (i 20) (sleep 0.01) (send actor 0))
28+
(is (equal '((5 20)) (close-and-join-actors actor)))))
29+
30+
(deftest test-dispatcher-registry ()
31+
(define-actor counter ((c 0)) (increment)
32+
(incf c increment))
33+
34+
(counter :name :my-counter :workers 2)
35+
(send :my-counter 1)
36+
(is (ask :my-counter 1))
37+
(close-actor :my-counter))

thespis.asd

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
:author "garlic0x1"
33
:description "Threaded actors for Common Lisp"
44
:license "MIT"
5-
:depends-on (#:bordeaux-threads #:queues.simple-cqueue)
5+
:depends-on (#:alexandria #:bordeaux-threads #:queues.simple-cqueue)
66
:components ((:file "package")
77
(:file "thespis"))
88
:in-order-to ((test-op (test-op #:thespis/test))))

thespis.lisp

+15-5
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,14 @@
2626
(defstruct dispatcher
2727
(name nil :type (or nil symbol keyword))
2828
(workers nil :type list)
29+
(openp t :type boolean)
2930
(lock (bt2:make-lock) :type bt2:lock))
3031

3132
(defgeneric resolve-actor (actor)
3233
(:method ((actor dispatcher))
33-
(reduce #'min
34-
(dispatcher-workers actor)
35-
:initial-value -1
36-
:key (lambda (actor) (q:qsize (actor-queue actor)))))
34+
(a:extremum (dispatcher-workers actor)
35+
#'<
36+
:key (lambda (actor) (q:qsize (actor-queue actor)))))
3737
(:method ((actor symbol))
3838
(gethash actor *registry*)))
3939

@@ -70,6 +70,11 @@
7070
(:method ((actor actor))
7171
(send-signal actor (make-close-signal))
7272
(setf (actor-openp actor) nil))
73+
(:method ((actor dispatcher))
74+
(remhash (dispatcher-name actor) *registry*)
75+
(dolist (worker (dispatcher-workers actor))
76+
(close-actor worker))
77+
(setf (dispatcher-openp actor) nil))
7378
(:method ((actor t))
7479
(close-actor (resolve-actor actor))))
7580

@@ -78,6 +83,8 @@
7883
(:method ((actor actor))
7984
(bt2:join-thread (actor-thread actor))
8085
(apply #'values (actor-store actor)))
86+
(:method ((actor dispatcher))
87+
(mapcar #'join-actor (dispatcher-workers actor)))
8188
(:method ((actor t))
8289
(join-actor (resolve-actor actor))))
8390

@@ -86,12 +93,15 @@
8693
(:method ((actor actor))
8794
(remhash (actor-name actor) *registry*)
8895
(bt2:destroy-thread (actor-thread actor)))
96+
(:method ((actor dispatcher))
97+
(remhash (dispatcher-name actor) *registry*)
98+
(mapcar #'destroy-actor (dispatcher-workers actor)))
8999
(:method ((actor t))
90100
(destroy-actor (resolve-actor actor))))
91101

92102
(defun close-and-join-actors (&rest actors)
93103
(mapc #'close-actor actors)
94-
(mapc #'join-actor actors))
104+
(mapcar #'join-actor actors))
95105

96106
(defgeneric send (actor &rest args)
97107
(:documentation "Asyncronously send a message to an actor.")

0 commit comments

Comments
 (0)