Skip to content

Commit 2f97660

Browse files
authored
[lighthouse] detect unhealthy participants via heartbeats (#64)
1 parent 6b3665a commit 2f97660

File tree

6 files changed

+253
-59
lines changed

6 files changed

+253
-59
lines changed

src/lib.rs

+6
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ impl Manager {
4545
bind: String,
4646
store_addr: String,
4747
world_size: u64,
48+
heartbeat_interval: Duration,
4849
) -> PyResult<Self> {
4950
py.allow_threads(move || {
5051
let runtime = Runtime::new()?;
@@ -56,6 +57,7 @@ impl Manager {
5657
bind,
5758
store_addr,
5859
world_size,
60+
heartbeat_interval,
5961
))
6062
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
6163
let handle = runtime.spawn(manager.clone().run());
@@ -228,16 +230,19 @@ struct Lighthouse {
228230

229231
#[pymethods]
230232
impl Lighthouse {
233+
#[pyo3(signature = (bind, min_replicas, join_timeout_ms=None, quorum_tick_ms=None, heartbeat_timeout_ms=None))]
231234
#[new]
232235
fn new(
233236
py: Python<'_>,
234237
bind: String,
235238
min_replicas: u64,
236239
join_timeout_ms: Option<u64>,
237240
quorum_tick_ms: Option<u64>,
241+
heartbeat_timeout_ms: Option<u64>,
238242
) -> PyResult<Self> {
239243
let join_timeout_ms = join_timeout_ms.unwrap_or(100);
240244
let quorum_tick_ms = quorum_tick_ms.unwrap_or(100);
245+
let heartbeat_timeout_ms = heartbeat_timeout_ms.unwrap_or(5000);
241246

242247
py.allow_threads(move || {
243248
let rt = Runtime::new()?;
@@ -248,6 +253,7 @@ impl Lighthouse {
248253
min_replicas: min_replicas,
249254
join_timeout_ms: join_timeout_ms,
250255
quorum_tick_ms: quorum_tick_ms,
256+
heartbeat_timeout_ms: heartbeat_timeout_ms,
251257
}))
252258
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
253259

0 commit comments

Comments
 (0)