Skip to content

Commit 4966e40

Browse files
committed
wip
1 parent c078cae commit 4966e40

2 files changed

Lines changed: 14 additions & 14 deletions

File tree

ballista/core/src/execution_plans/shuffle_reader.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ impl Stream for InstrumentedStream {
107107
let this = &mut *self;
108108
if !this.first_poll_logged {
109109
this.first_poll_logged = true;
110-
info!("InstrumentedStream({}): first poll", this.label);
110+
debug!("InstrumentedStream({}): first poll", this.label);
111111
}
112112
this.poll_count += 1;
113113

@@ -116,7 +116,7 @@ impl Stream for InstrumentedStream {
116116
Poll::Ready(Some(Ok(batch))) => {
117117
if !this.first_batch_logged {
118118
this.first_batch_logged = true;
119-
info!(
119+
debug!(
120120
"InstrumentedStream({}): first batch ({} rows) after {:.3}s, {} polls",
121121
this.label,
122122
batch.num_rows(),
@@ -134,7 +134,7 @@ impl Stream for InstrumentedStream {
134134
);
135135
}
136136
Poll::Ready(None) => {
137-
info!(
137+
debug!(
138138
"InstrumentedStream({}): stream ended after {:.3}s, {} polls",
139139
this.label,
140140
this.start.elapsed().as_secs_f64(),
@@ -251,7 +251,7 @@ impl ExecutionPlan for ShuffleReaderExec {
251251
context: Arc<TaskContext>,
252252
) -> Result<SendableRecordBatchStream> {
253253
let task_id = context.task_id().unwrap_or_else(|| partition.to_string());
254-
info!(
254+
debug!(
255255
"ShuffleReaderExec::execute({task_id}) partition={partition}, num_locations={}, stage_id={}",
256256
self.partition.get(partition).map(|p| p.len()).unwrap_or(0),
257257
self.stage_id,
@@ -581,7 +581,7 @@ fn send_fetch_partitions(
581581
locations.object_store.len(),
582582
locations.remote.len()
583583
);
584-
info!(
584+
debug!(
585585
"send_fetch_partitions: {} total locations (memory={}, local={}, object_store={}, remote={})",
586586
locations.memory.len()
587587
+ locations.local.len()
@@ -614,9 +614,9 @@ fn send_fetch_partitions(
614614
let local_locations = locations.local;
615615
let local_count = local_locations.len();
616616
spawned_tasks.push(SpawnedTask::spawn(async move {
617-
info!("fetch_local_task: STARTED, {local_count} local files to read");
617+
debug!("fetch_local_task: STARTED, {local_count} local files to read");
618618
for (i, p) in local_locations.into_iter().enumerate() {
619-
info!(
619+
debug!(
620620
"fetch_local[{}/{}]: reading {}/{}/{} from {}",
621621
i + 1,
622622
local_count,
@@ -636,7 +636,7 @@ fn send_fetch_partitions(
636636
)
637637
.await;
638638
let ok = r.is_ok();
639-
info!(
639+
debug!(
640640
"fetch_local[{}/{}]: {}/{}/{} completed in {:.3}s, ok={}",
641641
i + 1,
642642
local_count,
@@ -822,7 +822,7 @@ async fn fetch_partition_remote(
822822
let partition_id = &location.partition_id;
823823
let host = metadata.host.as_str();
824824
let port = metadata.port;
825-
info!(
825+
debug!(
826826
"fetch_partition_remote: fetching {}/{}/{} from {}:{}",
827827
partition_id.job_id, partition_id.stage_id, partition_id.partition_id, host, port
828828
);

ballista/core/src/execution_plans/shuffle_writer.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -464,11 +464,11 @@ impl ShuffleWriterExec {
464464
datafusion::physical_plan::displayable(plan.as_ref()).indent(true)
465465
);
466466
}
467-
info!(
467+
debug!(
468468
"ShuffleWriter {job_id}/{stage_id} partition {input_partition}: creating execution stream"
469469
);
470470
let mut stream = plan.execute(input_partition, context)?;
471-
info!(
471+
debug!(
472472
"ShuffleWriter {job_id}/{stage_id} partition {input_partition}: stream created in {:.2}s, starting write (memory={use_memory}, object_store={use_object_store})",
473473
now.elapsed().as_secs_f64()
474474
);
@@ -642,21 +642,21 @@ impl ShuffleWriterExec {
642642
let mut batch_count: u64 = 0;
643643
let mut total_rows: u64 = 0;
644644

645-
info!(
645+
debug!(
646646
"ShuffleWriter partition {input_partition}: entering write loop, about to poll stream for first batch"
647647
);
648648
while let Some(result) = stream.next().await {
649649
let input_batch = result?;
650650
batch_count += 1;
651651
total_rows += input_batch.num_rows() as u64;
652652
if batch_count == 1 {
653-
info!(
653+
debug!(
654654
"ShuffleWriter partition {input_partition}: received first batch ({} rows) after {:.2}s",
655655
input_batch.num_rows(),
656656
now.elapsed().as_secs_f64()
657657
);
658658
} else if batch_count % 100 == 0 {
659-
info!(
659+
debug!(
660660
"ShuffleWriter partition {input_partition}: processed {batch_count} batches ({total_rows} rows) in {:.2}s",
661661
now.elapsed().as_secs_f64()
662662
);

0 commit comments

Comments
 (0)