Skip to content

Commit a8b4974

Browse files
Added #actor macro to register the actor (#50)
* updated generated api clients * clippy * updated api to get list of registerd apis * wip ui * added how_to.md * added register actor macro
1 parent 39e86a2 commit a8b4974

File tree

31 files changed

+1206
-1100
lines changed

31 files changed

+1206
-1100
lines changed

Cargo.lock

Lines changed: 14 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

actor/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ tracing = {version = "0.1.41", features = ["std", "attributes"] }
2020
tracing-subscriber = "0.3.19"
2121
tracing-shared = "0.1.5"
2222
criterion = "0.6.0"
23+
serde_json = "1.0.143"
24+
inventory = "0.3.21"
25+
reactor-macros = { path = "../macros" }
2326

2427
[dev-dependencies]
2528
lazy_static = "1.5.0"

actor/src/lib.rs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
// #![feature(log_syntax)]
22

3-
use std::{borrow::Cow, marker::PhantomData};
3+
use std::{borrow::Cow, collections::HashMap, marker::PhantomData};
44

55
use bincode::{Decode, Encode};
66
use err::ActorError;
77
use futures::future::join_all;
8+
pub use inventory as __inventory;
89
use reactor_channel::{ReactorChannelTx, reactor_channel};
910
use recv::rx;
1011
use send::tx;
@@ -24,6 +25,35 @@ mod send;
2425

2526
pub use node_comm::{Connection, ControlInst, ControlReq, NodeComm};
2627
pub use reactor_channel::{HasPriority, MAX_PRIO};
28+
pub use reactor_macros::actor;
29+
30+
pub type ActorSpawnCB = fn(RuntimeCtx, HashMap<String, serde_json::Value>);
31+
32+
pub struct ExportedFn {
33+
pub name: &'static str,
34+
pub func: ActorSpawnCB,
35+
}
36+
37+
__inventory::collect!(ExportedFn);
38+
39+
#[macro_export]
40+
macro_rules! register_actor {
41+
($fn_name:path) => {
42+
$crate::__inventory::submit! {
43+
$crate::ExportedFn {
44+
name: stringify!($fn_name),
45+
func: $fn_name,
46+
}
47+
}
48+
};
49+
}
50+
51+
#[unsafe(no_mangle)]
52+
fn get_registered() -> Vec<String> {
53+
inventory::iter::<ExportedFn>()
54+
.map(|x| x.name.to_string())
55+
.collect()
56+
}
2757

2858
static CHANNEL_SIZE: usize = 1 << 20;
2959
/// Messages that can flow between the actors.

examples/how_to.md

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
# How to use Reactor.
2+
3+
4+
## Initial Setup
5+
6+
1. Start new Rust project:- `cargo init --lib my_proj`
7+
2. Modify `Cargo.toml` to contain
8+
```toml
9+
[lib]
10+
crate-type = ["cdylib", "rlib"]
11+
```
12+
3. Add reactor as the dependency.
13+
`cargo add --git https://github.com/satyamjay-iitd/reactor/ reactor-actor`
14+
4. Define some actors.
15+
5. Compile the library:-
16+
```bash
17+
cargo build
18+
# You should now have a `target/debug/libmy_proj.so`, and ready to run a job.
19+
```
20+
21+
22+
## Running a job
23+
24+
1. Install node controller.
25+
`cargo install --git https://github.com/satyamjay-iitd/reactor/ reactor_nctrl`
26+
2. Install job controller.
27+
`cargo install --git https://github.com/satyamjay-iitd/reactor/ reactor_jctrl`
28+
3. Start the node controller
29+
```bash
30+
# Note that the directory must contain the *.so file.
31+
reactor_nctrl --port 3000 ./target/debug`
32+
```
33+
4. Define job in a toml file.
34+
```toml
35+
[[ops]]
36+
name = "pingpong"
37+
lib_name = "ping_pong_actor"
38+
39+
[[nodes]]
40+
name = "node1"
41+
hostname = "0.0.0.0"
42+
port = 3000
43+
44+
[placement]
45+
[[placement.pingpong]]
46+
nodename = "node1"
47+
actor_name = "pinger"
48+
other = "ponger"
49+
50+
[[placement.pingpong]]
51+
nodename = "node1"
52+
actor_name = "ponger"
53+
other = "pinger"
54+
```
55+
5. Run the job.
56+
```bash
57+
reactor_jctrl ./job.toml
58+
```

examples/ping_pong/src/lib.rs

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ pub use reactor_actor::setup_shared_logger_ref;
33
use bincode::{Decode, Encode};
44

55
use reactor_actor::codec::BincodeCodec;
6-
use reactor_actor::{BehaviourBuilder, RouteTo, RuntimeCtx};
6+
use reactor_actor::{BehaviourBuilder, RouteTo, RuntimeCtx, actor};
77
use reactor_macros::{DefaultPrio, Msg as DeriveMsg};
88
use std::collections::HashMap;
99
use std::time::Duration;
@@ -77,27 +77,25 @@ impl Sender {
7777
// ACTORS
7878
// //////////////////////////////////////////////////////////////////////////////
7979

80-
pub async fn actor(ctx: RuntimeCtx, other_addr: String) {
81-
BehaviourBuilder::new(Processor {}, BincodeCodec::default())
82-
.send(Sender::new(other_addr))
83-
.generator_if(ctx.addr == "pinger", || vec![PingPongMsg::Ping].into_iter())
84-
.build()
85-
.run(ctx)
86-
.await
87-
.unwrap();
88-
}
89-
9080
lazy_static::lazy_static! {
9181
static ref RUNTIME: tokio::runtime::Runtime = tokio::runtime::Runtime::new().unwrap();
9282
}
9383

94-
#[unsafe(no_mangle)]
84+
#[actor]
9585
pub fn pingpong(ctx: RuntimeCtx, mut payload: HashMap<String, serde_json::Value>) {
96-
let other: String = payload
86+
let other_addr: String = payload
9787
.remove("other")
9888
.unwrap()
9989
.as_str()
10090
.unwrap()
10191
.to_string();
102-
RUNTIME.spawn(actor(ctx, other));
92+
RUNTIME.spawn(async move {
93+
BehaviourBuilder::new(Processor {}, BincodeCodec::default())
94+
.send(Sender::new(other_addr))
95+
.generator_if(ctx.addr == "pinger", || vec![PingPongMsg::Ping].into_iter())
96+
.build()
97+
.run(ctx)
98+
.await
99+
.unwrap();
100+
});
103101
}

examples/read_write_server/src/lib.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use writer::WriterIn;
1111
use crate::reader::{ReadAck, ReadOut, reader as reader_behaviour};
1212
use crate::server::server as server_behaviour;
1313
use crate::writer::{WriteAck, WriteOut, writer as writer_behaviour};
14-
use reactor_actor::RuntimeCtx;
14+
use reactor_actor::{RuntimeCtx, actor};
1515
use reactor_macros::msg_converter;
1616
use std::collections::HashMap;
1717
// //////////////////////////////////////////////////////////////////////////////
@@ -34,12 +34,12 @@ lazy_static::lazy_static! {
3434
static ref RUNTIME: tokio::runtime::Runtime = tokio::runtime::Runtime::new().unwrap();
3535
}
3636

37-
#[unsafe(no_mangle)]
37+
#[actor]
3838
fn server(ctx: RuntimeCtx, _payload: HashMap<String, serde_json::Value>) {
3939
RUNTIME.spawn(server_behaviour(ctx, server_decoder));
4040
}
4141

42-
#[unsafe(no_mangle)]
42+
#[actor]
4343
fn writer(ctx: RuntimeCtx, mut payload: HashMap<String, serde_json::Value>) {
4444
let server_addr = payload
4545
.remove("server_addr")
@@ -50,7 +50,7 @@ fn writer(ctx: RuntimeCtx, mut payload: HashMap<String, serde_json::Value>) {
5050
RUNTIME.spawn(writer_behaviour(ctx, server_addr, writer_decoder));
5151
}
5252

53-
#[unsafe(no_mangle)]
53+
#[actor]
5454
fn reader(ctx: RuntimeCtx, mut payload: HashMap<String, serde_json::Value>) {
5555
let server_addr = payload
5656
.remove("server_addr")

macros/src/lib.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,27 @@
11
use proc_macro::{self, TokenStream};
22
use proc_macro2::TokenStream as TokenStream2;
33
use quote::quote;
4+
use syn::ItemFn;
45
use syn::{
56
DeriveInput, Ident, Result, Token, Type, bracketed,
67
parse::{Parse, ParseStream},
78
parse_macro_input,
89
punctuated::Punctuated,
910
};
1011

12+
#[proc_macro_attribute]
13+
pub fn actor(_attr: TokenStream, item: TokenStream) -> TokenStream {
14+
let input = parse_macro_input!(item as ItemFn);
15+
let name = &input.sig.ident;
16+
let fn_tokens = quote! {
17+
#[unsafe(no_mangle)]
18+
#input
19+
20+
reactor_actor::register_actor!(#name);
21+
};
22+
fn_tokens.into()
23+
}
24+
1125
#[proc_macro_derive(DefaultPrio)]
1226
pub fn auto_default_priority(input: TokenStream) -> TokenStream {
1327
let input = parse_macro_input!(input as DeriveInput);

node/src/lib.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use rpc::webserver;
2929
mod op_lib_manager;
3030

3131
pub type NodeAddr = &'static str;
32-
pub type ActorSpawnCB = fn(RuntimeCtx, HashMap<String, serde_json::Value>);
32+
// pub type ActorSpawnCB = fn(RuntimeCtx, HashMap<String, serde_json::Value>);
3333

3434
pub type SetupSharedLogger = fn(SharedLogger);
3535

@@ -48,7 +48,7 @@ pub(crate) struct RegisterResult {}
4848
#[derive(Debug)]
4949
pub(crate) struct NodeStatus {
5050
actors: Vec<String>,
51-
loaded_libs: Vec<String>,
51+
loaded_libs: HashMap<String, Vec<String>>,
5252
}
5353

5454
/// Global Controller
@@ -201,6 +201,8 @@ async fn handle_job_req(
201201

202202
let lib = op_lib.get_lib(&lib_name);
203203
unsafe {
204+
use reactor_actor::ActorSpawnCB;
205+
204206
let shared_logger: libloading::Symbol<SetupSharedLogger> =
205207
lib.get(b"setup_shared_logger_ref").unwrap();
206208
let logger = SharedLogger::new();

node/src/op_lib_manager.rs

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,23 +6,35 @@ use crate::LibName;
66

77
#[derive(Default, Debug)]
88
pub(crate) struct OpLibrary {
9-
container: HashMap<LibName, Library>,
9+
container: HashMap<LibName, (Library, Vec<String>)>,
1010
}
1111

1212
impl OpLibrary {
1313
pub(crate) fn add_lib(&mut self, name: LibName, library: Library) {
14-
self.container.insert(name, library);
14+
let registered = unsafe {
15+
if let Ok(get_registered) =
16+
library.get::<libloading::Symbol<fn() -> Vec<String>>>(b"get_registered")
17+
{
18+
get_registered()
19+
} else {
20+
return;
21+
}
22+
};
23+
self.container.insert(name, (library, registered));
1524
}
1625

1726
pub(crate) fn get_lib(&self, lib_name: &str) -> &Library {
18-
self.container.get(lib_name).unwrap()
27+
&self.container.get(lib_name).unwrap().0
1928
}
2029

2130
pub(crate) fn num_libs(&self) -> usize {
2231
self.container.len()
2332
}
2433

25-
pub(crate) fn lib_names(&self) -> Vec<LibName> {
26-
self.container.keys().cloned().collect()
34+
pub(crate) fn lib_names(&self) -> HashMap<LibName, Vec<String>> {
35+
self.container
36+
.iter()
37+
.map(|(name, (_, ops))| (name.clone(), ops.clone()))
38+
.collect()
2739
}
2840
}

node/src/rpc.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ async fn stop_all_actors(State(state): State<Arc<AppState>>) -> impl IntoRespons
215215
#[derive(Serialize, ToSchema)]
216216
struct StatusResponse {
217217
actors: Vec<String>,
218-
loaded_libs: Vec<String>,
218+
loaded_libs: HashMap<String, Vec<String>>,
219219
}
220220
#[cfg_attr(feature="swagger", utoipa::path(
221221
get,

0 commit comments

Comments
 (0)