Skip to content

Commit ddbc3c9

Browse files
authored
manager: added E2E tests and support getting lighthouse and manager addresses (#25)
1 parent ab66c7c commit ddbc3c9

File tree

4 files changed

+146
-60
lines changed

4 files changed

+146
-60
lines changed

src/bin/lighthouse.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ async fn main() {
1717
.unwrap();
1818

1919
let opt = LighthouseOpt::from_args();
20-
let lighthouse = Lighthouse::new(opt);
20+
let lighthouse = Lighthouse::new(opt).await.unwrap();
2121

2222
lighthouse.run().await.unwrap();
2323
}

src/lib.rs

+19-19
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ pub mod manager;
99

1010
use core::time::Duration;
1111
use std::env;
12-
use std::sync::Arc;
1312

1413
use anyhow::Result;
1514
use pyo3::exceptions::PyRuntimeError;
@@ -28,8 +27,6 @@ use pyo3::prelude::*;
2827

2928
#[pyclass]
3029
struct Manager {
31-
runtime: Runtime,
32-
manager: Arc<manager::Manager>,
3330
handle: JoinHandle<Result<()>>,
3431
}
3532

@@ -47,20 +44,18 @@ impl Manager {
4744
) -> Self {
4845
py.allow_threads(move || {
4946
let runtime = Runtime::new().unwrap();
50-
let manager = manager::Manager::new(
51-
replica_id,
52-
lighthouse_addr,
53-
address,
54-
bind,
55-
store_addr,
56-
world_size,
57-
);
47+
let manager = runtime
48+
.block_on(manager::Manager::new(
49+
replica_id,
50+
lighthouse_addr,
51+
address,
52+
bind,
53+
store_addr,
54+
world_size,
55+
))
56+
.unwrap();
5857
let handle = runtime.spawn(manager.clone().run());
59-
Self {
60-
runtime: runtime,
61-
manager: manager,
62-
handle: handle,
63-
}
58+
Self { handle: handle }
6459
})
6560
}
6661

@@ -193,11 +188,16 @@ fn lighthouse_main(py: Python<'_>) {
193188
let mut args = env::args();
194189
args.next(); // discard binary arg
195190
let opt = lighthouse::LighthouseOpt::from_iter(args);
196-
let lighthouse = lighthouse::Lighthouse::new(opt);
197-
198191
let rt = Runtime::new().unwrap();
192+
rt.block_on(lighthouse_main_async(opt)).unwrap();
193+
}
199194

200-
rt.block_on(lighthouse.run()).unwrap();
195+
async fn lighthouse_main_async(opt: lighthouse::LighthouseOpt) -> Result<()> {
196+
let lighthouse = lighthouse::Lighthouse::new(opt).await?;
197+
198+
lighthouse.run().await?;
199+
200+
Ok(())
201201
}
202202

203203
#[pymodule]

src/lighthouse.rs

+43-27
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use tokio::sync::Mutex;
2727
use tokio::task::JoinSet;
2828
use tokio::time::sleep;
2929
use tonic::service::Routes;
30+
use tonic::transport::server::TcpIncoming;
3031
use tonic::transport::Server;
3132
use tonic::{Request, Response, Status};
3233

@@ -56,23 +57,25 @@ struct State {
5657
pub struct Lighthouse {
5758
state: Mutex<State>,
5859
opt: LighthouseOpt,
60+
listener: Mutex<Option<tokio::net::TcpListener>>,
61+
local_addr: SocketAddr,
5962
}
6063

6164
#[derive(StructOpt, Debug)]
6265
#[structopt()]
6366
pub struct LighthouseOpt {
6467
// bind is the address to bind the server to.
6568
#[structopt(long = "bind", default_value = "[::]:29510")]
66-
bind: String,
69+
pub bind: String,
6770

6871
#[structopt(long = "join_timeout_ms", default_value = "60000")]
69-
join_timeout_ms: u64,
72+
pub join_timeout_ms: u64,
7073

7174
#[structopt(long = "min_replicas")]
72-
min_replicas: u64,
75+
pub min_replicas: u64,
7376

7477
#[structopt(long = "quorum_tick_ms", default_value = "100")]
75-
quorum_tick_ms: u64,
78+
pub quorum_tick_ms: u64,
7679
}
7780

7881
fn quorum_changed(a: &Vec<QuorumMember>, b: &Vec<QuorumMember>) -> bool {
@@ -83,9 +86,10 @@ fn quorum_changed(a: &Vec<QuorumMember>, b: &Vec<QuorumMember>) -> bool {
8386
}
8487

8588
impl Lighthouse {
86-
pub fn new(opt: LighthouseOpt) -> Arc<Self> {
89+
pub async fn new(opt: LighthouseOpt) -> Result<Arc<Self>> {
8790
let (tx, _) = broadcast::channel(16);
88-
Arc::new(Self {
91+
let listener = tokio::net::TcpListener::bind(&opt.bind).await?;
92+
Ok(Arc::new(Self {
8993
state: Mutex::new(State {
9094
participants: HashMap::new(),
9195
channel: tx,
@@ -94,7 +98,9 @@ impl Lighthouse {
9498
heartbeats: HashMap::new(),
9599
}),
96100
opt: opt,
97-
})
101+
local_addr: listener.local_addr()?,
102+
listener: Mutex::new(Some(listener)),
103+
}))
98104
}
99105

100106
// Checks whether the quorum is valid and an explanation for the state.
@@ -209,13 +215,20 @@ impl Lighthouse {
209215
}
210216
}
211217

212-
async fn _run_grpc(self: Arc<Self>) -> Result<()> {
213-
let bind: SocketAddr = self.opt.bind.parse()?;
214-
info!(
215-
"Lighthouse listening on: http://{}:{}",
218+
pub fn address(&self) -> String {
219+
format!(
220+
"http://{}:{}",
216221
gethostname().into_string().unwrap(),
217-
bind.port()
218-
);
222+
self.local_addr.port()
223+
)
224+
}
225+
226+
async fn _run_grpc(self: Arc<Self>) -> Result<()> {
227+
info!("Lighthouse listening on: {}", self.address());
228+
229+
let listener = self.listener.lock().await.take().unwrap();
230+
let incoming =
231+
TcpIncoming::from_listener(listener, true, None).map_err(|e| anyhow::anyhow!(e))?;
219232

220233
// Setup HTTP endpoints
221234
let app = Router::new()
@@ -245,7 +258,7 @@ impl Lighthouse {
245258
// allow non-GRPC connections
246259
.accept_http1(true)
247260
.add_routes(routes)
248-
.serve(bind)
261+
.serve_with_incoming(incoming)
249262
.await
250263
.map_err(|e| e.into())
251264
}
@@ -429,14 +442,14 @@ mod tests {
429442

430443
use crate::torchftpb::lighthouse_service_client::LighthouseServiceClient;
431444

432-
fn lighthouse_test_new() -> Arc<Lighthouse> {
445+
async fn lighthouse_test_new() -> Result<Arc<Lighthouse>> {
433446
let opt = LighthouseOpt {
434447
min_replicas: 1,
435-
bind: "0.0.0.0:29510".to_string(),
448+
bind: "[::]:0".to_string(),
436449
join_timeout_ms: 60 * 60 * 1000, // 1hr
437450
quorum_tick_ms: 10,
438451
};
439-
Lighthouse::new(opt)
452+
Lighthouse::new(opt).await
440453
}
441454

442455
async fn lighthouse_client_new(addr: String) -> Result<LighthouseServiceClient<Channel>> {
@@ -448,8 +461,8 @@ mod tests {
448461
}
449462

450463
#[tokio::test]
451-
async fn test_quorum_join_timeout() {
452-
let lighthouse = lighthouse_test_new();
464+
async fn test_quorum_join_timeout() -> Result<()> {
465+
let lighthouse = lighthouse_test_new().await?;
453466
assert!(!lighthouse.quorum_valid().await.0);
454467

455468
{
@@ -478,11 +491,13 @@ mod tests {
478491
}
479492

480493
assert!(lighthouse.quorum_valid().await.0);
494+
495+
Ok(())
481496
}
482497

483498
#[tokio::test]
484-
async fn test_quorum_fast_prev_quorum() {
485-
let lighthouse = lighthouse_test_new();
499+
async fn test_quorum_fast_prev_quorum() -> Result<()> {
500+
let lighthouse = lighthouse_test_new().await?;
486501
assert!(!lighthouse.quorum_valid().await.0);
487502

488503
{
@@ -520,23 +535,23 @@ mod tests {
520535
}
521536

522537
assert!(lighthouse.quorum_valid().await.0);
538+
539+
Ok(())
523540
}
524541

525542
#[tokio::test]
526-
async fn test_lighthouse_e2e() {
543+
async fn test_lighthouse_e2e() -> Result<()> {
527544
let opt = LighthouseOpt {
528545
min_replicas: 1,
529-
bind: "0.0.0.0:29510".to_string(),
546+
bind: "[::]:0".to_string(),
530547
join_timeout_ms: 1,
531548
quorum_tick_ms: 10,
532549
};
533-
let lighthouse = Lighthouse::new(opt);
550+
let lighthouse = Lighthouse::new(opt).await?;
534551

535552
let lighthouse_task = tokio::spawn(lighthouse.clone().run());
536553

537-
let mut client = lighthouse_client_new("http://localhost:29510".to_string())
538-
.await
539-
.unwrap();
554+
let mut client = lighthouse_client_new(lighthouse.address()).await.unwrap();
540555

541556
{
542557
let request = tonic::Request::new(LighthouseHeartbeatRequest {
@@ -563,6 +578,7 @@ mod tests {
563578
}
564579

565580
lighthouse_task.abort();
581+
Ok(())
566582
}
567583

568584
#[tokio::test]

0 commit comments

Comments
 (0)