Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement SDK resiliency #167

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from
4 changes: 3 additions & 1 deletion .github/workflows/validate-examples.yml
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ jobs:
fail-fast: false
matrix:
examples:
[ "actors", "bindings", "client", "configuration", "crypto", "invoke/grpc", "invoke/grpc-proxying", "pubsub", "query_state", "secrets-bulk" ]
[ "actors", "bindings", "client", "configuration", "crypto", "invoke/grpc", "invoke/grpc-proxying", "pubsub", "query_state", "resiliency/instance", "resiliency/simple", "secrets-bulk" ]
steps:
- name: Check out code
uses: actions/checkout@v4
Expand Down Expand Up @@ -210,3 +210,5 @@ jobs:
run: |
cd examples
./validate.sh ${{ matrix.examples }}


9 changes: 9 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ axum = "0.7.4"
tokio = { version = "1.29", features = ["sync"] }
tokio-util = { version = "0.7.10", features = ["io"] }
chrono = "0.4.24"
backon = "0.4.4"

[build-dependencies]
tonic-build = "0.11.0"
Expand Down Expand Up @@ -98,6 +99,14 @@ path = "examples/query_state/query1.rs"
name = "query_state_q2"
path = "examples/query_state/query2.rs"

[[example]]
name = "resiliency-instance"
path = "examples/resiliency/instance/main.rs"

[[example]]
name = "resiliency-simple"
path = "examples/resiliency/simple/main.rs"

[[example]]
name = "secrets-bulk"
path = "examples/secrets-bulk/app.rs"
4 changes: 0 additions & 4 deletions examples/actors/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,6 @@ pub struct MyRequest {

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// TODO: Handle this issue in the sdk
// Introduce delay so that dapr grpc port is assigned before app tries to connect
std::thread::sleep(std::time::Duration::new(2, 0));

// Define the Dapr address
let addr = "https://127.0.0.1".to_string();

Expand Down
4 changes: 0 additions & 4 deletions examples/client/client.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// TODO: Handle this issue in the sdk
// Introduce delay so that dapr grpc port is assigned before app tries to connect
std::thread::sleep(std::time::Duration::new(2, 0));

// Set the Dapr address
let addr = "https://127.0.0.1".to_string();

Expand Down
4 changes: 0 additions & 4 deletions examples/configuration/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@ type DaprClient = dapr::Client<dapr::client::TonicClient>;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// TODO: Handle this issue in the sdk
// Introduce delay so that dapr grpc port is assigned before app tries to connect
std::thread::sleep(std::time::Duration::new(2, 0));

// Set the Dapr address
let addr = "https://127.0.0.1".to_string();

Expand Down
2 changes: 0 additions & 2 deletions examples/crypto/main.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
use std::fs;

use tokio::fs::File;
use tokio::time::sleep;

use dapr::client::ReaderStream;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
sleep(std::time::Duration::new(2, 0)).await;
let addr = "https://127.0.0.1".to_string();

let mut client = dapr::Client::<dapr::client::TonicClient>::connect(addr).await?;
Expand Down
5 changes: 0 additions & 5 deletions examples/invoke/grpc-proxying/client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::{thread, time::Duration};

use hello_world::{greeter_client::GreeterClient, HelloRequest};

use tonic::metadata::MetadataValue;
Expand All @@ -10,9 +8,6 @@ pub mod hello_world {

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Sleep to allow for the server to become available
thread::sleep(Duration::from_secs(5));

// Get the Dapr port and create a connection
let port: u16 = std::env::var("DAPR_GRPC_PORT").unwrap().parse().unwrap();
let address = format!("https://127.0.0.1:{}", port);
Expand Down
5 changes: 0 additions & 5 deletions examples/invoke/grpc/client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::{thread, time::Duration};

use hello_world::{HelloReply, HelloRequest};
use prost::Message;

Expand All @@ -11,9 +9,6 @@ type DaprClient = dapr::Client<dapr::client::TonicClient>;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Sleep to allow for the server to become available
thread::sleep(Duration::from_secs(5));

// Set the Dapr address
let address = "https://127.0.0.1".to_string();

Expand Down
6 changes: 1 addition & 5 deletions examples/pubsub/publisher.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::HashMap, thread, time::Duration};
use std::{collections::HashMap, time::Duration};

use dapr::serde::{Deserialize, Serialize};
use dapr::serde_json;
Expand All @@ -17,10 +17,6 @@ struct Refund {

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// TODO: Handle this issue in the sdk
// Introduce delay so that dapr grpc port is assigned before app tries to connect
thread::sleep(Duration::from_secs(2));

// Set address for Dapr connection
let addr = "https://127.0.0.1".to_string();

Expand Down
3 changes: 0 additions & 3 deletions examples/query_state/query1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@ use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Introduce delay so that dapr grpc port is assigned before app tries to connect
std::thread::sleep(std::time::Duration::new(5, 0));

// Set the Dapr address and create a connection
let addr = "https://127.0.0.1".to_string();

Expand Down
3 changes: 0 additions & 3 deletions examples/query_state/query2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@ use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Introduce delay so that dapr grpc port is assigned before app tries to connect
std::thread::sleep(std::time::Duration::new(5, 0));

// Set the Dapr address and create a connection
let addr = "https://127.0.0.1".to_string();

Expand Down
107 changes: 107 additions & 0 deletions examples/resiliency/instance/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
This example validates the resiliency of the instantiated client and does not
demonstrate any extra functionality. It is based off the configuration example
to connect to the sidecar and make a call for a configuration item stored in
redis.

1. Insert a key with the value `hello` to redis using the following command:


<!-- STEP
name: Insert test configuration item
output_match_mode: substring
expected_stdout_lines:
- 'OK'
background: false
sleep: 5
timeout_seconds: 5
-->

```bash
docker exec dapr_redis redis-cli MSET hello "world"
```

<!-- END_STEP -->

2. Run the example without the sidecar

<!-- STEP
name: Run configuration app
env:
DAPR_GRPC_PORT: "3500"
DAPR_API_MAX_RETRIES: "10"
DAPR_API_TIMEOUT_MILLISECONDS: "10000"
output_match_mode: substring
expected_stdout_lines:
- 'Configuration value: ConfigurationItem { value: "world"'
- 'Configuration value: ConfigurationItem { value: "world2"'
background: true
sleep: 30
timeout_seconds: 30
-->

```bash
cargo run --example resiliency-instance
```

<!-- END_STEP -->

3. Run the Dapr sidecar

<!-- STEP
name: Run Dapr sidecar
output_match_mode: substring
expected_stdout_lines:
- ''
background: false
sleep: 5
timeout_seconds: 10
-->

```bash
dapr run --app-id=rustapp --resources-path ../../components --dapr-grpc-port 3500
```

<!-- END_STEP -->

4. Update the hello key with the value `world2` to redis using the following command:


<!-- STEP
name: Update test configuration item
output_match_mode: substring
expected_stdout_lines:
- 'OK'
background: false
sleep: 1
timeout_seconds: 1
-->

```bash
docker exec dapr_redis redis-cli MSET hello "world2"
```

<!-- END_STEP -->

5. Run the Dapr sidecar (for the second time)

<!-- STEP
name: Run Dapr sidecar
output_match_mode: substring
expected_stdout_lines:
- ''
background: true
sleep: 30
timeout_seconds: 30
-->

```bash
dapr run --app-id=rustapp --resources-path ../../components --dapr-grpc-port 3500
```

<!-- END_STEP -->
The example app should make contact with the Dapr sidecar and the result should
be returned from the configuration request successfully.

```
Configuration value: ConfigurationItem { value: "world", version: "", metadata: {} }
```
47 changes: 47 additions & 0 deletions examples/resiliency/instance/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use std::{
thread,
time::{Duration, Instant},
};

const CONFIGSTORE_NAME: &str = "configstore";
type DaprClient = dapr::Client<dapr::client::TonicClient>;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Set the Dapr address
let addr = "https://127.0.0.1".to_string();

// Create the client
let start_time = Instant::now();
let mut client = match DaprClient::connect(addr).await {
Ok(client) => {
println!("connected to dapr sidecar");
client
}
Err(error) => {
panic!("failed to connect to dapr sidecar: {:?}", error)
}
};
let client_start_duration = start_time.elapsed();
println!("Client connection took: {:?}", client_start_duration);

let key = String::from("hello");

// get key-value pair in the state store
let response = client
.get_configuration(CONFIGSTORE_NAME, vec![(&key)], None)
.await?;
let val = response.items.get("hello").unwrap();
println!("Configuration value: {val:?}");

thread::sleep(Duration::from_secs(10));
println!("app slept for 15 seconds");

let response = client
.get_configuration(CONFIGSTORE_NAME, vec![(&key)], None)
.await?;
let val = response.items.get("hello").unwrap();
println!("Configuration value: {val:?}");

Ok(())
}
Loading
Loading