Skip to content

Commit 7939cad

Browse files
committed
fix exhchange
1 parent 45dc770 commit 7939cad

File tree

1 file changed

+55
-56
lines changed

1 file changed

+55
-56
lines changed

src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs

+55-56
Original file line numberDiff line numberDiff line change
@@ -761,16 +761,18 @@ impl QueryCoordinator {
761761
return Ok(fragment_coordinator.pipeline_build_res.unwrap());
762762
}
763763

764-
let exchange_params = fragment_coordinator.create_exchange_params(
765-
info,
766-
fragment_coordinator
767-
.pipeline_build_res
768-
.as_ref()
769-
.map(|x| x.exchange_injector.clone())
770-
.ok_or_else(|| {
771-
ErrorCode::Internal("Pipeline build result is none, It's a bug")
772-
})?,
773-
)?;
764+
let exchange_params = fragment_coordinator
765+
.create_exchange_params(
766+
info,
767+
fragment_coordinator
768+
.pipeline_build_res
769+
.as_ref()
770+
.map(|x| x.exchange_injector.clone())
771+
.ok_or_else(|| {
772+
ErrorCode::Internal("Pipeline build result is none, It's a bug")
773+
})?,
774+
)?
775+
.unwrap();
774776
let mut build_res = fragment_coordinator.pipeline_build_res.unwrap();
775777

776778
// Add exchange data transform.
@@ -840,13 +842,11 @@ impl QueryCoordinator {
840842
if let Some(mut build_res) = coordinator.pipeline_build_res.take() {
841843
build_res.set_max_threads(max_threads as usize);
842844

843-
if !build_res.main_pipeline.is_pulling_pipeline()? {
844-
return Err(ErrorCode::Internal("Logical error, It's a bug"));
845+
if let Some(params) = params {
846+
// Add exchange data publisher.
847+
ExchangeSink::via(&info.query_ctx, &params, &mut build_res.main_pipeline)?;
845848
}
846849

847-
// Add exchange data publisher.
848-
ExchangeSink::via(&info.query_ctx, &params, &mut build_res.main_pipeline)?;
849-
850850
if !build_res.main_pipeline.is_complete_pipeline()? {
851851
return Err(ErrorCode::Internal("Logical error, It's a bug"));
852852
}
@@ -924,48 +924,47 @@ impl FragmentCoordinator {
924924
&self,
925925
info: &QueryInfo,
926926
exchange_injector: Arc<dyn ExchangeInjector>,
927-
) -> Result<ExchangeParams> {
928-
if let Some(data_exchange) = &self.data_exchange {
929-
return match data_exchange {
930-
DataExchange::Merge(exchange) => {
931-
Ok(ExchangeParams::MergeExchange(MergeExchangeParams {
932-
exchange_injector: exchange_injector.clone(),
933-
schema: self.physical_plan.output_schema()?,
934-
fragment_id: self.fragment_id,
935-
query_id: info.query_id.to_string(),
936-
destination_id: exchange.destination_id.clone(),
937-
allow_adjust_parallelism: exchange.allow_adjust_parallelism,
938-
ignore_exchange: exchange.ignore_exchange,
939-
}))
940-
}
941-
DataExchange::Broadcast(exchange) => {
942-
Ok(ExchangeParams::ShuffleExchange(ShuffleExchangeParams {
943-
exchange_injector: exchange_injector.clone(),
944-
schema: self.physical_plan.output_schema()?,
945-
fragment_id: self.fragment_id,
946-
query_id: info.query_id.to_string(),
947-
executor_id: info.current_executor.to_string(),
948-
destination_ids: exchange.destination_ids.to_owned(),
949-
shuffle_scatter: exchange_injector
950-
.flight_scatter(&info.query_ctx, data_exchange)?,
951-
}))
952-
}
953-
DataExchange::ShuffleDataExchange(exchange) => {
954-
Ok(ExchangeParams::ShuffleExchange(ShuffleExchangeParams {
955-
exchange_injector: exchange_injector.clone(),
956-
schema: self.physical_plan.output_schema()?,
957-
fragment_id: self.fragment_id,
958-
query_id: info.query_id.to_string(),
959-
executor_id: info.current_executor.to_string(),
960-
destination_ids: exchange.destination_ids.to_owned(),
961-
shuffle_scatter: exchange_injector
962-
.flight_scatter(&info.query_ctx, data_exchange)?,
963-
}))
964-
}
965-
};
927+
) -> Result<Option<ExchangeParams>> {
928+
let Some(data_exchange) = &self.data_exchange else {
929+
return Ok(None);
930+
};
931+
match data_exchange {
932+
DataExchange::Merge(exchange) => {
933+
Ok(Some(ExchangeParams::MergeExchange(MergeExchangeParams {
934+
exchange_injector: exchange_injector.clone(),
935+
schema: self.physical_plan.output_schema()?,
936+
fragment_id: self.fragment_id,
937+
query_id: info.query_id.to_string(),
938+
destination_id: exchange.destination_id.clone(),
939+
allow_adjust_parallelism: exchange.allow_adjust_parallelism,
940+
ignore_exchange: exchange.ignore_exchange,
941+
})))
942+
}
943+
DataExchange::Broadcast(exchange) => Ok(Some(ExchangeParams::ShuffleExchange(
944+
ShuffleExchangeParams {
945+
exchange_injector: exchange_injector.clone(),
946+
schema: self.physical_plan.output_schema()?,
947+
fragment_id: self.fragment_id,
948+
query_id: info.query_id.to_string(),
949+
executor_id: info.current_executor.to_string(),
950+
destination_ids: exchange.destination_ids.to_owned(),
951+
shuffle_scatter: exchange_injector
952+
.flight_scatter(&info.query_ctx, data_exchange)?,
953+
},
954+
))),
955+
DataExchange::ShuffleDataExchange(exchange) => Ok(Some(
956+
ExchangeParams::ShuffleExchange(ShuffleExchangeParams {
957+
exchange_injector: exchange_injector.clone(),
958+
schema: self.physical_plan.output_schema()?,
959+
fragment_id: self.fragment_id,
960+
query_id: info.query_id.to_string(),
961+
executor_id: info.current_executor.to_string(),
962+
destination_ids: exchange.destination_ids.to_owned(),
963+
shuffle_scatter: exchange_injector
964+
.flight_scatter(&info.query_ctx, data_exchange)?,
965+
}),
966+
)),
966967
}
967-
968-
Err(ErrorCode::Internal("Cannot find data exchange."))
969968
}
970969

971970
pub fn prepare_pipeline(&mut self, ctx: Arc<QueryContext>) -> Result<()> {

0 commit comments

Comments
 (0)