@@ -15,7 +15,7 @@ use std::collections::HashMap;
1515use tokio:: net:: TcpListener ;
1616use tokio:: sync:: mpsc:: UnboundedSender ;
1717use tokio:: sync:: watch;
18- use tokio:: task:: JoinHandle ;
18+ use tokio:: task:: { JoinError , JoinHandle } ;
1919
2020#[ cfg( feature = "cassandra" ) ]
2121pub mod cassandra;
@@ -35,7 +35,8 @@ pub enum Transport {
3535
3636#[ derive( Debug ) ]
3737pub struct Source {
38- pub join_handle : JoinHandle < ( ) > ,
38+ pub listener_task : JoinHandle < ( ) > ,
39+ /// This value must remain alive for as long as the Source is in use.
3940 pub hot_reload_tx : UnboundedSender < HotReloadListenerRequest > ,
4041 pub gradual_shutdown_tx : UnboundedSender < GradualShutdownRequest > ,
4142 pub name : String ,
@@ -49,16 +50,20 @@ impl Source {
4950 name : String ,
5051 ) -> Self {
5152 Self {
52- join_handle,
53+ listener_task : join_handle,
5354 hot_reload_tx,
5455 gradual_shutdown_tx,
5556 name,
5657 }
5758 }
5859
59- pub fn into_join_handle ( self ) -> JoinHandle < ( ) > {
60- self . join_handle
60+ pub async fn join ( self ) -> Result < ( ) , JoinError > {
61+ self . listener_task . await ?;
62+ // explicitly drop hot_reload_tx here, to show that it occurs after the listener_task has shutdown.
63+ std:: mem:: drop ( self . hot_reload_tx ) ;
64+ Ok ( ( ) )
6165 }
66+
6267 pub fn get_hot_reload_tx ( & self ) -> UnboundedSender < HotReloadListenerRequest > {
6368 self . hot_reload_tx . clone ( )
6469 }
0 commit comments