Skip to content

Commit c586a91

Browse files
Merge remote-tracking branch 'origin/main' into bianchi/sketch-support
# Conflicts: # quickwit/quickwit-datafusion/src/sources/metrics/mod.rs
2 parents 319ecde + 697ff0e commit c586a91

35 files changed

Lines changed: 4291 additions & 154 deletions

File tree

quickwit/Cargo.lock

Lines changed: 9 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

quickwit/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -386,7 +386,7 @@ quickwit-serve = { path = "quickwit-serve" }
386386
quickwit-storage = { path = "quickwit-storage" }
387387
quickwit-telemetry = { path = "quickwit-telemetry" }
388388

389-
tantivy = { git = "https://github.com/quickwit-oss/tantivy/", rev = "2e16243", default-features = false, features = [
389+
tantivy = { git = "https://github.com/quickwit-oss/tantivy/", rev = "ca139d8", default-features = false, features = [
390390
"lz4-compression",
391391
"mmap",
392392
"quickwit",

quickwit/quickwit-config/src/storage_config.rs

Lines changed: 18 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -331,6 +331,10 @@ pub struct S3StorageConfig {
331331
pub disable_multipart_upload: bool,
332332
#[serde(default)]
333333
pub disable_checksums: bool,
334+
#[serde(default)]
335+
pub disable_stalled_stream_protection_upload: bool,
336+
#[serde(default)]
337+
pub disable_stalled_stream_protection_download: bool,
334338
}
335339

336340
impl S3StorageConfig {
@@ -399,6 +403,16 @@ impl fmt::Debug for S3StorageConfig {
399403
"disable_multi_object_delete",
400404
&self.disable_multi_object_delete,
401405
)
406+
.field("disable_multipart_upload", &self.disable_multipart_upload)
407+
.field("disable_checksums", &self.disable_checksums)
408+
.field(
409+
"disable_stalled_stream_protection_upload",
410+
&self.disable_stalled_stream_protection_upload,
411+
)
412+
.field(
413+
"disable_stalled_stream_protection_download",
414+
&self.disable_stalled_stream_protection_download,
415+
)
402416
.finish()
403417
}
404418
}
@@ -634,6 +648,8 @@ mod tests {
634648
disable_multi_object_delete_requests: true
635649
disable_multipart_upload: true
636650
disable_checksums: true
651+
disable_stalled_stream_protection_upload: true
652+
disable_stalled_stream_protection_download: true
637653
"#;
638654
let s3_storage_config: S3StorageConfig =
639655
serde_yaml::from_str(s3_storage_config_yaml).unwrap();
@@ -645,6 +661,8 @@ mod tests {
645661
disable_multi_object_delete: true,
646662
disable_multipart_upload: true,
647663
disable_checksums: true,
664+
disable_stalled_stream_protection_upload: true,
665+
disable_stalled_stream_protection_download: true,
648666
..Default::default()
649667
};
650668
assert_eq!(s3_storage_config, expected_s3_config);

quickwit/quickwit-datafusion/src/sources/metrics/mod.rs

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -23,6 +23,7 @@
2323
pub(crate) mod factory;
2424
pub(crate) mod index_resolver;
2525
pub(crate) mod metastore_provider;
26+
pub(crate) mod optimizer;
2627
pub(crate) mod predicate;
2728
pub(crate) mod sketch_udf;
2829
pub(crate) mod table_provider;
@@ -48,6 +49,7 @@ use quickwit_proto::metastore::{MetastoreError, MetastoreServiceClient};
4849

4950
use self::factory::{METRICS_FILE_TYPE, MetricsTableProviderFactory, SKETCHES_FILE_TYPE};
5051
use self::index_resolver::{MetastoreIndexResolver, MetricsIndexResolver};
52+
use self::optimizer::SortedSeriesStreamingAggregateRule;
5153
use self::sketch_udf::{create_dd_quantile_udf, create_dd_sketch_udaf};
5254
use self::table_provider::MetricsTableProvider;
5355

@@ -256,6 +258,14 @@ impl QuickwitRuntimePlugin for MetricsDataSource {
256258
ParquetSplitKind::Sketches,
257259
));
258260
QuickwitRuntimeRegistration::default()
261+
.with_session_config_setter(|config| {
262+
config
263+
.options_mut()
264+
.optimizer
265+
.enable_round_robin_repartition = false;
266+
config.options_mut().optimizer.repartition_file_scans = false;
267+
})
268+
.with_physical_optimizer_rule(Arc::new(SortedSeriesStreamingAggregateRule))
259269
.with_table_factory(METRICS_FILE_TYPE, factory)
260270
.with_table_factory(SKETCHES_FILE_TYPE, sketches_factory)
261271
.with_udaf(Arc::new(create_dd_sketch_udaf()))
quickwit/quickwit-datafusion/src/sources/metrics/optimizer.rs

Lines changed: 147 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,147 @@
1+
// Copyright 2021-Present Datadog, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
//! Physical rewrites for sorted-series metrics rollups.
16+
17+
use std::sync::Arc;
18+
19+
use datafusion::common::tree_node::{Transformed, TreeNode};
20+
use datafusion::config::ConfigOptions;
21+
use datafusion::error::Result as DFResult;
22+
use datafusion::physical_expr::expressions::Column;
23+
use datafusion::physical_expr::{LexOrdering, Partitioning};
24+
use datafusion::physical_optimizer::PhysicalOptimizerRule;
25+
use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode};
26+
use datafusion::physical_plan::repartition::RepartitionExec;
27+
use datafusion::physical_plan::sorts::sort::SortExec;
28+
use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
29+
use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
30+
use quickwit_parquet_engine::sorted_series::SORTED_SERIES_COLUMN;
31+
32+
/// Replaces the inner sorted-series hash repartition in rollup plans with a
33+
/// sort-preserving merge into a single final aggregate.
34+
///
35+
/// This keeps worker/file-local partial aggregation parallel, then lets the
36+
/// coordinator stitch ordered per-series partial rows without hash-shuffling
37+
/// those partials by `sorted_series`.
///
/// Only plans shaped like a `FinalPartitioned` `AggregateExec` over a
/// partition-preserving `SortExec` (without a fetch limit) over a hash
/// `RepartitionExec` are rewritten, and only when the grouping, the leading
/// sort key and the hash keys all involve the sorted-series column. Every
/// other node is returned unchanged.
///
/// The rule is a stateless unit struct.
38+
#[derive(Debug, Default)]
39+
pub struct SortedSeriesStreamingAggregateRule;
40+
41+
// Hooks the sorted-series rewrite into DataFusion's physical optimizer pipeline.
impl PhysicalOptimizerRule for SortedSeriesStreamingAggregateRule {
42+
/// Walks `plan` with `transform_up` and swaps in the rewritten node wherever
/// `rewrite_sorted_series_final_aggregate` returns one; all other nodes are
/// passed through untouched.
///
/// `_config` is not consulted.
///
/// # Errors
///
/// Propagates any error raised by the per-node rewrite or by the traversal.
fn optimize(
43+
&self,
44+
plan: Arc<dyn ExecutionPlan>,
45+
_config: &ConfigOptions,
46+
) -> DFResult<Arc<dyn ExecutionPlan>> {
47+
let transformed = plan.transform_up(|plan| {
48+
// `Transformed::yes` / `Transformed::no` tell the traversal whether this node changed.
if let Some(rewritten) = rewrite_sorted_series_final_aggregate(&plan)? {
49+
Ok(Transformed::yes(rewritten))
50+
} else {
51+
Ok(Transformed::no(plan))
52+
}
53+
})?;
54+
// Only the resulting plan is returned; the "was anything changed" flag is dropped.
Ok(transformed.data)
55+
}
56+
57+
/// Returns the name string of this rule.
fn name(&self) -> &str {
58+
"sorted_series_streaming_aggregate"
59+
}
60+
61+
/// Always returns `true`.
// NOTE(review): per the DataFusion `PhysicalOptimizerRule` contract this is
// understood to request a check that the rule leaves the plan schema
// unchanged — confirm against the DataFusion version in use.
fn schema_check(&self) -> bool {
62+
true
63+
}
64+
}
65+
66+
/// Attempts to rewrite a single plan node; returns `Ok(None)` when the node
/// does not match the expected shape.
///
/// The node is rewritten only if all of the following hold:
///
/// 1. `plan` is an `AggregateExec` in `AggregateMode::FinalPartitioned` whose
///    grouping involves the sorted-series column.
/// 2. Its input is a `SortExec` that preserves partitioning, has no fetch
///    limit, and whose leading sort key is the sorted-series column.
/// 3. That sort's input is a `RepartitionExec` using hash partitioning with
///    the sorted-series column among the hash keys.
///
/// On a match, the `RepartitionExec` (and the original `SortExec`) are dropped.
/// The repartition's input is used directly if it already satisfies the sort
/// ordering, otherwise it is wrapped in a new partition-preserving `SortExec`.
/// The result is fed through a `SortPreservingMergeExec` into a new
/// `AggregateExec` in `AggregateMode::Final` that reuses the original group,
/// aggregate and filter expressions, input schema and limit options.
///
/// # Errors
///
/// Propagates errors from `ordering_satisfy` and `AggregateExec::try_new`.
fn rewrite_sorted_series_final_aggregate(
67+
plan: &Arc<dyn ExecutionPlan>,
68+
) -> DFResult<Option<Arc<dyn ExecutionPlan>>> {
69+
// Step 1: the node itself must be a final-partitioned aggregate keyed on the series column.
let Some(final_agg) = plan.as_any().downcast_ref::<AggregateExec>() else {
70+
return Ok(None);
71+
};
72+
if final_agg.mode() != &AggregateMode::FinalPartitioned
73+
|| !aggregate_groups_on_sorted_series(final_agg)
74+
{
75+
return Ok(None);
76+
}
77+
78+
// Step 2: directly below must sit a per-partition sort led by the series column.
let Some(sort) = final_agg.input().as_any().downcast_ref::<SortExec>() else {
79+
return Ok(None);
80+
};
81+
// A sort carrying a fetch limit is left alone: this rewrite builds its own
// SortExec without a fetch, so the limit would be lost.
if !sort.preserve_partitioning()
82+
|| sort.fetch().is_some()
83+
|| !ordering_starts_with_sorted_series(sort.expr())
84+
{
85+
return Ok(None);
86+
}
87+
88+
// Step 3: below the sort must be the hash repartition that this rule removes.
let Some(repartition) = sort.input().as_any().downcast_ref::<RepartitionExec>() else {
89+
return Ok(None);
90+
};
91+
if !hash_partitioning_contains_sorted_series(repartition.partitioning()) {
92+
return Ok(None);
93+
}
94+
95+
let ordering = sort.expr().clone();
96+
// Bypass the repartition: build on top of whatever fed it.
let repartition_input = Arc::clone(repartition.input());
97+
// Skip the extra sort when the input's equivalence properties already
// guarantee the required ordering.
let partition_sort: Arc<dyn ExecutionPlan> = if repartition_input
98+
.equivalence_properties()
99+
.ordering_satisfy(ordering.clone())?
100+
{
101+
repartition_input
102+
} else {
103+
Arc::new(
104+
SortExec::new(ordering.clone(), repartition_input).with_preserve_partitioning(true),
105+
)
106+
};
107+
// Merge the individually sorted partitions into one ordered stream.
let merged: Arc<dyn ExecutionPlan> =
108+
Arc::new(SortPreservingMergeExec::new(ordering, partition_sort));
109+
110+
// Rebuild the aggregate in `Final` mode on top of the merged input, reusing
// every other setting of the original aggregate.
let rewritten = AggregateExec::try_new(
111+
AggregateMode::Final,
112+
final_agg.group_expr().clone(),
113+
final_agg.aggr_expr().to_vec(),
114+
final_agg.filter_expr().to_vec(),
115+
merged,
116+
final_agg.input_schema(),
117+
)?
118+
.with_limit_options(final_agg.limit_options());
119+
120+
Ok(Some(Arc::new(rewritten)))
121+
}
122+
123+
/// Returns `true` when at least one group-by entry of `aggregate` either is
/// output under the alias `SORTED_SERIES_COLUMN` or is a direct column
/// reference to `SORTED_SERIES_COLUMN`.
///
/// Other grouping keys may be present; the check is "any", not "all".
fn aggregate_groups_on_sorted_series(aggregate: &AggregateExec) -> bool {
124+
aggregate
125+
.group_expr()
126+
.expr()
127+
.iter()
128+
// Each entry is an (expression, output alias) pair; either side may match.
.any(|(expr, alias)| alias == SORTED_SERIES_COLUMN || is_sorted_series_column(expr))
129+
}
130+
131+
/// Returns `true` when `partitioning` is `Partitioning::Hash` and at least one
/// of its hash expressions is a direct reference to the sorted-series column.
///
/// Any non-hash partitioning scheme yields `false`.
fn hash_partitioning_contains_sorted_series(partitioning: &Partitioning) -> bool {
132+
// The partition count (second field) is irrelevant to this check.
let Partitioning::Hash(exprs, _) = partitioning else {
133+
return false;
134+
};
135+
exprs.iter().any(is_sorted_series_column)
136+
}
137+
138+
/// Returns `true` when the first sort key of `ordering` is a direct reference
/// to the sorted-series column.
///
/// Only the leading key is inspected; any further sort keys are unconstrained.
fn ordering_starts_with_sorted_series(ordering: &LexOrdering) -> bool {
139+
is_sorted_series_column(&ordering.first().expr)
140+
}
141+
142+
/// Returns `true` only when `expr` is a plain `Column` expression whose name
/// equals `SORTED_SERIES_COLUMN`.
///
/// Any expression that does not downcast to `Column` yields `false`, even if
/// it wraps such a column.
fn is_sorted_series_column(expr: &Arc<dyn datafusion::physical_expr::PhysicalExpr>) -> bool {
143+
match expr.as_any().downcast_ref::<Column>() {
144+
Some(column) => column.name() == SORTED_SERIES_COLUMN,
145+
None => false,
146+
}
147+
}

0 commit comments

Comments
 (0)