Skip to content

Commit

Permalink
add sync_host to DfdaemonUploadClient and optimize DfdaemonUpload.syn…
Browse files Browse the repository at this point in the history
…c_host

Signed-off-by: baowj <[email protected]>
  • Loading branch information
baowj-678 committed Feb 18, 2025
1 parent 9130e6b commit 663a69b
Showing 1 changed file with 39 additions and 31 deletions.
70 changes: 39 additions & 31 deletions dragonfly-client/src/grpc/dfdaemon_upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ use tonic::{
Code, Request, Response, Status,
};
use tower::ServiceBuilder;
use tracing::{error, info, instrument, Instrument, Span};
use tracing::{debug, error, info, instrument, Instrument, Span};
use url::Url;

/// DfdaemonUploadServer is the grpc server of the upload.
Expand Down Expand Up @@ -911,17 +911,13 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
&self,
request: Request<SyncHostRequest>,
) -> Result<Response<Self::SyncHostStream>, Status> {
/// DEFAULT_INTERFACE_SPEED is the default speed for interfaces.
const DEFAULT_INTERFACE_SPEED: ByteSize = ByteSize::mb(10000 / 8);
/// FIRST_HOST_INFO_REFRESH_INTERVAL is the interval for the first refresh of the host info
/// when start up.
const FIRST_HOST_INFO_REFRESH_INTERVAL: Duration = Duration::from_millis(100);
/// DEFAULT_HOST_INFO_REFRESH_INTERVAL is the default interval for refreshing the host info.
const DEFAULT_HOST_INFO_REFRESH_INTERVAL: Duration = Duration::from_secs(1);
/// MILLISECONDS_PER_SECOND is the number of milliseconds contained per second.
const MILLISECONDS_PER_SECOND: u64 = 1_000;
/// BITS_PER_BYTE is the number of bits contained per byte.
const BITS_PER_BYTE: u64 = 8;
// DEFAULT_HOST_INFO_REFRESH_INTERVAL is the default interval for refreshing the host info.
const DEFAULT_HOST_INFO_REFRESH_INTERVAL: Duration = Duration::from_millis(500);

/// bits_to_bytes converts network speed from bits/sec to bytes/sec.
fn bits_to_bytes(bits_per_sec: u64) -> u64 {
bits_per_sec / 8
}

// Get request ip.
let request_ip = request.remote_addr();
Expand Down Expand Up @@ -950,7 +946,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {

// Get the interface and the interface speed of this request ip.
let mut request_interface = None;
let mut request_interface_speed = DEFAULT_INTERFACE_SPEED.as_u64();
let mut request_interface_speed: u64 = 0;
if let Some(request_ip) = request_ip {
// Get the interface of this request ip.
let interfaces = datalink::interfaces();
Expand All @@ -966,14 +962,13 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
let speed_path = format!("/sys/class/net/{}/speed", interface);
let content = fs::read_to_string(speed_path).unwrap_or_default();
if let Ok(speed) = content.trim().parse::<u64>() {
// Convert byte/Sec to bit/Sec.
// Convert bits/sec to bytes/sec.
let speed = ByteSize::mb(bits_to_bytes(speed));
info!(
"interface {} speed is {} for request ip {}",
&interface,
ByteSize::mb(speed / BITS_PER_BYTE),
request_ip,
&interface, speed, request_ip,
);
request_interface_speed = ByteSize::mb(speed / BITS_PER_BYTE).as_u64();
request_interface_speed = speed.as_u64();
}
}
}
Expand All @@ -986,20 +981,17 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
// Initialize sysinfo network.
let mut networks = Networks::new_with_refreshed_list();
let mut last_refresh_time = SystemTime::now();
// Sleep.
tokio::time::sleep(FIRST_HOST_INFO_REFRESH_INTERVAL).await;

// Start the host info update loop.
loop {
let mut host = Host::default();
let mut network = Network {
upload_rate: request_interface_speed,
..Default::default()
};
// Sleep to calculate the network traffic difference over
// the DEFAULT_HOST_INFO_REFRESH_INTERVAL.
tokio::time::sleep(DEFAULT_HOST_INFO_REFRESH_INTERVAL).await;

// Refresh network information.
networks.refresh();
let now_time = SystemTime::now();

// Get interval between two refreshes.
let interval = now_time
.duration_since(last_refresh_time)
Expand All @@ -1008,17 +1000,25 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
// Reset last_refresh_time to now_time.
last_refresh_time = now_time;

let mut host = Host::default();
let mut network = Network {
upload_rate: request_interface_speed,
..Default::default()
};

// Get interface available bandwidth.
if let Some(request_interface) = request_interface.clone() {
for (interface, data) in &networks {
if *interface == request_interface {
if network.upload_rate
< data.transmitted() * MILLISECONDS_PER_SECOND / interval
< data.transmitted() * Duration::from_secs(1).as_millis() as u64
/ interval
{
network.upload_rate = 0;
} else {
network.upload_rate -=
data.transmitted() * MILLISECONDS_PER_SECOND / interval;
network.upload_rate -= data.transmitted()
* Duration::from_secs(1).as_millis() as u64
/ interval;
}
debug!(
"refresh interface {} available bandwidth to {}",
Expand All @@ -1045,9 +1045,6 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
break;
}
};

// Sleep.
tokio::time::sleep(DEFAULT_HOST_INFO_REFRESH_INTERVAL).await;
}
}
.in_current_span(),
Expand Down Expand Up @@ -1791,6 +1788,17 @@ impl DfdaemonUploadClient {
Ok(response.into_inner())
}

/// sync_host provides the host info for parent.
#[instrument(skip_all)]
pub async fn sync_host(
&self,
request: SyncHostRequest,
) -> ClientResult<tonic::Response<tonic::codec::Streaming<Host>>> {
let request = Self::make_request(request);
let response = self.client.clone().sync_host(request).await?;
Ok(response)
}

/// make_request creates a new request with timeout.
#[instrument(skip_all)]
fn make_request<T>(request: T) -> tonic::Request<T> {
Expand Down

0 comments on commit 663a69b

Please sign in to comment.