Skip to content

Commit

Permalink
feat: refresh addresses of available schedulers when call grpc api (#197
Browse files Browse the repository at this point in the history
)

Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Jan 4, 2024
1 parent a6f9896 commit a68e50b
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 22 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "dragonfly-client"
version = "0.1.10"
version = "0.1.11"
authors = ["The Dragonfly Developers"]
homepage = "https://d7y.io/"
repository = "https://github.com/dragonflyoss/client.git"
Expand Down
88 changes: 68 additions & 20 deletions src/grpc/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use crate::dynconfig::Dynconfig;
use crate::{Error, Result};
use dragonfly_api::common::v2::{Peer, Task};
use dragonfly_api::manager::v2::Scheduler;
use dragonfly_api::scheduler::v2::{
scheduler_client::SchedulerClient as SchedulerGRPCClient, AnnounceHostRequest,
AnnouncePeerRequest, AnnouncePeerResponse, ExchangePeerRequest, ExchangePeerResponse,
Expand Down Expand Up @@ -49,8 +50,11 @@ pub struct SchedulerClient {
// dynconfig is the dynamic configuration of the dfdaemon.
dynconfig: Arc<Dynconfig>,

// available_schedulers is the endpoints of available schedulers.
available_schedulers: Arc<RwLock<Vec<SocketAddr>>>,
// available_schedulers is the available schedulers.
available_schedulers: Arc<RwLock<Vec<Scheduler>>>,

// available_scheduler_addrs is the addresses of available schedulers.
available_scheduler_addrs: Arc<RwLock<Vec<SocketAddr>>>,

// hashring is the hashring of the scheduler.
hashring: Arc<RwLock<HashRing<VNode>>>,
Expand All @@ -63,10 +67,11 @@ impl SchedulerClient {
let client = Self {
dynconfig,
available_schedulers: Arc::new(RwLock::new(Vec::new())),
available_scheduler_addrs: Arc::new(RwLock::new(Vec::new())),
hashring: Arc::new(RwLock::new(HashRing::new())),
};

client.refresh_scheduler_client().await?;
client.refresh_available_scheduler_addrs().await?;
Ok(client)
}

Expand Down Expand Up @@ -140,8 +145,8 @@ impl SchedulerClient {
#[instrument(skip(self))]
pub async fn init_announce_host(&self, request: AnnounceHostRequest) -> Result<()> {
let mut join_set = JoinSet::new();
let available_schedulers = self.available_schedulers.read().await;
for available_scheduler in available_schedulers.iter() {
let available_scheduler_addrs = self.available_scheduler_addrs.read().await;
for available_scheduler_addr in available_scheduler_addrs.iter() {
let request = Self::make_request(request.clone());
async fn announce_host(
addr: SocketAddr,
Expand All @@ -159,7 +164,7 @@ impl SchedulerClient {
Ok(())
}

join_set.spawn(announce_host(*available_scheduler, request));
join_set.spawn(announce_host(*available_scheduler_addr, request));
}

while let Some(message) = join_set.join_next().await {
Expand All @@ -175,9 +180,13 @@ impl SchedulerClient {
// announce_host announces the host to the scheduler.
#[instrument(skip(self))]
pub async fn announce_host(&self, request: AnnounceHostRequest) -> Result<()> {
// Update scheduler addresses of the client.
self.update_available_scheduler_addrs().await?;

// Announce the host to the scheduler.
let mut join_set = JoinSet::new();
let available_schedulers = self.available_schedulers.read().await;
for available_scheduler in available_schedulers.iter() {
let available_scheduler_addrs = self.available_scheduler_addrs.read().await;
for available_scheduler_addr in available_scheduler_addrs.iter() {
let request = Self::make_request(request.clone());
async fn announce_host(
addr: SocketAddr,
Expand All @@ -195,7 +204,7 @@ impl SchedulerClient {
Ok(())
}

join_set.spawn(announce_host(*available_scheduler, request));
join_set.spawn(announce_host(*available_scheduler_addr, request));
}

while let Some(message) = join_set.join_next().await {
Expand All @@ -210,9 +219,13 @@ impl SchedulerClient {
// leave_host tells the scheduler that the host is leaving.
#[instrument(skip(self))]
pub async fn leave_host(&self, request: LeaveHostRequest) -> Result<()> {
// Update scheduler addresses of the client.
self.update_available_scheduler_addrs().await?;

// Leave the host from the scheduler.
let mut join_set = JoinSet::new();
let available_schedulers = self.available_schedulers.read().await;
for available_scheduler in available_schedulers.iter() {
let available_scheduler_addrs = self.available_scheduler_addrs.read().await;
for available_scheduler_addr in available_scheduler_addrs.iter() {
let request = Self::make_request(request.clone());
async fn leave_host(
addr: SocketAddr,
Expand All @@ -230,7 +243,7 @@ impl SchedulerClient {
Ok(())
}

join_set.spawn(leave_host(*available_scheduler, request));
join_set.spawn(leave_host(*available_scheduler_addr, request));
}

while let Some(message) = join_set.join_next().await {
Expand All @@ -245,6 +258,10 @@ impl SchedulerClient {
// client gets the grpc client of the scheduler.
#[instrument(skip(self))]
async fn client(&self, key: String) -> Result<SchedulerGRPCClient<Channel>> {
// Update scheduler addresses of the client.
self.update_available_scheduler_addrs().await?;

// Get the scheduler address from the hashring.
let addr = self.hashring.read().await;
let addr = addr.get(&key).ok_or_else(|| Error::HashRing(key.clone()))?;
info!("{} picked {:?}", key, addr);
Expand All @@ -258,7 +275,7 @@ impl SchedulerClient {
Ok(channel) => channel,
Err(err) => {
error!("failed to connect to {:?}: {}", addr, err);
if let Err(err) = self.refresh_scheduler_client().await {
if let Err(err) = self.refresh_available_scheduler_addrs().await {
error!("failed to refresh scheduler client: {}", err);
};

Expand All @@ -269,12 +286,9 @@ impl SchedulerClient {
Ok(SchedulerGRPCClient::new(channel))
}

// get_endpoints gets the endpoints of available schedulers.
// update_available_scheduler_addrs updates the addresses of available schedulers.
#[instrument(skip(self))]
async fn refresh_scheduler_client(&self) -> Result<()> {
// Refresh the dynamic configuration.
self.dynconfig.refresh().await?;

async fn update_available_scheduler_addrs(&self) -> Result<()> {
// Get the endpoints of available schedulers.
let data = self.dynconfig.data.read().await;

Expand All @@ -283,9 +297,27 @@ impl SchedulerClient {
return Err(Error::AvailableSchedulersNotFound());
}

// Get the available schedulers.
let available_schedulers = self.available_schedulers.read().await;

// Check if the available schedulers is not changed.
if data.available_schedulers.len() == available_schedulers.len()
&& data
.available_schedulers
.iter()
.all(|available_scheduler| available_schedulers.contains(available_scheduler))
{
info!("available schedulers is not changed");
return Ok(());
}
drop(available_schedulers);

// Get the available schedulers.
let mut available_schedulers = self.available_schedulers.write().await;

// Get the addresses of available schedulers.
let mut available_scheduler_addrs = self.available_scheduler_addrs.write().await;

// Refresh the hashring.
let mut new_hashring = HashRing::new();
for available_scheduler in data.available_schedulers.iter() {
Expand All @@ -298,7 +330,10 @@ impl SchedulerClient {
};

// Add the scheduler to the available schedulers.
available_schedulers.push(SocketAddr::new(ip, available_scheduler.port as u16));
available_schedulers.push(available_scheduler.clone());

// Add the scheduler address to the addresses of available schedulers.
available_scheduler_addrs.push(SocketAddr::new(ip, available_scheduler.port as u16));

// Add the scheduler to the hashring.
new_hashring.add(VNode {
Expand All @@ -309,10 +344,23 @@ impl SchedulerClient {
// Update the hashring.
let mut hashring = self.hashring.write().await;
*hashring = new_hashring;
info!("refresh available schedulers: {:?}", available_schedulers);
info!(
"refresh available scheduler addresses: {:?}",
available_scheduler_addrs
);
Ok(())
}

// refresh_available_scheduler_addrs refreshes addresses of available schedulers.
#[instrument(skip(self))]
async fn refresh_available_scheduler_addrs(&self) -> Result<()> {
// Refresh the dynamic configuration.
self.dynconfig.refresh().await?;

// Update scheduler addresses of the client.
self.update_available_scheduler_addrs().await
}

// make_request creates a new request with timeout.
fn make_request<T>(request: T) -> tonic::Request<T> {
let mut request = tonic::Request::new(request);
Expand Down

0 comments on commit a68e50b

Please sign in to comment.