Skip to content

Commit 529233a

Browse files
committed
[lighthouse] detect unhealthy participants via heartbeats
1 parent 5dd6f38 commit 529233a

File tree

6 files changed

+192
-37
lines changed

6 files changed

+192
-37
lines changed

src/lib.rs

+6
Original file line number | Diff line number | Diff line change
@@ -44,6 +44,7 @@ impl Manager {
4444
bind: String,
4545
store_addr: String,
4646
world_size: u64,
47+
heartbeat_interval: Duration,
4748
) -> PyResult<Self> {
4849
py.allow_threads(move || {
4950
let runtime = Runtime::new()?;
@@ -55,6 +56,7 @@ impl Manager {
5556
bind,
5657
store_addr,
5758
world_size,
59+
heartbeat_interval,
5860
))
5961
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
6062
let handle = runtime.spawn(manager.clone().run());
@@ -224,16 +226,19 @@ struct Lighthouse {
224226

225227
#[pymethods]
226228
impl Lighthouse {
229+
#[pyo3(signature = (bind, min_replicas, join_timeout_ms=None, quorum_tick_ms=None, heartbeat_timeout_ms=None))]
227230
#[new]
228231
fn new(
229232
py: Python<'_>,
230233
bind: String,
231234
min_replicas: u64,
232235
join_timeout_ms: Option<u64>,
233236
quorum_tick_ms: Option<u64>,
237+
heartbeat_timeout_ms: Option<u64>,
234238
) -> PyResult<Self> {
235239
let join_timeout_ms = join_timeout_ms.unwrap_or(100);
236240
let quorum_tick_ms = quorum_tick_ms.unwrap_or(100);
241+
let heartbeat_timeout_ms = heartbeat_timeout_ms.unwrap_or(5000);
237242

238243
py.allow_threads(move || {
239244
let rt = Runtime::new()?;
@@ -244,6 +249,7 @@ impl Lighthouse {
244249
min_replicas: min_replicas,
245250
join_timeout_ms: join_timeout_ms,
246251
quorum_tick_ms: quorum_tick_ms,
252+
heartbeat_timeout_ms: heartbeat_timeout_ms,
247253
}))
248254
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
249255

0 commit comments

Comments
 (0)