Skip to content

Commit f642bd8

Browse files
authored
Merge pull request dapr#87 from zedgell/feature/pubsub-macro
Created Procedural Macros For Pub Sub
2 parents 90f07be + d21290e commit f642bd8

File tree

7 files changed

+358
-73
lines changed

7 files changed

+358
-73
lines changed

examples/pubsub/README.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
This is a simple example that demonstrates Dapr's pub/sub capabilities. To implement pub/sub in your rust application, you need to implement `AppCallback` server for subscribing to events. Specifically, the following two methods need to be implemented for pub/sub to work:
44

55
1. `list_topic_subscriptions` - Dapr runtime calls this method to get list of topics the application is subscribed to.
6-
2. `on_topic_event` - Defines how the application handles the topic event.
6+
2. `on_topic_event` - Defines how the application handles the topic event.
77

88
> **Note:** Make sure to use latest version of proto bindings.
99
@@ -54,4 +54,4 @@ dapr run --app-id rust-subscriber --app-protocol grpc --app-port 50051 cargo run
5454
2. Run the publisher with dapr
5555
```bash
5656
dapr run --app-id rust-publisher --app-protocol grpc cargo run -- --example publisher
57-
```
57+
```

examples/pubsub/publisher.rs

+46-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,20 @@
11
use std::{collections::HashMap, thread, time::Duration};
22

3+
use dapr::serde::{Deserialize, Serialize};
4+
use dapr::serde_json;
5+
6+
#[derive(Serialize, Deserialize, Debug)]
7+
struct Order {
8+
pub order_number: i32,
9+
pub order_details: String,
10+
}
11+
12+
#[derive(Serialize, Deserialize, Debug)]
13+
struct Refund {
14+
pub order_number: i32,
15+
pub refund_amount: i32,
16+
}
17+
318
#[tokio::main]
419
async fn main() -> Result<(), Box<dyn std::error::Error>> {
520
// TODO: Handle this issue in the sdk
@@ -21,14 +36,19 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
2136

2237
// topic to publish message to
2338
let topic = "A".to_string();
39+
let topic_b = "B".to_string();
2440

25-
for count in 0..3 {
41+
for count in 0..10 {
42+
let order = Order {
43+
order_number: count,
44+
order_details: format!("Count is {}", count),
45+
};
2646
// message metadata
2747
let mut metadata = HashMap::<String, String>::new();
2848
metadata.insert("count".to_string(), count.to_string());
2949

3050
// message
31-
let message = format!("{} => hello from rust!", &count).into_bytes();
51+
let message = serde_json::to_string(&order).unwrap().into_bytes();
3252

3353
client
3454
.publish_event(
@@ -43,7 +63,31 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
4363
// sleep for 1 second to simulate delay between each event
4464
tokio::time::sleep(Duration::from_secs(1)).await;
4565
}
66+
for count in 0..10 {
67+
let refund = Refund {
68+
order_number: count,
69+
refund_amount: 1200,
70+
};
71+
// message metadata
72+
let mut metadata = HashMap::<String, String>::new();
73+
metadata.insert("count".to_string(), count.to_string());
74+
75+
// message
76+
let message = serde_json::to_string(&refund).unwrap().into_bytes();
4677

78+
client
79+
.publish_event(
80+
&pubsub_name,
81+
&topic_b,
82+
&data_content_type,
83+
message,
84+
Some(metadata),
85+
)
86+
.await?;
87+
88+
// sleep for 2 seconds to simulate delay between two events
89+
tokio::time::sleep(Duration::from_secs(2)).await;
90+
}
4791
println!("messages published");
4892

4993
Ok(())

examples/pubsub/subscriber.rs

+28-63
Original file line numberDiff line numberDiff line change
@@ -1,78 +1,43 @@
1-
use tonic::{transport::Server, Request, Response, Status};
1+
use dapr_macros::topic;
2+
use tonic::transport::Server;
23

4+
use dapr::appcallback::AppCallbackService;
5+
use dapr::serde::{Deserialize, Serialize};
36
use dapr::{
4-
appcallback::*,
5-
dapr::dapr::proto::runtime::v1::app_callback_server::{AppCallback, AppCallbackServer},
7+
appcallback::*, dapr::dapr::proto::runtime::v1::app_callback_server::AppCallbackServer,
68
};
79

8-
#[derive(Default)]
9-
pub struct AppCallbackService {}
10-
11-
#[tonic::async_trait]
12-
impl AppCallback for AppCallbackService {
13-
/// Invokes service method with InvokeRequest.
14-
async fn on_invoke(
15-
&self,
16-
_request: Request<InvokeRequest>,
17-
) -> Result<Response<InvokeResponse>, Status> {
18-
Ok(Response::new(InvokeResponse::default()))
19-
}
20-
21-
/// Lists all topics subscribed by this app.
22-
///
23-
/// NOTE: Dapr runtime will call this method to get
24-
/// the list of topics the app wants to subscribe to.
25-
/// In this example, the app is subscribing to topic `A`.
26-
async fn list_topic_subscriptions(
27-
&self,
28-
_request: Request<()>,
29-
) -> Result<Response<ListTopicSubscriptionsResponse>, Status> {
30-
let topic = "A".to_string();
31-
let pubsub_name = "pubsub".to_string();
32-
33-
let list_subscriptions = ListTopicSubscriptionsResponse::topic(pubsub_name, topic);
34-
35-
Ok(Response::new(list_subscriptions))
36-
}
37-
38-
/// Subscribes events from Pubsub.
39-
async fn on_topic_event(
40-
&self,
41-
request: Request<TopicEventRequest>,
42-
) -> Result<Response<TopicEventResponse>, Status> {
43-
let r = request.into_inner();
44-
let data = &r.data;
45-
let data_content_type = &r.data_content_type;
46-
47-
let message = String::from_utf8_lossy(data);
48-
println!("Message: {}", &message);
49-
println!("Content-Type: {}", &data_content_type);
10+
#[derive(Serialize, Deserialize, Debug)]
11+
struct Order {
12+
pub order_number: i32,
13+
pub order_details: String,
14+
}
5015

51-
Ok(Response::new(TopicEventResponse::default()))
52-
}
16+
#[derive(Serialize, Deserialize, Debug)]
17+
struct Refund {
18+
pub order_number: i32,
19+
pub refund_amount: i32,
20+
}
5321

54-
/// Lists all input bindings subscribed by this app.
55-
async fn list_input_bindings(
56-
&self,
57-
_request: Request<()>,
58-
) -> Result<Response<ListInputBindingsResponse>, Status> {
59-
Ok(Response::new(ListInputBindingsResponse::default()))
60-
}
22+
#[topic(pub_sub_name = "pubsub", topic = "A")]
23+
async fn handle_a_event(order: Order) {
24+
println!("Topic A - {:#?}", order)
25+
}
6126

62-
/// Listens events from the input bindings.
63-
async fn on_binding_event(
64-
&self,
65-
_request: Request<BindingEventRequest>,
66-
) -> Result<Response<BindingEventResponse>, Status> {
67-
Ok(Response::new(BindingEventResponse::default()))
68-
}
27+
#[topic(pub_sub_name = "pubsub", topic = "B")]
28+
async fn handle_b_event(refund: Refund) {
29+
println!("Topic B - {:#?}", refund)
6930
}
7031

7132
#[tokio::main]
7233
async fn main() -> Result<(), Box<dyn std::error::Error>> {
73-
let addr = "[::]:50051".parse().unwrap();
34+
let addr = "127.0.0.1:50051".parse().unwrap();
35+
36+
let mut callback_service = AppCallbackService::new();
37+
38+
callback_service.add_handler(HandleAEvent::default().get_handler());
7439

75-
let callback_service = AppCallbackService::default();
40+
callback_service.add_handler(HandleBEvent::default().get_handler());
7641

7742
println!("AppCallback server listening on: {}", addr);
7843

macros/Cargo.toml

+2-1
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,6 @@ proc-macro = true
1212
async-trait = "0.1"
1313
log = "0.4"
1414
axum = "0.7.4"
15-
syn = {version="2.0.29",features=["full"]}
15+
syn = { version = "2.0.29", features = ["full"] }
1616
quote = "1.0.8"
17+
proc-macro2 = "1.0"

0 commit comments

Comments
 (0)