Skip to content

Commit 1cc88fb

Browse files
authored
fix(query): data lost in new hash join caused by spill (#19415)
* fix(query): fail-close runtime filter on spill revert fix(query): disable runtime filter for correlated right semi/anti fixup change strategy add log fix: try fix lost data when new join spilled test: revert enable_experimental_new_join * fixup * fixup * fixup * refine log * revert reproduce ci
1 parent 29cb512 commit 1cc88fb

File tree

7 files changed

+79
-48
lines changed

7 files changed

+79
-48
lines changed

src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/builder.rs

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -256,16 +256,12 @@ pub fn build_runtime_filter_packet(
256256
is_spill_happened: bool,
257257
) -> Result<JoinRuntimeFilterPacket> {
258258
if is_spill_happened {
259-
return Ok(JoinRuntimeFilterPacket::disable_all(
260-
runtime_filter_desc,
261-
build_num_rows,
262-
));
259+
return Ok(JoinRuntimeFilterPacket::disable_all(build_num_rows));
263260
}
264261
if build_num_rows == 0 {
265-
return Ok(JoinRuntimeFilterPacket {
266-
packets: None,
267-
build_rows: build_num_rows,
268-
});
262+
return Ok(JoinRuntimeFilterPacket::complete_without_filters(
263+
build_num_rows,
264+
));
269265
}
270266
let mut runtime_filters = HashMap::new();
271267
for rf in runtime_filter_desc {
@@ -283,8 +279,8 @@ pub fn build_runtime_filter_packet(
283279
.build(rf)?,
284280
);
285281
}
286-
Ok(JoinRuntimeFilterPacket {
287-
packets: Some(runtime_filters),
288-
build_rows: build_num_rows,
289-
})
282+
Ok(JoinRuntimeFilterPacket::complete(
283+
runtime_filters,
284+
build_num_rows,
285+
))
290286
}

src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/local_builder.rs

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -223,17 +223,11 @@ impl RuntimeFilterLocalBuilder {
223223
let total_rows = self.total_rows;
224224

225225
if spill_happened {
226-
return Ok(JoinRuntimeFilterPacket::disable_all(
227-
&self.runtime_filters,
228-
total_rows,
229-
));
226+
return Ok(JoinRuntimeFilterPacket::disable_all(total_rows));
230227
}
231228

232229
if total_rows == 0 {
233-
return Ok(JoinRuntimeFilterPacket {
234-
packets: None,
235-
build_rows: 0,
236-
});
230+
return Ok(JoinRuntimeFilterPacket::complete_without_filters(0));
237231
}
238232

239233
let packets: Vec<_> = self
@@ -245,10 +239,10 @@ impl RuntimeFilterLocalBuilder {
245239
})
246240
.collect::<Result<_>>()?;
247241

248-
Ok(JoinRuntimeFilterPacket {
249-
packets: Some(packets.into_iter().collect()),
250-
build_rows: total_rows,
251-
})
242+
Ok(JoinRuntimeFilterPacket::complete(
243+
packets.into_iter().collect(),
244+
total_rows,
245+
))
252246
}
253247
}
254248

src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/merge.rs

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,17 +29,22 @@ pub fn merge_join_runtime_filter_packets(
2929
packets
3030
);
3131
let total_build_rows: usize = packets.iter().map(|packet| packet.build_rows).sum();
32-
// Skip packets that `JoinRuntimeFilterPacket::packets` is `None`
32+
33+
// If any packet is incomplete (disable_all_due_to_spill), the merged result is also incomplete
34+
if packets.iter().any(|packet| packet.disable_all_due_to_spill) {
35+
return Ok(JoinRuntimeFilterPacket::disable_all(total_build_rows));
36+
}
37+
3338
let packets = packets
3439
.into_iter()
3540
.filter_map(|packet| packet.packets)
3641
.collect::<Vec<_>>();
3742

43+
// Skip packets whose `JoinRuntimeFilterPacket::packets` is `None`
3844
if packets.is_empty() {
39-
return Ok(JoinRuntimeFilterPacket {
40-
packets: None,
41-
build_rows: total_build_rows,
42-
});
45+
return Ok(JoinRuntimeFilterPacket::complete_without_filters(
46+
total_build_rows,
47+
));
4348
}
4449

4550
let mut result = HashMap::new();
@@ -56,10 +61,14 @@ pub fn merge_join_runtime_filter_packets(
5661
"RUNTIME-FILTER: merge_join_runtime_filter_packets output: {:?}",
5762
result
5863
);
59-
Ok(JoinRuntimeFilterPacket {
60-
packets: Some(result),
61-
build_rows: total_build_rows,
62-
})
64+
65+
if result.is_empty() {
66+
return Ok(JoinRuntimeFilterPacket::complete_without_filters(
67+
total_build_rows,
68+
));
69+
}
70+
71+
Ok(JoinRuntimeFilterPacket::complete(result, total_build_rows))
6372
}
6473

6574
fn merge_inlist(

src/query/service/src/pipelines/processors/transforms/hash_join/runtime_filter/packet.rs

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,6 @@ use databend_common_expression::types::NumberColumn;
3131
use databend_common_expression::types::NumberColumnBuilder;
3232
use databend_common_expression::types::array::ArrayColumnBuilder;
3333

34-
use crate::pipelines::processors::transforms::RuntimeFilterDesc;
35-
3634
/// Represents a runtime filter that can be transmitted and merged.
3735
///
3836
/// # Fields
@@ -66,30 +64,41 @@ impl Debug for RuntimeFilterPacket {
6664
///
6765
/// # Fields
6866
///
69-
/// * `packets` - A map of runtime filter packets, keyed by their unique identifier `RuntimeFilterPacket::id`. When `packets` is `None`, it means that `build_num_rows` is zero.
67+
/// * `packets` - A map of runtime filter packets, keyed by their unique identifier `RuntimeFilterPacket::id`.
7068
/// * `build_rows` - Total number of rows used when building the runtime filters.
69+
/// * `disable_all_due_to_spill` - Indicates if this packet comes from a spilled build and should disable all runtime filters globally.
7170
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, PartialEq)]
7271
pub struct JoinRuntimeFilterPacket {
7372
#[serde(default)]
7473
pub packets: Option<HashMap<usize, RuntimeFilterPacket>>,
7574
#[serde(default)]
7675
pub build_rows: usize,
76+
#[serde(default)]
77+
pub disable_all_due_to_spill: bool,
7778
}
7879

7980
impl JoinRuntimeFilterPacket {
80-
pub fn disable_all(descs: &[RuntimeFilterDesc], build_rows: usize) -> Self {
81-
let mut packets = HashMap::new();
82-
for desc in descs {
83-
packets.insert(desc.id, RuntimeFilterPacket {
84-
id: desc.id,
85-
inlist: None,
86-
min_max: None,
87-
bloom: None,
88-
});
81+
pub fn complete_without_filters(build_rows: usize) -> Self {
82+
Self {
83+
packets: None,
84+
build_rows,
85+
disable_all_due_to_spill: false,
8986
}
87+
}
88+
89+
pub fn complete(packets: HashMap<usize, RuntimeFilterPacket>, build_rows: usize) -> Self {
9090
Self {
9191
packets: Some(packets),
9292
build_rows,
93+
disable_all_due_to_spill: false,
94+
}
95+
}
96+
97+
pub fn disable_all(build_rows: usize) -> Self {
98+
Self {
99+
packets: None,
100+
build_rows,
101+
disable_all_due_to_spill: true,
93102
}
94103
}
95104
}
@@ -108,6 +117,8 @@ struct FlightJoinRuntimeFilterPacket {
108117
pub build_rows: usize,
109118
#[serde(default)]
110119
pub packets: Option<HashMap<usize, FlightRuntimeFilterPacket>>,
120+
#[serde(default)]
121+
pub disable_all_due_to_spill: bool,
111122

112123
pub schema: DataSchemaRef,
113124
}
@@ -166,6 +177,7 @@ impl TryInto<DataBlock> for JoinRuntimeFilterPacket {
166177
data_block.add_meta(Some(Box::new(FlightJoinRuntimeFilterPacket {
167178
build_rows: self.build_rows,
168179
packets: join_flight_packets,
180+
disable_all_due_to_spill: self.disable_all_due_to_spill,
169181
schema,
170182
})))
171183
}
@@ -183,6 +195,7 @@ impl TryFrom<DataBlock> for JoinRuntimeFilterPacket {
183195
return Ok(JoinRuntimeFilterPacket {
184196
packets: None,
185197
build_rows: flight_join_rf.build_rows,
198+
disable_all_due_to_spill: flight_join_rf.disable_all_due_to_spill,
186199
});
187200
};
188201

@@ -219,6 +232,7 @@ impl TryFrom<DataBlock> for JoinRuntimeFilterPacket {
219232
return Ok(JoinRuntimeFilterPacket {
220233
packets: Some(flight_packets),
221234
build_rows: flight_join_rf.build_rows,
235+
disable_all_due_to_spill: flight_join_rf.disable_all_due_to_spill,
222236
});
223237
}
224238

src/query/service/src/pipelines/processors/transforms/new_hash_join/grace/grace_join.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use databend_common_storages_parquet::ReadSettings;
3131

3232
use crate::pipelines::processors::HashJoinDesc;
3333
use crate::pipelines::processors::transforms::Join;
34+
use crate::pipelines::processors::transforms::JoinRuntimeFilterPacket;
3435
use crate::pipelines::processors::transforms::get_hashes;
3536
use crate::pipelines::processors::transforms::new_hash_join::grace::grace_memory::GraceMemoryJoin;
3637
use crate::pipelines::processors::transforms::new_hash_join::grace::grace_state::GraceHashJoinState;
@@ -68,6 +69,11 @@ unsafe impl<T: GraceMemoryJoin> Send for GraceHashJoin<T> {}
6869
unsafe impl<T: GraceMemoryJoin> Sync for GraceHashJoin<T> {}
6970

7071
impl<T: GraceMemoryJoin> Join for GraceHashJoin<T> {
72+
fn build_runtime_filter(&self) -> Result<JoinRuntimeFilterPacket> {
73+
// TODO: this is hacked to mark it as disabled, we may need look back latter
74+
Ok(JoinRuntimeFilterPacket::disable_all(0))
75+
}
76+
7177
fn add_block(&mut self, data: Option<DataBlock>) -> Result<()> {
7278
let ready_partitions = match data {
7379
None => self.finalize_build_data(),

src/query/service/src/pipelines/processors/transforms/new_hash_join/hybrid/hybrid_join.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -199,10 +199,11 @@ impl Join for HybridHashJoin {
199199
}
200200

201201
fn final_build(&mut self) -> Result<Option<ProgressValues>> {
202-
// Only the state needs to be transitioned, as all data migration must have been completed in the previous stage.
202+
// A processor can reach final_build while still in Memory mode even if another processor
203+
// has already triggered spill. In that case we must still perform full transition work
204+
// to avoid leaving late-arrived chunks in memory state.
203205
if self.state.check_spilled() && matches!(self.mode, HybridJoinMode::Memory(_)) {
204-
let grace_join = self.create_grace_join()?;
205-
self.mode = HybridJoinMode::Grace(Box::new(grace_join));
206+
self.switch_to_grace_mode(true)?;
206207
}
207208

208209
match &mut self.mode {

src/query/service/src/pipelines/processors/transforms/new_hash_join/transform_hash_join.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use databend_common_pipeline::core::OutputPort;
2828
use databend_common_pipeline::core::Processor;
2929
use databend_common_pipeline::core::ProcessorPtr;
3030
use databend_common_sql::ColumnSet;
31+
use log::info;
3132

3233
use crate::pipelines::processors::transforms::RuntimeFilterLocalBuilder;
3334
use crate::pipelines::processors::transforms::new_hash_join::join::Join;
@@ -227,7 +228,17 @@ impl Processor for TransformHashJoin {
227228
let before_wait = self.instant.elapsed();
228229

229230
if wait_res.is_leader() {
231+
let spilled = self.join.is_spill_happened();
230232
let packet = self.join.build_runtime_filter()?;
233+
if let Some(packets) = &packet.packets {
234+
info!(
235+
"spilled: {}, globalize runtime filter: total {}, disable_all_due_to_spill: {}",
236+
spilled,
237+
packets.len(),
238+
packet.disable_all_due_to_spill
239+
);
240+
};
241+
231242
self.rf_desc.globalization(packet).await?;
232243
}
233244

0 commit comments

Comments
 (0)