Skip to content

Commit 2257c03

Browse files
committed
wip
1 parent efa0e46 commit 2257c03

File tree

28 files changed

+213
-69
lines changed

28 files changed

+213
-69
lines changed

src/query/ee/src/storages/fuse/operations/vacuum_table.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ pub async fn get_snapshot_referenced_files(
9797
format_version: ver,
9898
snapshot_id: root_snapshot.snapshot_id,
9999
timestamp: root_snapshot.timestamp,
100-
segments: HashSet::from_iter(root_snapshot.segments.clone()),
100+
segments: HashSet::from_iter(root_snapshot.segments.iter().map(|v| v.location.clone())),
101101
table_statistics_location: root_snapshot.table_statistics_location.clone(),
102102
});
103103
drop(root_snapshot);

src/query/ee/src/storages/fuse/operations/virtual_columns.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,11 @@ pub async fn do_refresh_virtual_column(
9999
let segment_locs = if let Some(segment_locs) = segment_locs {
100100
segment_locs
101101
} else {
102-
snapshot.segments.clone()
102+
snapshot
103+
.segments
104+
.iter()
105+
.map(|v| v.location.clone())
106+
.collect()
103107
};
104108

105109
// Read source variant columns and extract inner fields as virtual columns.

src/query/service/src/interpreters/interpreter_merge_into.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -441,6 +441,7 @@ impl MergeIntoInterpreter {
441441
.segments
442442
.clone()
443443
.into_iter()
444+
.map(|v| v.location)
444445
.enumerate()
445446
.collect();
446447

src/query/service/src/interpreters/interpreter_replace.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,7 @@ impl ReplaceInterpreter {
308308
.segments
309309
.clone()
310310
.into_iter()
311+
.map(|v| v.location)
311312
.enumerate()
312313
.collect(),
313314
block_slots: None,

src/query/service/src/test_kits/fuse.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ pub async fn generate_snapshot_with_segments(
6767
let operator = fuse_table.get_operator();
6868
let location_gen = fuse_table.meta_location_generator();
6969
let mut new_snapshot = TableSnapshot::from_previous(current_snapshot.as_ref());
70-
new_snapshot.segments = segment_locations;
70+
new_snapshot.segments = segment_locations.into_iter().map(|v| v.into()).collect();
7171
let new_snapshot_location = location_gen
7272
.snapshot_location_from_uuid(&new_snapshot.snapshot_id, TableSnapshot::VERSION)?;
7373
if let Some(ts) = time_stamp {
@@ -199,6 +199,7 @@ pub async fn generate_snapshots(fixture: &TestFixture) -> Result<()> {
199199

200200
// create snapshot 1, the format version is 3.
201201
let locations = vec![segments_v3[0].0.clone(), segments_v2[0].0.clone()];
202+
let locations = locations.into_iter().map(|v| v.into()).collect();
202203
let mut snapshot_1 = TableSnapshot::new(
203204
Uuid::new_v4(),
204205
&snapshot_0.timestamp,
@@ -224,7 +225,7 @@ pub async fn generate_snapshots(fixture: &TestFixture) -> Result<()> {
224225
segments_v2[0].0.clone(),
225226
];
226227
let mut snapshot_2 = TableSnapshot::from_previous(&snapshot_1);
227-
snapshot_2.segments = locations;
228+
snapshot_2.segments = locations.into_iter().map(|v| v.into()).collect();
228229
snapshot_2.timestamp = Some(now);
229230
snapshot_2.summary = merge_statistics(&snapshot_1.summary, &segments_v3[1].1.summary, None);
230231
let new_snapshot_location = location_gen

src/query/storages/common/table_meta/src/meta/current/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ pub use v2::ColumnMeta;
2020
pub use v2::ColumnStatistics;
2121
pub use v2::Statistics;
2222
pub use v4::CompactSegmentInfo;
23+
pub use v4::SegmentDescriptor;
2324
pub use v4::SegmentInfo;
25+
pub use v4::SegmentSummary;
2426
pub use v4::TableSnapshot;
2527
pub use v4::TableSnapshotLite;
2628

src/query/storages/common/table_meta/src/meta/v4/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,5 +17,7 @@ mod snapshot;
1717

1818
pub use segment::CompactSegmentInfo;
1919
pub use segment::SegmentInfo;
20+
pub use snapshot::SegmentDescriptor;
21+
pub use snapshot::SegmentSummary;
2022
pub use snapshot::TableSnapshot;
2123
pub use snapshot::TableSnapshotLite;

src/query/storages/common/table_meta/src/meta/v4/snapshot.rs

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,21 +78,42 @@ pub struct TableSnapshot {
7878
///
7979
/// We rely on background merge tasks to keep merging segments, so that
8080
/// this the size of this vector could be kept reasonable
81-
pub segments: Vec<Location>,
81+
pub segments: Vec<SegmentDescriptor>,
8282

8383
// The metadata of the cluster keys.
8484
pub cluster_key_meta: Option<ClusterKey>,
8585
pub table_statistics_location: Option<String>,
8686
}
8787

88+
// TODO remove PartialEq
89+
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
90+
pub struct SegmentSummary {
91+
pub row_count: u64,
92+
pub block_count: u64,
93+
}
94+
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
95+
pub struct SegmentDescriptor {
96+
pub location: Location,
97+
pub summary: Option<SegmentSummary>,
98+
}
99+
100+
impl From<Location> for SegmentDescriptor {
101+
fn from(location: Location) -> Self {
102+
SegmentDescriptor {
103+
location,
104+
summary: None,
105+
}
106+
}
107+
}
108+
88109
impl TableSnapshot {
89110
pub fn new(
90111
snapshot_id: SnapshotId,
91112
prev_timestamp: &Option<DateTime<Utc>>,
92113
prev_snapshot_id: Option<(SnapshotId, FormatVersion)>,
93114
schema: TableSchema,
94115
summary: Statistics,
95-
segments: Vec<Location>,
116+
segments: Vec<SegmentDescriptor>,
96117
cluster_key_meta: Option<ClusterKey>,
97118
table_statistics_location: Option<String>,
98119
) -> Self {
@@ -218,7 +239,7 @@ impl From<v2::TableSnapshot> for TableSnapshot {
218239
prev_snapshot_id: s.prev_snapshot_id,
219240
schema: s.schema,
220241
summary: s.summary,
221-
segments: s.segments,
242+
segments: s.segments.into_iter().map(|v| v.into()).collect(),
222243
cluster_key_meta: s.cluster_key_meta,
223244
table_statistics_location: s.table_statistics_location,
224245
}
@@ -239,7 +260,7 @@ where T: Into<v3::TableSnapshot>
239260
prev_snapshot_id: s.prev_snapshot_id,
240261
schema: s.schema.into(),
241262
summary: s.summary.into(),
242-
segments: s.segments,
263+
segments: s.segments.into_iter().map(|v| v.into()).collect(),
243264
cluster_key_meta: s.cluster_key_meta,
244265
table_statistics_location: s.table_statistics_location,
245266
}

src/query/storages/fuse/src/io/snapshots.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -267,10 +267,10 @@ impl SnapshotsIO {
267267
let mut segments = HashSet::new();
268268
// collects extended segments.
269269
for segment_location in &snapshot.segments {
270-
if root_snapshot.segments.contains(segment_location) {
270+
if root_snapshot.segments.contains(&segment_location.location) {
271271
continue;
272272
}
273-
segments.insert(segment_location.clone());
273+
segments.insert(segment_location.location.clone());
274274
}
275275
let table_statistics_location =
276276
if snapshot.table_statistics_location != root_snapshot.table_statistics_location {

src/query/storages/fuse/src/operations/analyze.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,9 @@ impl FuseTable {
7272
blocks_cluster_stats.push(cluster_stats.clone());
7373
}
7474

75+
let locations = chunk.iter().map(|v| v.location.clone()).collect::<Vec<_>>();
7576
let segments = segments_io
76-
.read_segments::<SegmentInfo>(chunk, true)
77+
.read_segments::<SegmentInfo>(&locations, true)
7778
.await?;
7879
for segment in segments {
7980
let segment = segment?;

src/query/storages/fuse/src/operations/commit.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use databend_common_pipeline_transforms::processors::AsyncAccumulatingTransforme
3737
use databend_common_sql::executor::physical_plans::MutationKind;
3838
use databend_storages_common_cache::CacheAccessor;
3939
use databend_storages_common_cache_manager::CachedObject;
40-
use databend_storages_common_table_meta::meta::Location;
40+
use databend_storages_common_table_meta::meta::SegmentDescriptor;
4141
use databend_storages_common_table_meta::meta::SegmentInfo;
4242
use databend_storages_common_table_meta::meta::SnapshotId;
4343
use databend_storages_common_table_meta::meta::Statistics;
@@ -287,7 +287,7 @@ impl FuseTable {
287287
&self,
288288
ctx: &Arc<dyn TableContext>,
289289
base_snapshot: Arc<TableSnapshot>,
290-
base_segments: &[Location],
290+
base_segments: &[SegmentDescriptor],
291291
base_summary: Statistics,
292292
abort_operation: AbortOperation,
293293
max_retry_elapsed: Option<Duration>,
@@ -303,7 +303,7 @@ impl FuseTable {
303303
let mut latest_table_ref: Arc<dyn Table>;
304304

305305
// potentially concurrently appended segments, init it to empty
306-
let mut concurrently_appended_segment_locations: &[Location] = &[];
306+
let mut concurrently_appended_segment_locations: &[SegmentDescriptor] = &[];
307307

308308
// Status
309309
ctx.set_status_info("mutation: begin try to commit");
@@ -422,12 +422,12 @@ impl FuseTable {
422422
async fn merge_with_base(
423423
ctx: Arc<dyn TableContext>,
424424
operator: Operator,
425-
base_segments: &[Location],
425+
base_segments: &[SegmentDescriptor],
426426
base_summary: &Statistics,
427-
concurrently_appended_segment_locations: &[Location],
427+
concurrently_appended_segment_locations: &[SegmentDescriptor],
428428
schema: TableSchemaRef,
429429
default_cluster_key_id: Option<u32>,
430-
) -> Result<(Vec<Location>, Statistics)> {
430+
) -> Result<(Vec<SegmentDescriptor>, Statistics)> {
431431
if concurrently_appended_segment_locations.is_empty() {
432432
Ok((base_segments.to_owned(), base_summary.clone()))
433433
} else {
@@ -439,8 +439,13 @@ impl FuseTable {
439439
.collect();
440440

441441
let fuse_segment_io = SegmentsIO::create(ctx, operator, schema);
442+
// TODO
443+
let locations = concurrently_appended_segment_locations
444+
.iter()
445+
.map(|v| v.location.clone())
446+
.collect::<Vec<_>>();
442447
let concurrent_appended_segment_infos = fuse_segment_io
443-
.read_segments::<SegmentInfo>(concurrently_appended_segment_locations, true)
448+
.read_segments::<SegmentInfo>(&locations, true)
444449
.await?;
445450

446451
let mut new_statistics = base_summary.clone();

src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,9 @@ use databend_common_expression::TableSchemaRef;
2929
use databend_common_pipeline_transforms::processors::AsyncAccumulatingTransform;
3030
use databend_common_sql::executor::physical_plans::MutationKind;
3131
use databend_storages_common_table_meta::meta::BlockMeta;
32-
use databend_storages_common_table_meta::meta::Location;
32+
use databend_storages_common_table_meta::meta::SegmentDescriptor;
3333
use databend_storages_common_table_meta::meta::SegmentInfo;
34+
use databend_storages_common_table_meta::meta::SegmentSummary;
3435
use databend_storages_common_table_meta::meta::Statistics;
3536
use databend_storages_common_table_meta::meta::Versioned;
3637
use itertools::Itertools;
@@ -62,10 +63,10 @@ pub struct TableMutationAggregator {
6263
location_gen: TableMetaLocationGenerator,
6364
thresholds: BlockThresholds,
6465
default_cluster_key_id: Option<u32>,
65-
base_segments: Vec<Location>,
66+
base_segments: Vec<SegmentDescriptor>,
6667

6768
mutations: HashMap<SegmentIndex, BlockMutations>,
68-
appended_segments: Vec<Location>,
69+
appended_segments: Vec<SegmentDescriptor>,
6970
appended_statistics: Statistics,
7071
removed_segment_indexes: Vec<SegmentIndex>,
7172
removed_statistics: Statistics,
@@ -105,7 +106,7 @@ impl TableMutationAggregator {
105106
pub fn new(
106107
table: &FuseTable,
107108
ctx: Arc<dyn TableContext>,
108-
base_segments: Vec<Location>,
109+
base_segments: Vec<SegmentDescriptor>,
109110
kind: MutationKind,
110111
) -> Self {
111112
TableMutationAggregator {
@@ -184,8 +185,16 @@ impl TableMutationAggregator {
184185
self.default_cluster_key_id,
185186
);
186187

188+
let desc = SegmentDescriptor {
189+
location: (segment_location, format_version),
190+
summary: Some(SegmentSummary {
191+
row_count: summary.row_count,
192+
block_count: summary.block_count,
193+
}),
194+
};
187195
self.appended_segments
188-
.push((segment_location, format_version))
196+
//.push((segment_location, format_version))
197+
.push(desc)
189198
}
190199
MutationLogEntry::CompactExtras { extras } => {
191200
match self.mutations.entry(extras.segment_index) {
@@ -241,8 +250,14 @@ impl TableMutationAggregator {
241250
&summary,
242251
self.default_cluster_key_id,
243252
);
244-
replaced_segments
245-
.insert(result.index, (location, SegmentInfo::VERSION));
253+
let desc = SegmentDescriptor {
254+
location: (location, SegmentInfo::VERSION),
255+
summary: Some(SegmentSummary {
256+
row_count: summary.row_count,
257+
block_count: summary.block_count,
258+
}),
259+
};
260+
replaced_segments.insert(result.index, desc);
246261
} else {
247262
self.removed_segment_indexes.push(result.index);
248263
}
@@ -311,7 +326,7 @@ impl TableMutationAggregator {
311326
let (new_blocks, origin_summary) = if let Some(loc) = location {
312327
// read the old segment
313328
let compact_segment_info =
314-
SegmentsIO::read_compact_segment(op.clone(), loc, schema, false).await?;
329+
SegmentsIO::read_compact_segment(op.clone(), loc.location, schema, false).await?;
315330
let mut segment_info = SegmentInfo::try_from(compact_segment_info)?;
316331

317332
// take away the blocks, they are being mutated

0 commit comments

Comments
 (0)