Skip to content

Commit a1843fd

Browse files
committed
Added distributed lock
Signed-off-by: Zachary Edgell <[email protected]>
1 parent ac71bb7 commit a1843fd

File tree

8 files changed

+167
-1
lines changed

8 files changed

+167
-1
lines changed

.github/workflows/validate-examples.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ jobs:
144144
fail-fast: false
145145
matrix:
146146
examples:
147-
[ "actors", "client", "configuration", "crypto", "invoke/grpc", "invoke/grpc-proxying", "pubsub", "secrets-bulk" ]
147+
[ "actors", "client", "configuration", "crypto", "distributed-lock" "invoke/grpc", "invoke/grpc-proxying", "pubsub", "secrets-bulk" ]
148148
steps:
149149
- name: Check out code
150150
uses: actions/checkout@v4

Cargo.toml

+4
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,10 @@ path = "examples/configuration/main.rs"
5858
name = "crypto"
5959
path = "examples/crypto/main.rs"
6060

61+
[[example]]
62+
name = "distributed-lock"
63+
path = "examples/distributed-lock/main.rs"
64+
6165
[[example]]
6266
name = "invoke-grpc-client"
6367
path = "examples/invoke/grpc/client.rs"

examples/distributed-lock/README.md

+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# Distributed Lock
2+
3+
This is a simple example that demonstrates Dapr's Distributed Lock capabilities.
4+
5+
> **Note:** Make sure to use latest version of proto bindings.
6+
7+
## Running
8+
9+
To run this example:
10+
11+
1. Run the multi-app run template:
12+
13+
<!-- STEP
14+
name: Run multi-app
15+
output_match_mode: substring
16+
match_order: none
17+
expected_stdout_lines:
18+
- '== APP - distributed-lock-example == Successfully locked my-data'
19+
- '== APP - distributed-lock-example == Successfully unlocked my-data'
20+
background: true
21+
sleep: 30
22+
timeout_seconds: 90
23+
-->
24+
25+
```bash
26+
dapr run -f .
27+
```
28+
29+
<!-- END_STEP -->
30+
31+
2. Stop with `ctrl + c`
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
apiVersion: dapr.io/v1alpha1
2+
kind: Component
3+
metadata:
4+
name: lockstore
5+
spec:
6+
type: lock.redis
7+
version: v1
8+
metadata:
9+
- name: redisHost
10+
value: localhost:6379
11+
- name: redisPassword
12+
value: ""
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
apiVersion: dapr.io/v1alpha1
2+
kind: Component
3+
metadata:
4+
name: statestore
5+
spec:
6+
type: state.redis
7+
version: v1
8+
metadata:
9+
- name: redisHost
10+
value: localhost:6379
11+
- name: redisPassword
12+
value: ""
13+
- name: actorStateStore
14+
value: "true"

examples/distributed-lock/dapr.yaml

+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
version: 1
2+
common:
3+
daprdLogDestination: console
4+
apps:
5+
- appID: distributed-lock-example
6+
appDirPath: ./
7+
daprGRPCPort: 35002
8+
logLevel: debug
9+
command: [ "cargo", "run", "--example", "distributed-lock" ]
10+
resourcesPath: ./components

examples/distributed-lock/main.rs

+43
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
use tokio::time::sleep;
2+
3+
#[tokio::main]
4+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
5+
sleep(std::time::Duration::new(2, 0)).await;
6+
let port: u16 = std::env::var("DAPR_GRPC_PORT")?.parse()?;
7+
let addr = format!("https://127.0.0.1:{}", port);
8+
9+
let mut client = dapr::Client::<dapr::client::TonicClient>::connect(addr).await?;
10+
11+
let files = vec![("my-data", b"some-data".to_vec())];
12+
13+
client.save_state("statestore", files).await.unwrap();
14+
15+
let result = client
16+
.lock(dapr::client::TryLockRequest {
17+
store_name: "lockstore".to_string(),
18+
resource_id: "my-data".to_string(),
19+
lock_owner: "some-random-id".to_string(),
20+
expiry_in_seconds: 60,
21+
})
22+
.await
23+
.unwrap();
24+
25+
assert!(result.success);
26+
27+
println!("Successfully locked my-data");
28+
29+
let result = client
30+
.unlock(dapr::client::UnlockRequest {
31+
store_name: "lockstore".to_string(),
32+
resource_id: "my-data".to_string(),
33+
lock_owner: "some-random-id".to_string(),
34+
})
35+
.await
36+
.unwrap();
37+
38+
assert_eq!(0, result.status);
39+
40+
println!("Successfully unlocked my-data");
41+
42+
Ok(())
43+
}

src/client.rs

+52
Original file line numberDiff line numberDiff line change
@@ -455,6 +455,24 @@ impl<T: DaprInterface> Client<T> {
455455
.collect();
456456
self.0.decrypt(requested_items).await
457457
}
458+
459+
/// Distributed lock request call
460+
///
461+
/// # Arguments
462+
///
463+
/// * `request` - Request to be made, TryLockRequest
464+
pub async fn lock(&mut self, request: TryLockRequest) -> Result<TryLockResponse, Error> {
465+
self.0.lock(request).await
466+
}
467+
468+
/// Distributed lock request call
469+
///
470+
/// # Arguments
471+
///
472+
/// * `request` - Request to be made, TryLockRequest
473+
pub async fn unlock(&mut self, request: UnlockRequest) -> Result<UnlockResponse, Error> {
474+
self.0.unlock(request).await
475+
}
458476
}
459477

460478
#[async_trait]
@@ -501,6 +519,10 @@ pub trait DaprInterface: Sized {
501519
-> Result<Vec<StreamPayload>, Status>;
502520

503521
async fn decrypt(&mut self, payload: Vec<DecryptRequest>) -> Result<Vec<u8>, Status>;
522+
523+
async fn lock(&mut self, request: TryLockRequest) -> Result<TryLockResponse, Error>;
524+
525+
async fn unlock(&mut self, request: UnlockRequest) -> Result<UnlockResponse, Error>;
504526
}
505527

506528
#[async_trait]
@@ -661,6 +683,24 @@ impl DaprInterface for dapr_v1::dapr_client::DaprClient<TonicChannel> {
661683
}
662684
Ok(data)
663685
}
686+
687+
/// Distributed lock request call
688+
///
689+
/// # Arguments
690+
///
691+
/// * `request` - Request to be made, TryLockRequest
692+
async fn lock(&mut self, request: TryLockRequest) -> Result<TryLockResponse, Error> {
693+
Ok(self.try_lock_alpha1(request).await?.into_inner())
694+
}
695+
696+
/// Distributed unlock request call
697+
///
698+
/// # Arguments
699+
///
700+
/// * `request` - Request to be made, UnlockRequest
701+
async fn unlock(&mut self, request: UnlockRequest) -> Result<UnlockResponse, Error> {
702+
Ok(self.unlock_alpha1(request).await?.into_inner())
703+
}
664704
}
665705

666706
/// A request from invoking a service
@@ -752,6 +792,18 @@ pub type EncryptRequestOptions = crate::dapr::dapr::proto::runtime::v1::EncryptR
752792
/// Decryption request options
753793
pub type DecryptRequestOptions = crate::dapr::dapr::proto::runtime::v1::DecryptRequestOptions;
754794

795+
/// Lock response
796+
pub type TryLockResponse = crate::dapr::dapr::proto::runtime::v1::TryLockResponse;
797+
798+
/// Lock request
799+
pub type TryLockRequest = crate::dapr::dapr::proto::runtime::v1::TryLockRequest;
800+
801+
/// Unlock request
802+
pub type UnlockRequest = crate::dapr::dapr::proto::runtime::v1::UnlockRequest;
803+
804+
/// Unlock response
805+
pub type UnlockResponse = crate::dapr::dapr::proto::runtime::v1::UnlockResponse;
806+
755807
type StreamPayload = crate::dapr::dapr::proto::common::v1::StreamPayload;
756808
impl<K> From<(K, Vec<u8>)> for common_v1::StateItem
757809
where

0 commit comments

Comments
 (0)