Skip to content

Commit 3ff7f46

Browse files
fix lint
1 parent fd4281b commit 3ff7f46

22 files changed

Lines changed: 101 additions & 109 deletions

ballista/core/src/client.rs

Lines changed: 0 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -47,8 +47,6 @@ use datafusion::error::Result;
4747
use crate::extension::BallistaConfigGrpcEndpoint;
4848
use crate::serde::protobuf;
4949

50-
use crate::utils::{GrpcClientConfig, create_grpc_client_connection};
51-
5250
use crate::utils::create_grpc_client_endpoint;
5351

5452
use datafusion::physical_plan::{RecordBatchStream, SendableRecordBatchStream};

ballista/core/src/execution_plans/distributed_query.rs

Lines changed: 1 addition & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -28,8 +28,6 @@ use crate::serde::protobuf::{
2828
scheduler_grpc_client::SchedulerGrpcClient,
2929
};
3030

31-
use crate::utils::{GrpcClientConfig, create_grpc_client_connection};
32-
3331
use crate::utils::create_grpc_client_endpoint;
3432

3533
use datafusion::arrow::datatypes::SchemaRef;
@@ -266,8 +264,6 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
266264
self.scheduler_url.clone(),
267265
self.session_id.clone(),
268266
query,
269-
self.config.default_grpc_client_max_message_size(),
270-
GrpcClientConfig::from(&self.config),
271267
Arc::new(self.metrics.clone()),
272268
partition,
273269
self.config.clone(),
@@ -306,16 +302,13 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
306302
}
307303
}
308304

305+
#[allow(clippy::too_many_arguments)]
309306
async fn execute_query(
310307
scheduler_url: String,
311308
session_id: String,
312309
query: ExecuteQueryParams,
313-
314-
max_message_size: usize,
315-
grpc_config: GrpcClientConfig,
316310
metrics: Arc<ExecutionPlanMetricsSet>,
317311
partition: usize,
318-
319312
config: BallistaConfig,
320313
grpc_interceptor: Arc<BallistaGrpcMetadataInterceptor>,
321314
customize_endpoint: Option<Arc<BallistaConfigGrpcEndpoint>>,

ballista/core/src/execution_plans/shuffle_reader.rs

Lines changed: 31 additions & 30 deletions
Original file line number | Diff line number | Diff line change
@@ -392,6 +392,7 @@ fn local_remote_read_split(
392392
}
393393
}
394394

395+
#[allow(clippy::too_many_arguments)]
395396
fn send_fetch_partitions(
396397
partition_locations: Vec<PartitionLocation>,
397398
max_request_num: usize,
@@ -433,21 +434,21 @@ fn send_fetch_partitions(
433434
.await;
434435

435436
// Record local read metrics if callback is set and read succeeded
436-
if r.is_ok() {
437-
if let Some(ref callback) = metrics_callback_c {
438-
let duration_ms = start_time.elapsed().as_millis() as u64;
439-
let bytes = p.partition_stats.num_bytes().unwrap_or(0);
440-
let rows = p.partition_stats.num_rows().unwrap_or(0);
441-
callback.record_local_read(
442-
&p.partition_id.job_id,
443-
p.partition_id.stage_id,
444-
p.partition_id.partition_id,
445-
&p.executor_meta.id,
446-
bytes,
447-
rows,
448-
duration_ms,
449-
);
450-
}
437+
if r.is_ok()
438+
&& let Some(ref callback) = metrics_callback_c
439+
{
440+
let duration_ms = start_time.elapsed().as_millis() as u64;
441+
let bytes = p.partition_stats.num_bytes().unwrap_or(0);
442+
let rows = p.partition_stats.num_rows().unwrap_or(0);
443+
callback.record_local_read(
444+
&p.partition_id.job_id,
445+
p.partition_id.stage_id,
446+
p.partition_id.partition_id,
447+
&p.executor_meta.id,
448+
bytes,
449+
rows,
450+
duration_ms,
451+
);
451452
}
452453

453454
if let Err(e) = response_sender_c.send(r).await {
@@ -476,21 +477,21 @@ fn send_fetch_partitions(
476477
.await;
477478

478479
// Record remote read metrics if callback is set and read succeeded
479-
if r.is_ok() {
480-
if let Some(ref callback) = metrics_callback_c {
481-
let duration_ms = start_time.elapsed().as_millis() as u64;
482-
let bytes = p.partition_stats.num_bytes().unwrap_or(0);
483-
let rows = p.partition_stats.num_rows().unwrap_or(0);
484-
callback.record_remote_read(
485-
&p.partition_id.job_id,
486-
p.partition_id.stage_id,
487-
p.partition_id.partition_id,
488-
&p.executor_meta.id,
489-
bytes,
490-
rows,
491-
duration_ms,
492-
);
493-
}
480+
if r.is_ok()
481+
&& let Some(ref callback) = metrics_callback_c
482+
{
483+
let duration_ms = start_time.elapsed().as_millis() as u64;
484+
let bytes = p.partition_stats.num_bytes().unwrap_or(0);
485+
let rows = p.partition_stats.num_rows().unwrap_or(0);
486+
callback.record_remote_read(
487+
&p.partition_id.job_id,
488+
p.partition_id.stage_id,
489+
p.partition_id.partition_id,
490+
&p.executor_meta.id,
491+
bytes,
492+
rows,
493+
duration_ms,
494+
);
494495
}
495496

496497
// Block if the channel buffer is full.

ballista/core/src/extension.rs

Lines changed: 9 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -161,11 +161,13 @@ pub trait SessionConfigExt {
161161
/// Get a `tonic` interceptor configured to decorate the provided metadata keys
162162
fn ballista_grpc_interceptor(&self) -> Arc<BallistaGrpcMetadataInterceptor>;
163163

164+
/// Set an override function for creating gRPC client endpoints.
164165
fn with_ballista_override_create_grpc_client_endpoint(
165166
self,
166167
override_f: EndpointOverrideFn,
167168
) -> Self;
168169

170+
/// Get the override function for creating gRPC client endpoints.
169171
fn ballista_override_create_grpc_client_endpoint(
170172
&self,
171173
) -> Option<Arc<BallistaConfigGrpcEndpoint>>;
@@ -664,6 +666,7 @@ pub struct BallistaGrpcMetadataInterceptor {
664666
}
665667

666668
impl BallistaGrpcMetadataInterceptor {
669+
/// Create a new interceptor with additional metadata to be added to requests.
667670
pub fn new(additional_metadata: HashMap<String, String>) -> Self {
668671
Self {
669672
additional_metadata,
@@ -694,16 +697,19 @@ impl Interceptor for BallistaGrpcMetadataInterceptor {
694697
}
695698
}
696699

700+
/// Wrapper for gRPC endpoint configuration override function.
697701
#[derive(Clone)]
698702
pub struct BallistaConfigGrpcEndpoint {
699703
override_f: EndpointOverrideFn,
700704
}
701705

702706
impl BallistaConfigGrpcEndpoint {
707+
/// Create a new endpoint configuration with the given override function.
703708
pub fn new(override_f: EndpointOverrideFn) -> Self {
704709
Self { override_f }
705710
}
706711

712+
/// Configure an endpoint using the override function.
707713
pub fn configure_endpoint(
708714
&self,
709715
endpoint: Endpoint,
@@ -735,6 +741,7 @@ pub trait ShuffleReadMetricsCallback: Send + Sync {
735741
/// * `bytes` - Number of bytes read
736742
/// * `rows` - Number of rows read
737743
/// * `duration_ms` - Time taken to read the partition
744+
#[allow(clippy::too_many_arguments)]
738745
fn record_local_read(
739746
&self,
740747
job_id: &str,
@@ -759,6 +766,7 @@ pub trait ShuffleReadMetricsCallback: Send + Sync {
759766
/// * `bytes` - Number of bytes read
760767
/// * `rows` - Number of rows read
761768
/// * `duration_ms` - Time taken to fetch the partition
769+
#[allow(clippy::too_many_arguments)]
762770
fn record_remote_read(
763771
&self,
764772
job_id: &str,
@@ -810,6 +818,7 @@ pub trait ResultFetchMetricsCallback: Send + Sync {
810818
/// * `bytes` - Number of bytes fetched
811819
/// * `rows` - Number of rows fetched
812820
/// * `duration_ms` - Time taken to fetch the partition
821+
#[allow(clippy::too_many_arguments)]
813822
fn record_result_fetch(
814823
&self,
815824
job_id: &str,

ballista/core/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -55,10 +55,10 @@ pub mod registry;
5555
/// Serialization and deserialization for Ballista messages and plans.
5656
pub mod serde;
5757

58-
/// General utility functions for Ballista operations.
59-
58+
/// Remote catalog serialization and stub providers for Ballista clients.
6059
pub mod remote_catalog;
6160

61+
/// General utility functions for Ballista operations.
6262
pub mod utils;
6363

6464
///

ballista/core/src/remote_catalog/catalog_serialize_ext.rs

Lines changed: 5 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -18,17 +18,20 @@
1818
use crate::serde::generated::ballista::{CatalogInfo, SchemaInfo, TableInfo};
1919
use datafusion::catalog::CatalogProvider;
2020
use datafusion::prelude::SessionContext;
21-
use futures::stream;
2221
use futures::StreamExt;
22+
use futures::stream;
2323
use std::sync::Arc;
2424

25-
/// Used to serialize catalog schemas and names to ship to Ballista clients
25+
/// Extension trait for serializing catalog schemas and names to ship to Ballista clients.
2626
#[async_trait::async_trait]
2727
pub trait CatalogSerializeExt {
28+
/// Serialize all catalogs in the session context.
2829
async fn serialize_catalogs(&self) -> Vec<CatalogInfo>;
2930

31+
/// Serialize a specific catalog by name.
3032
async fn serialize_catalog(&self, name: &str) -> Option<CatalogInfo>;
3133

34+
/// Serialize a specific schema within a catalog.
3235
async fn serialize_schema(
3336
&self,
3437
name: &str,

ballista/core/src/remote_catalog/mod.rs

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -16,7 +16,17 @@
1616
// under the License.
1717
//
1818

19+
//! Remote catalog serialization and stub providers for Ballista clients.
20+
//!
21+
//! This module provides functionality to serialize catalog metadata (schemas, tables, functions)
22+
//! from the scheduler to ship to Ballista clients, as well as stub providers that allow clients
23+
//! to perform logical planning without access to actual table data.
24+
25+
/// Extension trait for serializing catalog schemas and table names.
1926
pub mod catalog_serialize_ext;
27+
/// Extension trait for serializing user-defined functions.
2028
pub mod remote_function_serialize_ext;
29+
/// Stub scalar UDF implementation for remote function planning.
2130
pub mod remote_scalar_udf;
31+
/// Stub table provider for remote table planning.
2232
pub mod remote_table_provider;

ballista/core/src/remote_catalog/remote_function_serialize_ext.rs

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -25,8 +25,9 @@ use datafusion::prelude::SessionContext;
2525
use datafusion_proto_common::ArrowType;
2626
use std::collections::HashSet;
2727

28-
/// Used to serialize function shapes to ship to Ballista clients
28+
/// Extension trait for serializing function signatures to ship to Ballista clients.
2929
pub trait RemoteFunctionSerializeExt {
30+
/// Serialize all user-defined scalar functions in the session context.
3031
fn serialize_udfs(&self) -> Vec<ScalarUdfInfo>;
3132
}
3233

ballista/core/src/remote_catalog/remote_scalar_udf.rs

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -19,7 +19,7 @@
1919
use crate::serde::protobuf::ScalarUdfInfo;
2020
use datafusion::arrow::datatypes::DataType;
2121
use datafusion::common::Result;
22-
use datafusion::common::{exec_err, plan_err, DataFusionError};
22+
use datafusion::common::{DataFusionError, exec_err, plan_err};
2323
use datafusion::logical_expr::{
2424
ColumnarValue, Documentation, ScalarFunctionArgs, ScalarUDFImpl, Signature,
2525
TypeSignature, Volatility,
@@ -56,6 +56,7 @@ impl PartialEq for RemoteScalarUDF {
5656
impl Eq for RemoteScalarUDF {}
5757

5858
impl RemoteScalarUDF {
59+
/// Create a new RemoteScalarUDF from a ScalarUdfInfo protobuf message.
5960
pub fn new(meta: ScalarUdfInfo) -> Result<Self> {
6061
let mut arities = vec![];
6162

ballista/core/src/remote_catalog/remote_table_provider.rs

Lines changed: 11 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -18,7 +18,7 @@
1818

1919
use datafusion::arrow::datatypes::SchemaRef;
2020
use datafusion::catalog::{Session, TableProvider};
21-
use datafusion::common::{exec_err, Result};
21+
use datafusion::common::{Result, exec_err};
2222
use datafusion::datasource::TableType;
2323
use datafusion::logical_expr::Expr;
2424
use datafusion::physical_plan::ExecutionPlan;
@@ -36,6 +36,7 @@ pub struct RemoteTableProvider {
3636
}
3737

3838
impl RemoteTableProvider {
39+
/// Create a new RemoteTableProvider with the given catalog, schema, and table names.
3940
pub fn new(
4041
catalog_name: &str,
4142
schema_name: &str,
@@ -50,14 +51,17 @@ impl RemoteTableProvider {
5051
}
5152
}
5253

54+
/// Get the catalog name.
5355
pub fn catalog_name(&self) -> &str {
5456
&self.catalog_name
5557
}
5658

59+
/// Get the schema name.
5760
pub fn schema_name(&self) -> &str {
5861
&self.schema_name
5962
}
6063

64+
/// Get the table name.
6165
pub fn table_name(&self) -> &str {
6266
&self.table_name
6367
}
@@ -84,7 +88,11 @@ impl TableProvider for RemoteTableProvider {
8488
_filters: &[Expr],
8589
_limit: Option<usize>,
8690
) -> Result<Arc<dyn ExecutionPlan>> {
87-
exec_err!("{}.{}.{} is a stub table implementation to be resolved on the Ballista scheduler. It should not be scanned on the client. This is a bug.",
88-
self.catalog_name, self.schema_name, self.table_name)
91+
exec_err!(
92+
"{}.{}.{} is a stub table implementation to be resolved on the Ballista scheduler. It should not be scanned on the client. This is a bug.",
93+
self.catalog_name,
94+
self.schema_name,
95+
self.table_name
96+
)
8997
}
9098
}

0 commit comments

Comments
 (0)