1
- use std:: sync:: Arc ;
2
1
use actor_core_client:: { self as actor_core_rs} ;
3
2
use futures_util:: FutureExt ;
4
3
use pyo3:: { prelude:: * , types:: { PyList , PyString , PyTuple } } ;
5
- use tokio:: sync:: { mpsc, Mutex } ;
4
+ use tokio:: sync:: mpsc;
6
5
7
6
use crate :: util;
8
7
8
+ const EVENT_BUFFER_SIZE : usize = 100 ;
9
+
9
10
struct ActorEvent {
10
11
name : String ,
11
12
args : Vec < serde_json:: Value > ,
12
13
}
13
14
14
- pub struct InnerActorData {
15
- event_tx : Mutex < Option < mpsc:: Sender < ActorEvent > > > ,
16
- }
17
-
18
- impl InnerActorData {
19
- pub fn new ( ) -> Arc < Self > {
20
- Arc :: new ( Self {
21
- event_tx : Mutex :: new ( None ) ,
22
- } )
23
- }
15
+ #[ pyclass]
16
+ pub struct ActorHandle {
17
+ handle : actor_core_rs:: handle:: ActorHandle ,
18
+ event_rx : Option < mpsc:: Receiver < ActorEvent > > ,
19
+ event_tx : mpsc:: Sender < ActorEvent > ,
24
20
}
25
21
26
- impl InnerActorData {
27
- pub async fn on_event (
28
- & self ,
29
- event_name : String ,
30
- args : & Vec < serde_json:: Value >
31
- ) {
32
- let tx = & self . event_tx . lock ( ) . await ;
33
- let Some ( tx) = tx. as_ref ( ) else {
34
- return ;
35
- } ;
36
-
37
- tx. send ( ActorEvent {
38
- name : event_name,
39
- args : args. clone ( ) ,
40
- } ) . await . map_err ( |e| {
41
- py_runtime_err ! (
42
- "Failed to send via inner tx: {}" ,
43
- e
44
- )
45
- } ) . ok ( ) ;
22
+ impl ActorHandle {
23
+ pub fn new ( handle : actor_core_rs:: handle:: ActorHandle ) -> Self {
24
+ let ( event_tx, event_rx) = mpsc:: channel ( EVENT_BUFFER_SIZE ) ;
25
+
26
+ Self {
27
+ handle,
28
+ event_tx,
29
+ event_rx : Some ( event_rx) ,
30
+ }
46
31
}
47
32
}
48
33
49
- #[ pyclass]
50
- pub struct ActorHandle {
51
- pub handle : actor_core_rs:: handle:: ActorHandle ,
52
- pub data : Arc < InnerActorData > ,
53
- }
54
-
55
34
#[ pymethods]
56
35
impl ActorHandle {
57
36
#[ new]
58
- pub fn new ( ) -> PyResult < Self > {
37
+ pub fn py_new ( ) -> PyResult < Self > {
59
38
Err ( py_runtime_err ! (
60
39
"Actor handle cannot be instantiated directly" ,
61
40
) )
@@ -106,17 +85,27 @@ impl ActorHandle {
106
85
event_name : & str
107
86
) -> PyResult < Bound < ' a , PyAny > > {
108
87
let event_name = event_name. to_string ( ) ;
109
- let data = self . data . clone ( ) ;
110
88
let handle = self . handle . clone ( ) ;
89
+ let tx = self . event_tx . clone ( ) ;
111
90
112
91
pyo3_async_runtimes:: tokio:: future_into_py ( py, async move {
113
92
handle. on_event ( & event_name. clone ( ) , move |args| {
114
93
let event_name = event_name. clone ( ) ;
115
94
let args = args. clone ( ) ;
116
- let data = data . clone ( ) ;
95
+ let tx = tx . clone ( ) ;
117
96
118
97
tokio:: spawn ( async move {
119
- data. on_event ( event_name, & args) . await ;
98
+ let event = ActorEvent {
99
+ name : event_name,
100
+ args : args. clone ( ) ,
101
+ } ;
102
+ // Send this upstream(?)
103
+ tx. send ( event) . await . map_err ( |e| {
104
+ py_runtime_err ! (
105
+ "Failed to send via inner tx: {}" ,
106
+ e
107
+ )
108
+ } ) . ok ( ) ;
120
109
} ) ;
121
110
} ) . await ;
122
111
@@ -126,17 +115,16 @@ impl ActorHandle {
126
115
127
116
#[ pyo3( signature=( count, timeout=None ) ) ]
128
117
pub fn receive < ' a > (
129
- & self ,
118
+ & mut self ,
130
119
py : Python < ' a > ,
131
120
count : u32 ,
132
121
timeout : Option < f64 >
133
122
) -> PyResult < Bound < ' a , PyAny > > {
134
- let ( tx, mut rx) = mpsc:: channel ( count as usize ) ;
123
+ let mut rx = self . event_rx . take ( ) . ok_or_else ( || {
124
+ py_runtime_err ! ( "Two .receive() calls cannot co-exist" )
125
+ } ) ?;
135
126
136
- let data = self . data . clone ( ) ;
137
127
pyo3_async_runtimes:: tokio:: future_into_py ( py, async move {
138
- data. event_tx . lock ( ) . await . replace ( tx) ;
139
-
140
128
let result: Vec < ActorEvent > = {
141
129
let mut events: Vec < ActorEvent > = Vec :: new ( ) ;
142
130
0 commit comments