Skip to content

Commit 8da124d

Browse files
committed
Add grpc liveness client
1 parent 46d9bac commit 8da124d

3 files changed

Lines changed: 213 additions & 1 deletion

File tree

components/spider-execution-manager/src/client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ pub mod liveness;
1111
pub mod scheduler;
1212
pub mod storage;
1313

14-
pub use grpc::GrpcStorageClient;
14+
pub use grpc::{GrpcLivenessClient, GrpcStorageClient};
1515
pub use liveness::{LivenessClient, LivenessResponseError, RegistrationResponse};
1616
pub use scheduler::{SchedulerClient, SchedulerError, SchedulerResponse};
1717
pub use storage::{StorageClient, StorageResponseError};
Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
//! gRPC-backed [`LivenessClient`] implementation.
2+
3+
use std::net::IpAddr;
4+
5+
use async_trait::async_trait;
6+
use spider_core::types::id::{ExecutionManagerId, SessionId};
7+
use spider_proto_rust::storage::{
8+
self,
9+
execution_manager_liveness_service_client::ExecutionManagerLivenessServiceClient,
10+
register_execution_manager_response,
11+
storage_error,
12+
update_execution_manager_heartbeat_response,
13+
};
14+
use tonic::transport::{Channel, Endpoint};
15+
16+
use crate::client::liveness::{LivenessClient, LivenessResponseError, RegistrationResponse};
17+
18+
/// gRPC-backed [`LivenessClient`] implementation.
19+
#[derive(Debug, Clone)]
20+
pub struct GrpcLivenessClient {
21+
client: ExecutionManagerLivenessServiceClient<Channel>,
22+
}
23+
24+
impl GrpcLivenessClient {
25+
/// Connects to the storage gRPC endpoint.
26+
///
27+
/// # Returns
28+
///
29+
/// A new [`GrpcLivenessClient`] connected to `endpoint` on success.
30+
///
31+
/// # Errors
32+
///
33+
/// Returns an error if:
34+
///
35+
/// * [`LivenessResponseError::Transport`] if tonic cannot create or connect to the endpoint.
36+
pub async fn connect(endpoint: Endpoint) -> Result<Self, LivenessResponseError> {
37+
ExecutionManagerLivenessServiceClient::connect(endpoint)
38+
.await
39+
.map(|inner| Self { client: inner })
40+
.map_err(to_transport_error)
41+
}
42+
}
43+
44+
#[async_trait]
45+
impl LivenessClient for GrpcLivenessClient {
46+
async fn register(&self, ip: IpAddr) -> Result<RegistrationResponse, LivenessResponseError> {
47+
let request = storage::RegisterExecutionManagerRequest {
48+
ip_address: ip.to_string(),
49+
};
50+
let response = self
51+
.client
52+
.clone()
53+
.register_execution_manager(request)
54+
.await
55+
.map_err(to_transport_error)?
56+
.into_inner();
57+
58+
register_response_to_result(response)
59+
}
60+
61+
async fn heartbeat(
62+
&self,
63+
em_id: ExecutionManagerId,
64+
) -> Result<SessionId, LivenessResponseError> {
65+
let request = storage::ExecutionManagerIdRequest {
66+
execution_manager_id: em_id.get(),
67+
};
68+
let response = self
69+
.client
70+
.clone()
71+
.update_execution_manager_heartbeat(request)
72+
.await
73+
.map_err(to_transport_error)?
74+
.into_inner();
75+
76+
heartbeat_response_to_result(response)
77+
}
78+
}
79+
80+
/// # Returns
81+
///
82+
/// [`storage::RegisterExecutionManagerResponse`] converted into
83+
/// [`Result<RegistrationResponse, LivenessResponseError>`].
84+
fn register_response_to_result(
85+
response: storage::RegisterExecutionManagerResponse,
86+
) -> Result<RegistrationResponse, LivenessResponseError> {
87+
match response.result {
88+
Some(register_execution_manager_response::Result::Registration(registration)) => {
89+
Ok(RegistrationResponse {
90+
em_id: ExecutionManagerId::from(registration.execution_manager_id),
91+
session_id: registration.session_id,
92+
})
93+
}
94+
Some(register_execution_manager_response::Result::Error(error)) => {
95+
Err(storage_error_to_liveness_error(error))
96+
}
97+
None => Err(LivenessResponseError::Transport(
98+
"register execution manager response missing result".to_owned(),
99+
)),
100+
}
101+
}
102+
103+
/// # Returns
104+
///
105+
/// [`storage::UpdateExecutionManagerHeartbeatResponse`] converted into
106+
/// [`Result<SessionId, LivenessResponseError>`].
107+
fn heartbeat_response_to_result(
108+
response: storage::UpdateExecutionManagerHeartbeatResponse,
109+
) -> Result<SessionId, LivenessResponseError> {
110+
match response.result {
111+
Some(update_execution_manager_heartbeat_response::Result::SessionId(session_id)) => {
112+
Ok(session_id)
113+
}
114+
Some(update_execution_manager_heartbeat_response::Result::Error(error)) => {
115+
Err(storage_error_to_liveness_error(error))
116+
}
117+
None => Err(LivenessResponseError::Transport(
118+
"update execution manager heartbeat response missing result".to_owned(),
119+
)),
120+
}
121+
}
122+
123+
/// Converts a protobuf storage error into a liveness client error.
124+
///
125+
/// # Returns
126+
///
127+
/// The corresponding [`LivenessResponseError`].
128+
fn storage_error_to_liveness_error(error: storage::StorageError) -> LivenessResponseError {
129+
match storage_error::ErrCode::try_from(error.err_code) {
130+
Ok(storage_error::ErrCode::CacheStale) => LivenessResponseError::MarkedDead,
131+
Ok(storage_error::ErrCode::InvalidInput) => LivenessResponseError::IllegalId(error.message),
132+
Ok(
133+
storage_error::ErrCode::Transport
134+
| storage_error::ErrCode::Server
135+
| storage_error::ErrCode::StaleSession
136+
| storage_error::ErrCode::Unspecified,
137+
) => LivenessResponseError::Transport(error.message),
138+
Err(error) => {
139+
LivenessResponseError::Transport(format!("unknown storage error kind: {error}"))
140+
}
141+
}
142+
}
143+
144+
/// Converts a displayable transport-layer error into [`LivenessResponseError::Transport`].
145+
///
146+
/// # Returns
147+
///
148+
/// A [`LivenessResponseError::Transport`] containing `error`'s display string.
149+
fn to_transport_error(error: impl std::fmt::Display) -> LivenessResponseError {
150+
LivenessResponseError::Transport(error.to_string())
151+
}
152+
153+
#[cfg(test)]
154+
mod tests {
155+
use spider_core::types::id::ExecutionManagerId;
156+
157+
use super::*;
158+
use crate::client::{LivenessResponseError, RegistrationResponse};
159+
160+
#[test]
161+
fn register_response_to_result_returns_registration() {
162+
let response = storage::RegisterExecutionManagerResponse {
163+
result: Some(register_execution_manager_response::Result::Registration(
164+
storage::ExecutionManagerRegistration {
165+
execution_manager_id: 5,
166+
session_id: 7,
167+
},
168+
)),
169+
};
170+
171+
let registration = register_response_to_result(response)
172+
.expect("registration response conversion should succeed");
173+
174+
assert_eq!(
175+
registration,
176+
RegistrationResponse {
177+
em_id: ExecutionManagerId::from(5),
178+
session_id: 7,
179+
}
180+
);
181+
}
182+
183+
#[test]
184+
fn heartbeat_response_to_result_returns_session_id() {
185+
let response = storage::UpdateExecutionManagerHeartbeatResponse {
186+
result: Some(update_execution_manager_heartbeat_response::Result::SessionId(9)),
187+
};
188+
189+
let session_id = heartbeat_response_to_result(response)
190+
.expect("heartbeat response conversion should succeed");
191+
192+
assert_eq!(session_id, 9);
193+
}
194+
195+
#[test]
196+
fn liveness_storage_error_maps_invalid_input_to_illegal_id() {
197+
let error = storage::StorageError {
198+
err_code: storage_error::ErrCode::InvalidInput.into(),
199+
message: "bad em id".to_owned(),
200+
storage_session: 0,
201+
};
202+
203+
match storage_error_to_liveness_error(error) {
204+
LivenessResponseError::IllegalId(message) => {
205+
assert_eq!(message, "bad em id");
206+
}
207+
error => panic!("unexpected liveness response error: {error:?}"),
208+
}
209+
}
210+
}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
//! gRPC-backed implementations of the execution manager's client traits.
22
3+
pub mod liveness;
34
pub mod storage;
45

6+
pub use liveness::GrpcLivenessClient;
57
pub use storage::GrpcStorageClient;

0 commit comments

Comments
 (0)