Skip to content

Commit 9ac92e5

Browse files
committed
feat: remove primary key collection
1 parent e03ee25 commit 9ac92e5

5 files changed

Lines changed: 20 additions & 139 deletions

File tree

src/mito2/src/compaction/compactor.rs

Lines changed: 7 additions & 31 deletions
Original file line number | Diff line number | Diff line change
@@ -12,7 +12,6 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use std::collections::HashSet;
1615
use std::num::NonZero;
1716
use std::sync::Arc;
1817
use std::time::Duration;
@@ -39,12 +38,10 @@ use crate::cache::{CacheManager, CacheManagerRef};
3938
use crate::compaction::picker::PickerOutput;
4039
use crate::compaction::{CompactionOutput, CompactionSstReaderBuilder, find_dynamic_options};
4140
use crate::config::MitoConfig;
42-
use crate::engine::flush_hook::FlushHookRef;
4341
use crate::error;
4442
use crate::error::{
4543
EmptyRegionDirSnafu, InvalidPartitionExprSnafu, ObjectStoreNotFoundSnafu, Result,
4644
};
47-
use crate::flush::{SharedPrimaryKeys, wrap_with_pk_collector};
4845
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
4946
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
5047
use crate::region::options::RegionOptions;
@@ -264,8 +261,6 @@ pub struct MergeOutput {
264261
pub compaction_time_window: Option<i64>,
265262
#[serde(skip)]
266263
pub sst_infos: Vec<SstInfo>,
267-
#[serde(skip)]
268-
pub primary_keys: Vec<Vec<u8>>,
269264
}
270265

271266
impl MergeOutput {
@@ -311,7 +306,7 @@ pub trait SstMerger: Send + Sync + 'static {
311306
compaction_region: CompactionRegion,
312307
output: CompactionOutput,
313308
write_opts: WriteOptions,
314-
) -> Result<(Vec<FileMeta>, Vec<SstInfo>, Vec<Vec<u8>>)>;
309+
) -> Result<(Vec<FileMeta>, Vec<SstInfo>)>;
315310
}
316311

317312
/// The production [`SstMerger`] that reads, merges, and writes SST files.
@@ -325,7 +320,7 @@ impl SstMerger for DefaultSstMerger {
325320
compaction_region: CompactionRegion,
326321
output: CompactionOutput,
327322
write_opts: WriteOptions,
328-
) -> Result<(Vec<FileMeta>, Vec<SstInfo>, Vec<Vec<u8>>)> {
323+
) -> Result<(Vec<FileMeta>, Vec<SstInfo>)> {
329324
let region_id = compaction_region.region_id;
330325
let storage = compaction_region.region_options.storage.clone();
331326
let index_options = compaction_region
@@ -371,19 +366,6 @@ impl SstMerger for DefaultSstMerger {
371366
};
372367
let source = builder.build_flat_sst_reader().await?;
373368

374-
let hook: Option<FlushHookRef> = compaction_region.plugins.get();
375-
let pk_collector: Option<SharedPrimaryKeys> = hook
376-
.as_ref()
377-
.map(|_| Arc::new(std::sync::Mutex::new(HashSet::new())));
378-
let source = if let Some(collector) = &pk_collector {
379-
crate::read::FlatSource::new_iter(
380-
source.schema().clone(),
381-
wrap_with_pk_collector(source.take_iter(), &Some(collector.clone())),
382-
)
383-
} else {
384-
source
385-
};
386-
387369
let mut metrics = Metrics::new(WriteType::Compaction);
388370
let region_metadata = compaction_region.region_metadata.clone();
389371
let sst_infos = compaction_region
@@ -463,10 +445,7 @@ impl SstMerger for DefaultSstMerger {
463445
region_id, input_file_names, output_file_names, flat_format, metrics
464446
);
465447
metrics.observe();
466-
let primary_keys: Vec<Vec<u8>> = pk_collector
467-
.map(|c| c.lock().unwrap().drain().collect())
468-
.unwrap_or_default();
469-
Ok((output_files, sst_infos.into_iter().collect(), primary_keys))
448+
Ok((output_files, sst_infos.into_iter().collect()))
470449
}
471450
}
472451

@@ -536,7 +515,6 @@ where
536515

537516
let mut output_files = Vec::with_capacity(tasks.len());
538517
let mut all_sst_infos: Vec<SstInfo> = Vec::new();
539-
let mut all_primary_keys: HashSet<Vec<u8>> = HashSet::new();
540518
let mut compacted_inputs = Vec::with_capacity(
541519
tasks.iter().map(|(inputs, _)| inputs.len()).sum::<usize>()
542520
+ picker_output.expired_ssts.len(),
@@ -560,10 +538,9 @@ where
560538
while let Some((inputs, handle)) = spawned.pop() {
561539
let abort_handle = handle.abort_handle();
562540
match CancellableFuture::new(handle, self.cancel_handle.clone()).await {
563-
Ok(Ok(Ok((files, infos, pks)))) => {
541+
Ok(Ok(Ok((files, infos)))) => {
564542
output_files.extend(files);
565543
all_sst_infos.extend(infos);
566-
all_primary_keys.extend(pks);
567544
compacted_inputs.extend(inputs);
568545
}
569546
Ok(Ok(Err(e))) => {
@@ -624,7 +601,6 @@ where
624601
files_to_remove: compacted_inputs,
625602
compaction_time_window: Some(compaction_time_window),
626603
sst_infos: all_sst_infos,
627-
primary_keys: all_primary_keys.into_iter().collect(),
628604
})
629605
}
630606

@@ -742,10 +718,10 @@ mod tests {
742718
_compaction_region: CompactionRegion,
743719
_output: CompactionOutput,
744720
_write_opts: WriteOptions,
745-
) -> Result<(Vec<FileMeta>, Vec<SstInfo>, Vec<Vec<u8>>)> {
721+
) -> Result<(Vec<FileMeta>, Vec<SstInfo>)> {
746722
let idx = self.call_idx.fetch_add(1, Ordering::SeqCst);
747723
match self.results.lock().unwrap().get(idx) {
748-
Some(Ok(files)) => Ok((files.clone(), Vec::new(), Vec::new())),
724+
Some(Ok(files)) => Ok((files.clone(), Vec::new())),
749725
Some(Err(_)) => error::InvalidMetaSnafu {
750726
reason: format!("simulated failure at index {idx}"),
751727
}
@@ -914,7 +890,7 @@ mod tests {
914890
_compaction_region: CompactionRegion,
915891
_output: CompactionOutput,
916892
_write_opts: WriteOptions,
917-
) -> Result<(Vec<FileMeta>, Vec<SstInfo>, Vec<Vec<u8>>)> {
893+
) -> Result<(Vec<FileMeta>, Vec<SstInfo>)> {
918894
self.call_idx.fetch_add(1, Ordering::SeqCst);
919895
std::future::pending().await
920896
}

src/mito2/src/compaction/task.rs

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -300,7 +300,6 @@ impl CompactionTaskImpl {
300300
self.compaction_region.region_id,
301301
&self.compaction_region.region_metadata,
302302
&files,
303-
&merge_output.primary_keys,
304303
)
305304
.await;
306305
}

src/mito2/src/engine/flush_hook.rs

Lines changed: 2 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -41,35 +41,19 @@ pub struct SstFileInfo<'a> {
4141
///
4242
/// plugins.insert(Arc::new(MyHook) as FlushHookRef);
4343
/// ```
44-
///
45-
/// To decode primary keys into tag name-value pairs, use:
46-
/// ```ignore
47-
/// use mito_codec::row_converter::build_primary_key_codec;
48-
///
49-
/// let codec = build_primary_key_codec(region_metadata);
50-
/// for pk_bytes in primary_keys {
51-
/// let decoded = codec.decode(pk_bytes)?;
52-
/// // Dense: Vec<(ColumnId, Value)>
53-
/// // Sparse: SparseValues with column_id -> value mapping
54-
/// }
55-
/// ```
5644
#[async_trait]
5745
pub trait FlushHook: Send + Sync {
5846
/// Called after SST files are written during flush.
5947
///
6048
/// - `files`: per-file metadata (SstInfo + FileMeta) for each SST written.
61-
/// - `primary_keys`: all unique primary keys (encoded bytes) across all files
62-
/// in this flush. Decode with `build_primary_key_codec(region_metadata)`.
63-
/// - `region_metadata`: provides the schema to decode primary keys into
64-
/// tag/label name-value pairs.
49+
/// - `region_metadata`: provides the schema for column type information.
6550
async fn on_sst_files_written(
6651
&self,
6752
region_id: RegionId,
6853
region_metadata: &RegionMetadataRef,
6954
files: &[SstFileInfo<'_>],
70-
primary_keys: &[Vec<u8>],
7155
) {
72-
let _ = (region_id, region_metadata, files, primary_keys);
56+
let _ = (region_id, region_metadata, files);
7357
}
7458

7559
/// Called after the region manifest is successfully updated.

src/mito2/src/engine/flush_test.rs

Lines changed: 3 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -671,7 +671,6 @@ async fn test_update_topic_latest_entry_id(factory: Option<LogStoreFactory>) {
671671
struct MockFlushHook {
672672
sst_written_count: AtomicUsize,
673673
manifest_updated_count: AtomicUsize,
674-
primary_keys_count: AtomicUsize,
675674
notify: Notify,
676675
}
677676

@@ -680,7 +679,6 @@ impl MockFlushHook {
680679
Self {
681680
sst_written_count: AtomicUsize::new(0),
682681
manifest_updated_count: AtomicUsize::new(0),
683-
primary_keys_count: AtomicUsize::new(0),
684682
notify: Notify::new(),
685683
}
686684
}
@@ -697,17 +695,13 @@ impl FlushHook for MockFlushHook {
697695
region_id: RegionId,
698696
_region_metadata: &RegionMetadataRef,
699697
files: &[SstFileInfo<'_>],
700-
primary_keys: &[Vec<u8>],
701698
) {
702699
self.sst_written_count
703700
.fetch_add(files.len(), Ordering::Relaxed);
704-
self.primary_keys_count
705-
.store(primary_keys.len(), Ordering::Relaxed);
706701
common_telemetry::info!(
707-
"MockFlushHook::on_sst_files_written: region={}, files={}, primary_keys={}",
702+
"MockFlushHook::on_sst_files_written: region={}, files={}",
708703
region_id,
709704
files.len(),
710-
primary_keys.len(),
711705
);
712706
for (i, file) in files.iter().enumerate() {
713707
common_telemetry::info!(
@@ -729,11 +723,10 @@ impl FlushHook for MockFlushHook {
729723
) {
730724
self.manifest_updated_count.fetch_add(1, Ordering::Relaxed);
731725
common_telemetry::info!(
732-
"MockFlushHook::on_manifest_updated: region={}, manifest_version={}, files_added={}, primary_keys_collected={}",
726+
"MockFlushHook::on_manifest_updated: region={}, manifest_version={}, files_added={}",
733727
region_id,
734728
manifest_version,
735729
edit.files_to_add.len(),
736-
self.primary_keys_count.load(Ordering::Relaxed),
737730
);
738731
self.notify.notify_one();
739732
}
@@ -778,7 +771,6 @@ async fn test_flush_hook() {
778771

779772
let sst_count = hook.sst_written_count.load(Ordering::Relaxed);
780773
let manifest_count = hook.manifest_updated_count.load(Ordering::Relaxed);
781-
let pk_count = hook.primary_keys_count.load(Ordering::Relaxed);
782774

783775
assert!(
784776
sst_count > 0,
@@ -788,15 +780,10 @@ async fn test_flush_hook() {
788780
manifest_count, 1,
789781
"Expected exactly 1 manifest update, got {manifest_count}"
790782
);
791-
assert!(
792-
pk_count >= 2,
793-
"Expected at least 2 unique primary keys (tags 'a' and 'b'), got {pk_count}"
794-
);
795783

796784
common_telemetry::info!(
797-
"test_flush_hook passed: sst_count={}, manifest_count={}, pk_count={}",
785+
"test_flush_hook passed: sst_count={}, manifest_count={}",
798786
sst_count,
799787
manifest_count,
800-
pk_count
801788
);
802789
}

0 commit comments

Comments
 (0)