|
4 | 4 |
|
5 | 5 | (defvar *registry* (make-hash-table))
|
6 | 6 |
|
| 7 | +(define-condition unregister (condition) ()) |
| 8 | + |
7 | 9 | (defstruct close-signal)
|
8 | 10 |
|
9 | 11 | (defstruct async-signal
|
|
26 | 28 | (defstruct dispatcher
|
27 | 29 | (name nil :type (or nil symbol keyword))
|
28 | 30 | (workers nil :type list)
|
29 |
| - (openp t :type boolean) |
30 |
| - (lock (bt2:make-lock) :type bt2:lock)) |
| 31 | + (openp t :type boolean)) |
31 | 32 |
|
32 | 33 | (defgeneric resolve-actor (actor)
|
33 | 34 | (:method ((actor dispatcher))
|
|
46 | 47 |
|
47 | 48 | (defmethod run-actor ((actor actor))
|
48 | 49 | "Run main event loop for actor."
|
49 |
| - (loop (let ((sig (q:qpop (actor-queue actor)))) |
50 |
| - (etypecase sig |
51 |
| - (async-signal |
52 |
| - (process-message actor (async-signal-msg sig))) |
53 |
| - (sync-signal |
54 |
| - (process-message actor (sync-signal-msg sig)) |
55 |
| - (bt2:signal-semaphore (sync-signal-sem sig))) |
56 |
| - (close-signal |
57 |
| - (remhash (actor-name actor) *registry*) |
58 |
| - (return-from run-actor)) |
59 |
| - (null |
60 |
| - (bt2:wait-on-semaphore (actor-sem actor))))))) |
| 50 | + (handler-case |
| 51 | + (loop (let ((sig (q:qpop (actor-queue actor)))) |
| 52 | + (etypecase sig |
| 53 | + (async-signal |
| 54 | + (process-message actor (async-signal-msg sig))) |
| 55 | + (sync-signal |
| 56 | + (process-message actor (sync-signal-msg sig)) |
| 57 | + (bt2:signal-semaphore (sync-signal-sem sig))) |
| 58 | + (close-signal |
| 59 | + (signal (make-instance 'unregister))) |
| 60 | + (null |
| 61 | + (bt2:wait-on-semaphore (actor-sem actor)))))) |
| 62 | + (unregister (c) |
| 63 | + (declare (ignore c)) |
| 64 | + (remhash (actor-name actor) *registry*)))) |
61 | 65 |
|
62 | 66 | (defmethod send-signal ((actor actor) sig)
|
63 | 67 | (unless (actor-openp actor)
|
|
69 | 73 | (:documentation "Send a close-signal to an actor.")
|
70 | 74 | (:method ((actor actor))
|
71 | 75 | (send-signal actor (make-close-signal))
|
72 |
| - (setf (actor-openp actor) nil)) |
| 76 | + (setf (actor-openp actor) nil) |
| 77 | + actor) |
73 | 78 | (:method ((actor dispatcher))
|
74 |
| - (remhash (dispatcher-name actor) *registry*) |
75 | 79 | (dolist (worker (dispatcher-workers actor))
|
76 | 80 | (close-actor worker))
|
77 |
| - (setf (dispatcher-openp actor) nil)) |
| 81 | + (setf (dispatcher-openp actor) nil) |
| 82 | + actor) |
78 | 83 | (:method ((actor t))
|
79 | 84 | (close-actor (resolve-actor actor))))
|
80 | 85 |
|
|
84 | 89 | (bt2:join-thread (actor-thread actor))
|
85 | 90 | (apply #'values (actor-store actor)))
|
86 | 91 | (:method ((actor dispatcher))
|
87 |
| - (mapcar #'join-actor (dispatcher-workers actor))) |
| 92 | + (prog1 (mapcar #'join-actor (dispatcher-workers actor)) |
| 93 | + (remhash (dispatcher-name actor) *registry*))) |
88 | 94 | (:method ((actor t))
|
89 | 95 | (join-actor (resolve-actor actor))))
|
90 | 96 |
|
91 | 97 | (defgeneric destroy-actor (actor)
|
92 | 98 | (:documentation "Immediately destroy an actor's thread.")
|
93 | 99 | (:method ((actor actor))
|
94 |
| - (remhash (actor-name actor) *registry*) |
95 |
| - (bt2:destroy-thread (actor-thread actor))) |
| 100 | + (bt2:interrupt-thread (actor-thread actor) |
| 101 | + (lambda () |
| 102 | + (signal (make-instance 'unregister))))) |
96 | 103 | (:method ((actor dispatcher))
|
97 |
| - (remhash (dispatcher-name actor) *registry*) |
98 |
| - (mapcar #'destroy-actor (dispatcher-workers actor))) |
| 104 | + (prog1 (mapcar #'destroy-actor (dispatcher-workers actor)) |
| 105 | + (remhash (dispatcher-name actor) *registry*))) |
99 | 106 | (:method ((actor t))
|
100 | 107 | (destroy-actor (resolve-actor actor))))
|
101 | 108 |
|
102 | 109 | (defun close-and-join-actors (&rest actors)
|
103 |
| - (mapc #'close-actor actors) |
104 |
| - (mapcar #'join-actor actors)) |
| 110 | + (mapcar #'join-actor (mapcar #'close-actor actors))) |
105 | 111 |
|
106 | 112 | (defgeneric send (actor &rest args)
|
107 | 113 | (:documentation "Asyncronously send a message to an actor.")
|
|
0 commit comments