Skip to content

Commit 90ab99e

Browse files
authored
lighthouse: add heartbeats (#6)
1 parent c7e231e commit 90ab99e

File tree

3 files changed

+84
-25
lines changed

3 files changed

+84
-25
lines changed

proto/torchft.proto

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,15 @@ message LighthouseQuorumResponse {
5353
Quorum quorum = 1;
5454
}
5555

56+
message LighthouseHeartbeatRequest { // Request payload of the LighthouseService Heartbeat RPC.
57+
string replica_id = 1; // ID of the replica this heartbeat is reported for.
58+
}
59+
60+
message LighthouseHeartbeatResponse {}
61+
5662
service LighthouseService {
5763
rpc Quorum (LighthouseQuorumRequest) returns (LighthouseQuorumResponse);
64+
rpc Heartbeat (LighthouseHeartbeatRequest) returns (LighthouseHeartbeatResponse);
5865
}
5966

6067
message ManagerQuorumRequest {

src/lighthouse.rs

Lines changed: 44 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ use tonic::{Request, Response, Status};
2121

2222
use crate::torchftpb::{
2323
lighthouse_service_server::{LighthouseService, LighthouseServiceServer},
24-
LighthouseQuorumRequest, LighthouseQuorumResponse, Quorum, QuorumMember,
24+
LighthouseHeartbeatRequest, LighthouseHeartbeatResponse, LighthouseQuorumRequest,
25+
LighthouseQuorumResponse, Quorum, QuorumMember,
2526
};
2627

2728
struct QuorumMemberDetails {
@@ -34,6 +35,10 @@ struct State {
3435
participants: HashMap<String, QuorumMemberDetails>,
3536
prev_quorum: Option<Quorum>,
3637
quorum_id: i64,
38+
39+
// heartbeat information
40+
// replica_id -> last heartbeat
41+
heartbeats: HashMap<String, Instant>,
3742
}
3843

3944
pub struct Lighthouse {
@@ -74,6 +79,7 @@ impl Lighthouse {
7479
channel: tx,
7580
prev_quorum: None,
7681
quorum_id: 0,
82+
heartbeats: HashMap::new(),
7783
}),
7884
opt: opt,
7985
})
@@ -176,15 +182,15 @@ impl Lighthouse {
176182
Ok(())
177183
}
178184

179-
pub async fn _run_quorum(self: Arc<Self>) -> Result<()> {
185+
async fn _run_quorum(self: Arc<Self>) -> Result<()> {
180186
loop {
181187
self.clone()._quorum_tick().await?;
182188

183189
sleep(Duration::from_millis(self.opt.quorum_tick_ms)).await;
184190
}
185191
}
186192

187-
pub async fn _run_grpc(self: Arc<Self>) -> Result<()> {
193+
async fn _run_grpc(self: Arc<Self>) -> Result<()> {
188194
let bind = self.opt.bind.parse()?;
189195
info!("Lighthouse listening on {}", bind);
190196

@@ -248,6 +254,21 @@ impl LighthouseService for Arc<Lighthouse> {
248254

249255
Ok(Response::new(reply))
250256
}
257+
258+
async fn heartbeat(
259+
&self,
260+
request: Request<LighthouseHeartbeatRequest>,
261+
) -> Result<Response<LighthouseHeartbeatResponse>, Status> {
262+
let replica_id = request.into_inner().replica_id;
263+
264+
{
265+
let mut state = self.state.lock().await;
266+
state.heartbeats.insert(replica_id, Instant::now());
267+
}
268+
269+
let reply = LighthouseHeartbeatResponse {};
270+
Ok(Response::new(reply))
271+
}
251272
}
252273

253274
#[cfg(test)]
@@ -364,18 +385,28 @@ mod tests {
364385
.await
365386
.unwrap();
366387

367-
let request = tonic::Request::new(LighthouseQuorumRequest {
368-
requester: Some(QuorumMember {
388+
{
389+
let request = tonic::Request::new(LighthouseHeartbeatRequest {
369390
replica_id: "foo".to_string(),
370-
address: "".to_string(),
371-
store_address: "".to_string(),
372-
step: 10,
373-
}),
374-
});
391+
});
392+
393+
let response = client.heartbeat(request).await.unwrap();
394+
}
395+
396+
{
397+
let request = tonic::Request::new(LighthouseQuorumRequest {
398+
requester: Some(QuorumMember {
399+
replica_id: "foo".to_string(),
400+
address: "".to_string(),
401+
store_address: "".to_string(),
402+
step: 10,
403+
}),
404+
});
375405

376-
let response = client.quorum(request).await.unwrap();
377-
let quorum = response.into_inner().quorum.unwrap();
378-
assert_eq!(quorum.participants.len(), 1);
406+
let response = client.quorum(request).await.unwrap();
407+
let quorum = response.into_inner().quorum.unwrap();
408+
assert_eq!(quorum.participants.len(), 1);
409+
}
379410

380411
lighthouse_task.abort();
381412
}

src/manager.rs

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ use std::time::Duration;
1212
use anyhow::Result;
1313
use tokio::sync::broadcast;
1414
use tokio::sync::Mutex;
15+
use tokio::task::JoinSet;
16+
use tokio::time::sleep;
1517
use tonic::transport::Server;
1618
use tonic::{Request, Response, Status};
1719

@@ -21,9 +23,9 @@ use crate::torchftpb::lighthouse_service_client::LighthouseServiceClient;
2123
use crate::torchftpb::manager_service_client::ManagerServiceClient;
2224
use crate::torchftpb::{
2325
manager_service_server::{ManagerService, ManagerServiceServer},
24-
CheckpointAddressRequest, CheckpointAddressResponse, LighthouseQuorumRequest,
25-
ManagerQuorumRequest, ManagerQuorumResponse, Quorum, QuorumMember, ShouldCommitRequest,
26-
ShouldCommitResponse,
26+
CheckpointAddressRequest, CheckpointAddressResponse, LighthouseHeartbeatRequest,
27+
LighthouseQuorumRequest, ManagerQuorumRequest, ManagerQuorumResponse, Quorum, QuorumMember,
28+
ShouldCommitRequest, ShouldCommitResponse,
2729
};
2830

2931
#[cfg(not(test))]
@@ -99,6 +101,19 @@ impl Manager {
99101
}
100102

101103
pub async fn run(self: Arc<Self>) -> Result<()> {
104+
let mut set = JoinSet::new();
105+
106+
set.spawn(self.clone()._run_heartbeat());
107+
108+
set.spawn(self.clone()._run_grpc());
109+
110+
while let Some(res) = set.join_next().await {
111+
res??;
112+
}
113+
Ok(())
114+
}
115+
116+
async fn _run_grpc(self: Arc<Self>) -> Result<()> {
102117
let bind = self.bind.parse()?;
103118
info!("Manager {} listening on {}", self.replica_id, bind);
104119

@@ -109,6 +124,19 @@ impl Manager {
109124
.map_err(|e| e.into())
110125
}
111126

127+
async fn _run_heartbeat(self: Arc<Self>) -> Result<()> {
128+
let mut client = self.lighthouse_client_new().await?;
129+
loop {
130+
let request = tonic::Request::new(LighthouseHeartbeatRequest {
131+
replica_id: self.replica_id.clone(),
132+
});
133+
134+
let response = client.heartbeat(request).await;
135+
136+
sleep(Duration::from_millis(100)).await;
137+
}
138+
}
139+
112140
async fn lighthouse_client_new(&self) -> Result<LighthouseServiceClient<Channel>> {
113141
info!(
114142
"Manager: connecting to lighthouse at {}",
@@ -333,17 +361,14 @@ mod tests {
333361
#[tokio::test]
334362
async fn test_should_commit() -> Result<()> {
335363
let manager = Manager::new(
336-
"repid".to_string(),
364+
"rep_id".to_string(),
337365
"lighthouse".to_string(),
338366
"addr".to_string(),
339367
"0.0.0.0:29531".to_string(),
340368
"store_addr".to_string(),
341369
2,
342370
);
343-
println!("manager spawn");
344-
let manager_fut = tokio::spawn(manager.run());
345-
346-
println!("should_commit1");
371+
let manager_fut = tokio::spawn(manager._run_grpc());
347372

348373
let fut_a = tokio::spawn(should_commit(0, true));
349374
let fut_b = tokio::spawn(should_commit(1, true));
@@ -353,8 +378,6 @@ mod tests {
353378
assert!(resp_a.should_commit);
354379
assert!(resp_b.should_commit);
355380

356-
println!("should_commit2");
357-
358381
let fut_a = tokio::spawn(should_commit(0, true));
359382
let fut_b = tokio::spawn(should_commit(1, false));
360383
let resp_a = fut_a.await??;
@@ -363,8 +386,6 @@ mod tests {
363386
assert!(!resp_a.should_commit);
364387
assert!(!resp_b.should_commit);
365388

366-
println!("aborting");
367-
368389
manager_fut.abort();
369390

370391
Ok(())

0 commit comments

Comments
 (0)