Skip to content

Commit ee23c42

Browse files
committed
wip
1 parent e1d5a33 commit ee23c42

3 files changed

Lines changed: 77 additions & 9 deletions

File tree

ballista/core/src/execution_plans/shuffle_reader.rs

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,8 @@ impl ExecutionPlan for ShuffleReaderExec {
170170
context: Arc<TaskContext>,
171171
) -> Result<SendableRecordBatchStream> {
172172
let task_id = context.task_id().unwrap_or_else(|| partition.to_string());
173-
info!("ShuffleReaderExec::execute({task_id}) partition={partition}, num_locations={}, stage_id={}",
173+
info!(
174+
"ShuffleReaderExec::execute({task_id}) partition={partition}, num_locations={}, stage_id={}",
174175
self.partition.get(partition).map(|p| p.len()).unwrap_or(0),
175176
self.stage_id,
176177
);
@@ -496,7 +497,10 @@ fn send_fetch_partitions(
496497
);
497498
info!(
498499
"send_fetch_partitions: {} total locations (memory={}, local={}, object_store={}, remote={})",
499-
locations.memory.len() + locations.local.len() + locations.object_store.len() + locations.remote.len(),
500+
locations.memory.len()
501+
+ locations.local.len()
502+
+ locations.object_store.len()
503+
+ locations.remote.len(),
500504
locations.memory.len(),
501505
locations.local.len(),
502506
locations.object_store.len(),
@@ -522,8 +526,18 @@ fn send_fetch_partitions(
522526
let customize_endpoint_c = customize_endpoint.clone();
523527
let metrics_callback_c = metrics_callback.clone();
524528
let local_locations = locations.local;
529+
let local_count = local_locations.len();
525530
spawned_tasks.push(SpawnedTask::spawn(async move {
526-
for p in local_locations {
531+
for (i, p) in local_locations.into_iter().enumerate() {
532+
info!(
533+
"fetch_local[{}/{}]: reading {}/{}/{} from {}",
534+
i + 1,
535+
local_count,
536+
p.partition_id.job_id,
537+
p.partition_id.stage_id,
538+
p.partition_id.partition_id,
539+
p.path
540+
);
527541
let start_time = std::time::Instant::now();
528542
let r = PartitionReaderEnum::Local
529543
.fetch_partition(
@@ -534,6 +548,17 @@ fn send_fetch_partitions(
534548
use_tls,
535549
)
536550
.await;
551+
let ok = r.is_ok();
552+
info!(
553+
"fetch_local[{}/{}]: {}/{}/{} completed in {:.3}s, ok={}",
554+
i + 1,
555+
local_count,
556+
p.partition_id.job_id,
557+
p.partition_id.stage_id,
558+
p.partition_id.partition_id,
559+
start_time.elapsed().as_secs_f64(),
560+
ok
561+
);
537562

538563
// Record local read metrics if callback is set and read succeeded
539564
if r.is_ok()
@@ -712,8 +737,7 @@ async fn fetch_partition_remote(
712737
let port = metadata.port;
713738
info!(
714739
"fetch_partition_remote: fetching {}/{}/{} from {}:{}",
715-
partition_id.job_id, partition_id.stage_id, partition_id.partition_id,
716-
host, port
740+
partition_id.job_id, partition_id.stage_id, partition_id.partition_id, host, port
717741
);
718742
let mut ballista_client = BallistaClient::try_new(
719743
host,

ballista/core/src/execution_plans/shuffle_writer.rs

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ use datafusion::arrow::error::ArrowError;
6868
use datafusion::execution::context::TaskContext;
6969
use datafusion::physical_plan::repartition::BatchPartitioner;
7070
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
71-
use log::{debug, info};
71+
use log::{debug, info, warn};
7272

7373
use super::shuffle_writer_trait::ShuffleWriter;
7474

@@ -348,7 +348,30 @@ impl ShuffleWriterExec {
348348
now.elapsed().as_secs_f64()
349349
);
350350

351-
if use_memory {
351+
// Watchdog: log periodically if no progress is made
352+
let watchdog_job_id = job_id.clone();
353+
let watchdog_flag = Arc::new(std::sync::atomic::AtomicBool::new(false));
354+
let watchdog_flag_clone = watchdog_flag.clone();
355+
let _watchdog = tokio::task::spawn(async move {
356+
let mut interval =
357+
tokio::time::interval(std::time::Duration::from_secs(30));
358+
interval.tick().await; // skip first immediate tick
359+
loop {
360+
interval.tick().await;
361+
if watchdog_flag_clone.load(std::sync::atomic::Ordering::Relaxed) {
362+
break;
363+
}
364+
warn!(
365+
"ShuffleWriter {}/{} partition {}: STALLED - no first batch received after {:.0}s",
366+
watchdog_job_id,
367+
stage_id,
368+
input_partition,
369+
now.elapsed().as_secs_f64()
370+
);
371+
}
372+
});
373+
374+
let result = if use_memory {
352375
// Use in-memory shuffle storage with configurable format
353376
Self::execute_shuffle_write_memory(
354377
&job_id,
@@ -393,7 +416,9 @@ impl ShuffleWriterExec {
393416
file_ext,
394417
)
395418
.await
396-
}
419+
};
420+
watchdog_flag.store(true, std::sync::atomic::Ordering::Relaxed);
421+
result
397422
}
398423
}
399424

@@ -470,9 +495,25 @@ impl ShuffleWriterExec {
470495
)?;
471496

472497
let schema = stream.schema();
498+
let mut batch_count: u64 = 0;
499+
let mut total_rows: u64 = 0;
473500

474501
while let Some(result) = stream.next().await {
475502
let input_batch = result?;
503+
batch_count += 1;
504+
total_rows += input_batch.num_rows() as u64;
505+
if batch_count == 1 {
506+
info!(
507+
"ShuffleWriter partition {input_partition}: received first batch ({} rows) after {:.2}s",
508+
input_batch.num_rows(),
509+
now.elapsed().as_secs_f64()
510+
);
511+
} else if batch_count % 100 == 0 {
512+
info!(
513+
"ShuffleWriter partition {input_partition}: processed {batch_count} batches ({total_rows} rows) in {:.2}s",
514+
now.elapsed().as_secs_f64()
515+
);
516+
}
476517

477518
write_metrics.input_rows.add(input_batch.num_rows());
478519

ballista/executor/src/executor_server.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -606,7 +606,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskRunnerPool<T,
606606
let mut task_runner_shutdown = shutdown_noti.subscribe_for_shutdown();
607607
let task_runner_complete = shutdown_noti.shutdown_complete_tx.clone();
608608
tokio::spawn(async move {
609-
info!("Starting the task runner pool");
609+
info!(
610+
"Starting the task runner pool with {} threads",
611+
executor_server.executor.concurrent_tasks
612+
);
610613

611614
// Use a dedicated executor for CPU bound tasks so that the main tokio
612615
// executor can still answer requests even when under load

0 commit comments

Comments
 (0)