From dd454f7ebb228742637dbf72b2955d3be8fa79fa Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Wed, 13 May 2026 15:07:52 +0800 Subject: [PATCH 1/6] feat: introduce flush and compaction hook --- src/mito2/src/compaction.rs | 1 + src/mito2/src/compaction/compactor.rs | 55 ++++++++-- src/mito2/src/compaction/task.rs | 40 ++++++++ src/mito2/src/engine.rs | 1 + src/mito2/src/engine/flush_hook.rs | 86 ++++++++++++++++ src/mito2/src/engine/flush_test.rs | 142 +++++++++++++++++++++++++- src/mito2/src/flush.rs | 135 +++++++++++++++++++----- src/mito2/src/manifest/manager.rs | 2 +- src/mito2/src/read.rs | 3 +- src/mito2/src/test_util.rs | 31 +++++- src/mito2/src/worker.rs | 3 + src/mito2/src/worker/handle_flush.rs | 1 + 12 files changed, 459 insertions(+), 41 deletions(-) create mode 100644 src/mito2/src/engine/flush_hook.rs diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 5cf3c444a8a5..4c83064371bb 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -515,6 +515,7 @@ impl CompactionScheduler { file_purger: None, ttl: Some(ttl), max_parallelism, + plugins: self.plugins.clone(), }; let picker_output = { diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index d78b2cc8fe66..d498bc3a18bb 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -12,10 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; use std::num::NonZero; use std::sync::Arc; use std::time::Duration; +use common_base::Plugins; use common_base::cancellation::{CancellableFuture, CancellationHandle}; use common_meta::key::SchemaMetadataManagerRef; use common_telemetry::{debug, info, warn}; @@ -37,10 +39,12 @@ use crate::cache::{CacheManager, CacheManagerRef}; use crate::compaction::picker::PickerOutput; use crate::compaction::{CompactionOutput, CompactionSstReaderBuilder, find_dynamic_options}; use crate::config::MitoConfig; +use crate::engine::flush_hook::FlushHookRef; use crate::error; use crate::error::{ EmptyRegionDirSnafu, InvalidPartitionExprSnafu, ObjectStoreNotFoundSnafu, Result, }; +use crate::flush::{SharedPrimaryKeys, wrap_with_pk_collector}; use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; use crate::region::options::RegionOptions; @@ -53,8 +57,8 @@ use crate::sst::file_purger::LocalFilePurger; use crate::sst::index::intermediate::IntermediateManager; use crate::sst::index::puffin_manager::PuffinManagerFactory; use crate::sst::location::region_dir_from_table_dir; -use crate::sst::parquet::WriteOptions; use crate::sst::parquet::metadata::extract_primary_key_range; +use crate::sst::parquet::{SstInfo, WriteOptions}; use crate::sst::version::{SstVersion, SstVersionRef}; /// Region version for compaction that does not hold memtables. @@ -106,6 +110,8 @@ pub struct CompactionRegion { /// The parallel is inside this compaction task, not across different compaction tasks. /// It can be different windows of the same compaction task or something like this. pub max_parallelism: usize, + + pub(crate) plugins: Plugins, } /// OpenCompactionRegionRequest represents the request to open a compaction region. @@ -230,6 +236,7 @@ pub async fn open_compaction_region( file_purger: Some(file_purger), ttl: Some(ttl), max_parallelism: req.max_parallelism, + plugins: Plugins::new(), }) } @@ -250,11 +257,15 @@ impl CompactionRegion { } /// `[MergeOutput]` represents the output of merging SST files. -#[derive(Default, Clone, Debug, Serialize, Deserialize)] +#[derive(Default, Debug, Serialize, Deserialize)] pub struct MergeOutput { pub files_to_add: Vec, pub files_to_remove: Vec, pub compaction_time_window: Option, + #[serde(skip)] + pub sst_infos: Vec, + #[serde(skip)] + pub primary_keys: Vec>, } impl MergeOutput { @@ -300,7 +311,7 @@ pub trait SstMerger: Send + Sync + 'static { compaction_region: CompactionRegion, output: CompactionOutput, write_opts: WriteOptions, - ) -> Result>; + ) -> Result<(Vec, Vec, Vec>)>; } /// The production [`SstMerger`] that reads, merges, and writes SST files. @@ -314,7 +325,7 @@ impl SstMerger for DefaultSstMerger { compaction_region: CompactionRegion, output: CompactionOutput, write_opts: WriteOptions, - ) -> Result> { + ) -> Result<(Vec, Vec, Vec>)> { let region_id = compaction_region.region_id; let storage = compaction_region.region_options.storage.clone(); let index_options = compaction_region @@ -359,6 +370,20 @@ impl SstMerger for DefaultSstMerger { merge_mode, }; let source = builder.build_flat_sst_reader().await?; + + let hook: Option = compaction_region.plugins.get(); + let pk_collector: Option = hook + .as_ref() + .map(|_| Arc::new(std::sync::Mutex::new(HashSet::new()))); + let source = if let Some(collector) = &pk_collector { + crate::read::FlatSource::new_iter( + source.schema().clone(), + wrap_with_pk_collector(source.take_iter(), &Some(collector.clone())), + ) + } else { + source + }; + let mut metrics = Metrics::new(WriteType::Compaction); let region_metadata = compaction_region.region_metadata.clone(); let sst_infos = compaction_region @@ -400,7 +425,7 @@ impl SstMerger for DefaultSstMerger { }; let output_files = sst_infos - .into_iter() + .iter() .map(|sst_info| { let pk_range = sst_info .file_metadata @@ -438,7 +463,10 @@ impl SstMerger for DefaultSstMerger { region_id, input_file_names, output_file_names, flat_format, metrics ); metrics.observe(); - Ok(output_files) + let primary_keys: Vec> = pk_collector + .map(|c| c.lock().unwrap().drain().collect()) + .unwrap_or_default(); + Ok((output_files, sst_infos.into_iter().collect(), primary_keys)) } } @@ -507,6 +535,8 @@ where } let mut output_files = Vec::with_capacity(tasks.len()); + let mut all_sst_infos: Vec = Vec::new(); + let mut all_primary_keys: HashSet> = HashSet::new(); let mut compacted_inputs = Vec::with_capacity( tasks.iter().map(|(inputs, _)| inputs.len()).sum::() + picker_output.expired_ssts.len(), @@ -530,8 +560,10 @@ where while let Some((inputs, handle)) = spawned.pop() { let abort_handle = handle.abort_handle(); match CancellableFuture::new(handle, self.cancel_handle.clone()).await { - Ok(Ok(Ok(files))) => { + Ok(Ok(Ok((files, infos, pks)))) => { output_files.extend(files); + all_sst_infos.extend(infos); + all_primary_keys.extend(pks); compacted_inputs.extend(inputs); } Ok(Ok(Err(e))) => { @@ -591,6 +623,8 @@ where files_to_add: output_files, files_to_remove: compacted_inputs, compaction_time_window: Some(compaction_time_window), + sst_infos: all_sst_infos, + primary_keys: all_primary_keys.into_iter().collect(), }) } @@ -679,6 +713,7 @@ mod tests { file_purger: None, ttl: None, max_parallelism: 1, + plugins: Plugins::new(), } } @@ -707,10 +742,10 @@ mod tests { _compaction_region: CompactionRegion, _output: CompactionOutput, _write_opts: WriteOptions, - ) -> Result> { + ) -> Result<(Vec, Vec, Vec>)> { let idx = self.call_idx.fetch_add(1, Ordering::SeqCst); match self.results.lock().unwrap().get(idx) { - Some(Ok(files)) => Ok(files.clone()), + Some(Ok(files)) => Ok((files.clone(), Vec::new(), Vec::new())), Some(Err(_)) => error::InvalidMetaSnafu { reason: format!("simulated failure at index {idx}"), } @@ -879,7 +914,7 @@ mod tests { _compaction_region: CompactionRegion, _output: CompactionOutput, _write_opts: WriteOptions, - ) -> Result> { + ) -> Result<(Vec, Vec, Vec>)> { self.call_idx.fetch_add(1, Ordering::SeqCst); std::future::pending().await } diff --git a/src/mito2/src/compaction/task.rs b/src/mito2/src/compaction/task.rs index e48866b275c6..91014db837d7 100644 --- a/src/mito2/src/compaction/task.rs +++ b/src/mito2/src/compaction/task.rs @@ -27,6 +27,7 @@ use crate::compaction::LocalCompactionState; use crate::compaction::compactor::{CompactionRegion, Compactor, MergeOutput}; use crate::compaction::memory_manager::{CompactionMemoryGuard, CompactionMemoryManager}; use crate::compaction::picker::{CompactionTask, PickerOutput}; +use crate::engine::flush_hook::{FlushHookRef, SstFileInfo}; use crate::error::{CompactRegionSnafu, CompactionMemoryExhaustedSnafu}; use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_MEMORY_WAIT, COMPACTION_STAGE_ELAPSED}; @@ -282,6 +283,43 @@ impl CompactionTaskImpl { ); } } + + async fn invoke_sst_hook(&self, merge_output: &MergeOutput) { + let hook: Option = self.compaction_region.plugins.get(); + if let Some(hook) = hook { + let files: Vec> = merge_output + .sst_infos + .iter() + .zip(merge_output.files_to_add.iter()) + .map(|(info, meta)| SstFileInfo { + file_meta: meta.clone(), + sst_info_ref: info, + }) + .collect(); + hook.on_sst_files_written( + self.compaction_region.region_id, + &self.compaction_region.region_metadata, + &files, + &merge_output.primary_keys, + ) + .await; + } + } + + async fn invoke_manifest_hook(&self, edit: &RegionEdit) { + let hook: Option = self.compaction_region.plugins.get(); + if let Some(hook) = hook { + let manifest_version = self + .compaction_region + .manifest_ctx + .manifest_manager + .read() + .await + .last_version(); + hook.on_manifest_updated(self.compaction_region.region_id, edit, manifest_version) + .await; + } + } } #[async_trait::async_trait] @@ -320,6 +358,7 @@ impl CompactionTask for CompactionTaskImpl { .await { Ok(Ok(merge_output)) => { + self.invoke_sst_hook(&merge_output).await; // Stop accepting cancellation once we are about to publish the compaction edit. if !self.state.mark_commit_started() { let senders = std::mem::take(&mut self.waiters); @@ -330,6 +369,7 @@ impl CompactionTask for CompactionTaskImpl { } else { match self.update_manifest(merge_output).await { Ok(edit) => { + self.invoke_manifest_hook(&edit).await; let senders = std::mem::take(&mut self.waiters); BackgroundNotify::CompactionFinished(CompactionFinished { region_id: self.compaction_region.region_id, diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index ec38dec105af..8aa154b085ee 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -40,6 +40,7 @@ mod drop_test; mod edit_region_test; #[cfg(test)] mod filter_deleted_test; +pub mod flush_hook; #[cfg(test)] mod flush_test; #[cfg(test)] diff --git a/src/mito2/src/engine/flush_hook.rs b/src/mito2/src/engine/flush_hook.rs new file mode 100644 index 000000000000..9e0528e173f1 --- /dev/null +++ b/src/mito2/src/engine/flush_hook.rs @@ -0,0 +1,86 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Flush hook extension point for SST and manifest operations. + +use std::sync::Arc; + +use async_trait::async_trait; +use store_api::ManifestVersion; +use store_api::metadata::RegionMetadataRef; +use store_api::storage::RegionId; + +use crate::manifest::action::RegionEdit; +use crate::sst::file::FileMeta; +use crate::sst::parquet::SstInfo; + +/// Information about a single SST file written during flush. +pub struct SstFileInfo<'a> { + pub sst_info_ref: &'a SstInfo, + pub file_meta: FileMeta, +} + +/// Extension hook for flush operations. +/// +/// Implementations can be registered via the `Plugins` system: +/// ```ignore +/// use std::sync::Arc; +/// use common_base::Plugins; +/// use mito2::engine::flush_hook::{FlushHook, FlushHookRef}; +/// +/// plugins.insert(Arc::new(MyHook) as FlushHookRef); +/// ``` +/// +/// To decode primary keys into tag name-value pairs, use: +/// ```ignore +/// use mito_codec::row_converter::build_primary_key_codec; +/// +/// let codec = build_primary_key_codec(region_metadata); +/// for pk_bytes in primary_keys { +/// let decoded = codec.decode(pk_bytes)?; +/// // Dense: Vec<(ColumnId, Value)> +/// // Sparse: SparseValues with column_id -> value mapping +/// } +/// ``` +#[async_trait] +pub trait FlushHook: Send + Sync { + /// Called after SST files are written during flush. + /// + /// - `files`: per-file metadata (SstInfo + FileMeta) for each SST written. + /// - `primary_keys`: all unique primary keys (encoded bytes) across all files + /// in this flush. Decode with `build_primary_key_codec(region_metadata)`. + /// - `region_metadata`: provides the schema to decode primary keys into + /// tag/label name-value pairs. + async fn on_sst_files_written( + &self, + region_id: RegionId, + region_metadata: &RegionMetadataRef, + files: &[SstFileInfo<'_>], + primary_keys: &[Vec], + ) { + let _ = (region_id, region_metadata, files, primary_keys); + } + + /// Called after the region manifest is successfully updated. + async fn on_manifest_updated( + &self, + region_id: RegionId, + edit: &RegionEdit, + manifest_version: ManifestVersion, + ) { + let _ = (region_id, edit, manifest_version); + } +} + +pub type FlushHookRef = Arc; diff --git a/src/mito2/src/engine/flush_test.rs b/src/mito2/src/engine/flush_test.rs index b86e75c72a0a..c6493f7e843f 100644 --- a/src/mito2/src/engine/flush_test.rs +++ b/src/mito2/src/engine/flush_test.rs @@ -15,21 +15,28 @@ //! Flush tests for mito engine. use std::sync::Arc; -use std::sync::atomic::{AtomicI64, Ordering}; +use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering}; use std::time::Duration; use api::v1::Rows; +use async_trait::async_trait; +use common_base::Plugins; use common_recordbatch::RecordBatches; use common_time::util::current_time_millis; use common_wal::options::WAL_OPTIONS_KEY; use rstest::rstest; use rstest_reuse::{self, apply}; +use store_api::ManifestVersion; +use store_api::metadata::RegionMetadataRef; use store_api::region_engine::RegionEngine; use store_api::region_request::{RegionFlushRequest, RegionRequest}; use store_api::storage::{RegionId, ScanRequest}; +use tokio::sync::Notify; use crate::config::MitoConfig; +use crate::engine::flush_hook::{FlushHook, FlushHookRef, SstFileInfo}; use crate::engine::listener::{FlushListener, StallListener}; +use crate::manifest::action::RegionEdit; use crate::test_util::{ CreateRequestBuilder, LogStoreFactory, MockWriteBufferManager, TestEnv, build_rows, build_rows_for_key, flush_region, kafka_log_store_factory, multiple_log_store_factories, @@ -660,3 +667,136 @@ async fn test_update_topic_latest_entry_id(factory: Option) { .unwrap(); assert_eq!(region.topic_latest_entry_id.load(Ordering::Relaxed), 1); } + +struct MockFlushHook { + sst_written_count: AtomicUsize, + manifest_updated_count: AtomicUsize, + primary_keys_count: AtomicUsize, + notify: Notify, +} + +impl MockFlushHook { + fn new() -> Self { + Self { + sst_written_count: AtomicUsize::new(0), + manifest_updated_count: AtomicUsize::new(0), + primary_keys_count: AtomicUsize::new(0), + notify: Notify::new(), + } + } + + async fn wait_for_manifest_update(&self) { + self.notify.notified().await; + } +} + +#[async_trait] +impl FlushHook for MockFlushHook { + async fn on_sst_files_written( + &self, + region_id: RegionId, + _region_metadata: &RegionMetadataRef, + files: &[SstFileInfo<'_>], + primary_keys: &[Vec], + ) { + self.sst_written_count + .fetch_add(files.len(), Ordering::Relaxed); + self.primary_keys_count + .store(primary_keys.len(), Ordering::Relaxed); + common_telemetry::info!( + "MockFlushHook::on_sst_files_written: region={}, files={}, primary_keys={}", + region_id, + files.len(), + primary_keys.len(), + ); + for (i, file) in files.iter().enumerate() { + common_telemetry::info!( + " file[{}]: file_id={}, num_rows={}, num_series={}, file_size={}", + i, + file.sst_info_ref.file_id, + file.sst_info_ref.num_rows, + file.sst_info_ref.num_series, + file.sst_info_ref.file_size, + ); + } + } + + async fn on_manifest_updated( + &self, + region_id: RegionId, + edit: &RegionEdit, + manifest_version: ManifestVersion, + ) { + self.manifest_updated_count.fetch_add(1, Ordering::Relaxed); + common_telemetry::info!( + "MockFlushHook::on_manifest_updated: region={}, manifest_version={}, files_added={}, primary_keys_collected={}", + region_id, + manifest_version, + edit.files_to_add.len(), + self.primary_keys_count.load(Ordering::Relaxed), + ); + self.notify.notify_one(); + } +} + +#[tokio::test] +async fn test_flush_hook() { + common_telemetry::init_default_ut_logging(); + let mut env = TestEnv::new().await; + + let hook = Arc::new(MockFlushHook::new()); + let plugins = Plugins::new(); + plugins.insert(hook.clone() as FlushHookRef); + + let engine = env + .create_engine_with_plugins(MitoConfig::default(), plugins) + .await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + let column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows_for_key("a", 0, 10, 0), + }; + put_rows(&engine, region_id, rows).await; + + let rows2 = Rows { + schema: column_schemas.clone(), + rows: build_rows_for_key("b", 0, 10, 0), + }; + put_rows(&engine, region_id, rows2).await; + + flush_region(&engine, region_id, None).await; + + hook.wait_for_manifest_update().await; + + let sst_count = hook.sst_written_count.load(Ordering::Relaxed); + let manifest_count = hook.manifest_updated_count.load(Ordering::Relaxed); + let pk_count = hook.primary_keys_count.load(Ordering::Relaxed); + + assert!( + sst_count > 0, + "Expected at least 1 SST file, got {sst_count}" + ); + assert_eq!( + manifest_count, 1, + "Expected exactly 1 manifest update, got {manifest_count}" + ); + assert!( + pk_count >= 2, + "Expected at least 2 unique primary keys (tags 'a' and 'b'), got {pk_count}" + ); + + common_telemetry::info!( + "test_flush_hook passed: sst_count={}, manifest_count={}, pk_count={}", + sst_count, + manifest_count, + pk_count + ); +} diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 16260327f5a9..03cd96b9c528 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -14,15 +14,18 @@ //! Flush related utilities and structs. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::num::NonZeroU64; -use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; use std::time::Instant; use bytes::Bytes; +use common_base::Plugins; use common_telemetry::{debug, error, info}; -use datatypes::arrow::datatypes::SchemaRef; +use datatypes::arrow::array::{Array, BinaryArray, DictionaryArray}; +use datatypes::arrow::datatypes::{SchemaRef, UInt32Type}; +use datatypes::arrow::record_batch::RecordBatch; use partition::expr::PartitionExpr; use smallvec::{SmallVec, smallvec}; use snafu::ResultExt; @@ -36,6 +39,7 @@ use crate::access_layer::{ }; use crate::cache::CacheManagerRef; use crate::config::MitoConfig; +use crate::engine::flush_hook::{FlushHookRef, SstFileInfo}; use crate::error::{ Error, FlushRegionSnafu, JoinSnafu, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, Result, @@ -66,6 +70,36 @@ use crate::sst::parquet::{ use crate::sst::{FlatSchemaOptions, FormatType, to_flat_sst_arrow_schema}; use crate::worker::WorkerListener; +pub(crate) type SharedPrimaryKeys = Arc>>>; + +pub(crate) struct PkCollectingIter { + inner: BoxedRecordBatchIterator, + primary_keys: SharedPrimaryKeys, +} + +impl Iterator for PkCollectingIter { + type Item = Result; + + fn next(&mut self) -> Option { + let batch = self.inner.next(); + if let Some(Ok(ref record_batch)) = batch { + let pk_col_idx = record_batch.num_columns().saturating_sub(3); + if let Some(pk_col) = record_batch.columns().get(pk_col_idx) + && let Some(pk_dict) = pk_col + .as_any() + .downcast_ref::>() + && let Some(pk_values) = pk_dict.values().as_any().downcast_ref::() + { + let mut keys = self.primary_keys.lock().unwrap(); + for i in 0..pk_values.len() { + keys.insert(pk_values.value(i).to_vec()); + } + } + } + batch + } +} + /// Global write buffer (memtable) manager. /// /// Tracks write buffer (memtable) usages and decide whether the engine needs to flush. @@ -275,6 +309,8 @@ pub(crate) struct RegionFlushTask { /// /// This is used to generate the file meta. pub(crate) partition_expr: Option, + /// Plugins for flush hooks. + pub(crate) plugins: Plugins, } impl RegionFlushTask { @@ -365,8 +401,6 @@ impl RegionFlushTask { /// Flushes memtables to level 0 SSTs and updates the manifest. /// Returns the [RegionEdit] to apply. async fn flush_memtables(&self, version_data: &VersionControlData) -> Result { - // We must use the immutable memtables list and entry ids from the `version_data` - // for consistency as others might already modify the version in the `version_control`. let version = &version_data.version; let timer = FLUSH_ELAPSED .with_label_values(&["flush_memtables"]) @@ -386,6 +420,8 @@ impl RegionFlushTask { series_count, encoded_part_count, flush_metrics, + sst_infos, + primary_keys, } = self.do_flush_memtables(version, write_opts).await?; if !file_metas.is_empty() { @@ -414,12 +450,25 @@ impl RegionFlushTask { ); flush_metrics.observe(); + let hook: Option = self.plugins.get(); + if let Some(hook) = &hook { + let files: Vec> = sst_infos + .iter() + .zip(file_metas.iter()) + .map(|(sst_info, file_meta)| SstFileInfo { + sst_info_ref: sst_info, + file_meta: file_meta.clone(), + }) + .collect(); + hook.on_sst_files_written(self.region_id, &version.metadata, &files, &primary_keys) + .await; + } + let edit = RegionEdit { files_to_add: file_metas, files_to_remove: Vec::new(), timestamp_ms: Some(chrono::Utc::now().timestamp_millis()), compaction_time_window: None, - // The last entry has been flushed. flushed_entry_id: Some(version_data.last_entry_id), flushed_sequence: Some(version_data.committed_sequence), committed_sequence: None, @@ -434,7 +483,6 @@ impl RegionFlushTask { let expected_state = if matches!(self.reason, FlushReason::Downgrading) { RegionLeaderState::Downgrading } else { - // Check if region is in staging mode let current_state = self.manifest_ctx.current_state(); if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) { RegionLeaderState::Staging @@ -442,19 +490,22 @@ impl RegionFlushTask { RegionLeaderState::Writable } }; - // We will leak files if the manifest update fails, but we ignore them for simplicity. We can - // add a cleanup job to remove them later. - let version = self + let manifest_version = self .manifest_ctx .update_manifest(expected_state, action_list, self.is_staging) .await?; info!( - "Successfully update manifest version to {version}, region: {}, is_staging: {}, reason: {}", + "Successfully update manifest version to {manifest_version}, region: {}, is_staging: {}, reason: {}", self.region_id, self.is_staging, self.reason.as_str() ); + if let Some(hook) = &hook { + hook.on_manifest_updated(self.region_id, &edit, manifest_version) + .await; + } + Ok(edit) } @@ -470,13 +521,15 @@ impl RegionFlushTask { let mut encoded_part_count = 0; let mut flush_metrics = Metrics::new(WriteType::Flush); let partition_expr = parse_partition_expr(self.partition_expr.as_deref())?; + let hook: Option = self.plugins.get(); + let shared_pks: Option = + hook.as_ref().map(|_| Arc::new(Mutex::new(HashSet::new()))); + let mut all_sst_infos = Vec::new(); for mem in memtables { if mem.is_empty() { - // Skip empty memtables. continue; } - // Compact the memtable first, this waits the background compaction to finish. let compact_start = std::time::Instant::now(); if let Err(e) = mem.compact(true) { common_telemetry::error!(e; "Failed to compact memtable before flush"); @@ -484,16 +537,12 @@ impl RegionFlushTask { let compact_cost = compact_start.elapsed(); flush_metrics.compact_memtable += compact_cost; - // Sets `for_flush` flag to true. let mem_ranges = mem.ranges(None, RangesOptions::for_flush())?; let num_mem_ranges = mem_ranges.ranges.len(); - // Aggregate stats from all ranges let num_mem_rows = mem_ranges.num_rows(); let memtable_series_count = mem_ranges.series_count(); let memtable_id = mem.id(); - // Increases series count for each mem range. We consider each mem range has different series so - // the counter may have more series than the actual series count. series_count += memtable_series_count; let flush_start = Instant::now(); @@ -502,13 +551,12 @@ impl RegionFlushTask { num_sources, results, } = self - .flush_flat_mem_ranges(version, &write_opts, mem_ranges) + .flush_flat_mem_ranges(version, &write_opts, mem_ranges, shared_pks.clone()) .await?; encoded_part_count += num_encoded; for (source_idx, result) in results.into_iter().enumerate() { let (max_sequence, ssts_written, metrics) = result?; if ssts_written.is_empty() { - // No data written. continue; } @@ -523,7 +571,7 @@ impl RegionFlushTask { flush_metrics = flush_metrics.merge(metrics); - for sst_info in ssts_written { + for sst_info in &ssts_written { flushed_bytes += sst_info.file_size; let pk_range = sst_info .file_metadata @@ -537,6 +585,9 @@ impl RegionFlushTask { pk_range, )); } + if hook.is_some() { + all_sst_infos.extend(ssts_written); + } } common_telemetry::debug!( @@ -552,12 +603,18 @@ impl RegionFlushTask { ); } + let primary_keys = shared_pks + .map(|pks| pks.lock().unwrap().drain().collect()) + .unwrap_or_default(); + Ok(DoFlushMemtablesResult { file_metas, flushed_bytes, series_count, encoded_part_count, flush_metrics, + sst_infos: all_sst_infos, + primary_keys, }) } @@ -566,6 +623,7 @@ impl RegionFlushTask { version: &VersionRef, write_opts: &WriteOptions, mem_ranges: MemtableRanges, + pk_collector: Option, ) -> Result { let batch_schema = to_flat_sst_arrow_schema( &version.metadata, @@ -578,6 +636,7 @@ impl RegionFlushTask { mem_ranges, &version.options, field_column_start, + pk_collector.clone(), )?; let mut tasks = Vec::with_capacity(flat_sources.encoded.len() + flat_sources.sources.len()); let num_encoded = flat_sources.encoded.len(); @@ -626,7 +685,7 @@ impl RegionFlushTask { fn new_file_meta( region_id: RegionId, max_sequence: u64, - sst_info: SstInfo, + sst_info: &SstInfo, partition_expr: Option, primary_key_range: Option<(Bytes, Bytes)>, ) -> FileMeta { @@ -722,6 +781,8 @@ struct DoFlushMemtablesResult { series_count: usize, encoded_part_count: usize, flush_metrics: Metrics, + sst_infos: Vec, + primary_keys: Vec>, } struct FlatSources { @@ -729,12 +790,26 @@ struct FlatSources { encoded: SmallVec<[(EncodedRange, SequenceNumber); 4]>, } +pub(crate) fn wrap_with_pk_collector( + iter: BoxedRecordBatchIterator, + pk_collector: &Option, +) -> BoxedRecordBatchIterator { + match pk_collector { + Some(collector) => Box::new(PkCollectingIter { + inner: iter, + primary_keys: collector.clone(), + }), + None => iter, + } +} + /// Returns the max sequence and [FlatSource] for the given memtable. fn memtable_flat_sources( schema: SchemaRef, mem_ranges: MemtableRanges, options: &RegionOptions, field_column_start: usize, + pk_collector: Option, ) -> Result { let MemtableRanges { ranges } = mem_ranges; let mut flat_sources = FlatSources { @@ -759,6 +834,7 @@ fn memtable_flat_sources( field_column_start, iter, ); + let iter = wrap_with_pk_collector(iter, &pk_collector); flat_sources .sources .push((FlatSource::new_iter(schema, iter), max_sequence)); @@ -823,6 +899,7 @@ fn memtable_flat_sources( field_column_start, std::mem::replace(&mut input_iters, Vec::with_capacity(num_ranges)), )?; + let maybe_dedup = wrap_with_pk_collector(maybe_dedup, &pk_collector); flat_sources.sources.push(( FlatSource::new_iter(schema.clone(), maybe_dedup), @@ -855,6 +932,7 @@ fn memtable_flat_sources( field_column_start, input_iters, )?; + let maybe_dedup = wrap_with_pk_collector(maybe_dedup, &pk_collector); flat_sources .sources @@ -1370,6 +1448,7 @@ mod tests { flush_semaphore: Arc::new(Semaphore::new(2)), is_staging: false, partition_expr: None, + plugins: Plugins::new(), }; task.push_sender(OptionOutputTx::from(output_tx)); scheduler @@ -1414,6 +1493,7 @@ mod tests { flush_semaphore: Arc::new(Semaphore::new(2)), is_staging: false, partition_expr: None, + plugins: Plugins::new(), }) .collect(); // Schedule first task. @@ -1523,6 +1603,7 @@ mod tests { mem_ranges, &options, metadata.primary_key.len(), + None, ) .unwrap(); assert!(flat_sources.encoded.is_empty()); @@ -1549,9 +1630,14 @@ mod tests { ..Default::default() }; - let flat_sources = - memtable_flat_sources(schema, mem_ranges, &options, metadata.primary_key.len()) - .unwrap(); + let flat_sources = memtable_flat_sources( + schema, + mem_ranges, + &options, + metadata.primary_key.len(), + None, + ) + .unwrap(); assert!(flat_sources.encoded.is_empty()); assert_eq!(1, flat_sources.sources.len()); @@ -1600,6 +1686,7 @@ mod tests { flush_semaphore: Arc::new(Semaphore::new(2)), is_staging: false, partition_expr: None, + plugins: Plugins::new(), }) .collect(); // Schedule first task. diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs index a69f08693c9d..c471efb43073 100644 --- a/src/mito2/src/manifest/manager.rs +++ b/src/mito2/src/manifest/manager.rs @@ -693,7 +693,7 @@ impl RegionManifestManager { version } - fn last_version(&self) -> ManifestVersion { + pub fn last_version(&self) -> ManifestVersion { self.last_version.load(Ordering::Relaxed) } diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index 567754f939c0..b671152deefc 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -1163,7 +1163,7 @@ impl FlatSource { } } - pub(crate) fn schema(&self) -> &SchemaRef { + pub fn schema(&self) -> &SchemaRef { &self.schema } @@ -1171,7 +1171,6 @@ impl FlatSource { self.inner.next_batch().await } - #[cfg(test)] pub(crate) fn take_iter(self) -> BoxedRecordBatchIterator { match self.inner { FlatSourceInner::Iter(iter) => iter, diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index c4ca7cc7b3f0..792b954a6b88 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -298,10 +298,20 @@ impl TestEnv { } pub(crate) async fn new_mito_engine(&self, config: MitoConfig) -> MitoEngine { + self.new_mito_engine_with_plugins(config, Plugins::new()) + .await + } + + pub(crate) async fn new_mito_engine_with_plugins( + &self, + config: MitoConfig, + plugins: Plugins, + ) -> MitoEngine { async fn create( zelf: &TestEnv, config: MitoConfig, log_store: Arc, + plugins: Plugins, ) -> MitoEngine { let data_home = zelf.data_home().display().to_string(); MitoEngine::new( @@ -312,15 +322,15 @@ impl TestEnv { zelf.schema_metadata_manager.clone(), zelf.file_ref_manager.clone(), zelf.partition_expr_fetcher.clone(), - Plugins::new(), + plugins, ) .await .unwrap() } match self.log_store.as_ref().unwrap().clone() { - LogStoreImpl::RaftEngine(log_store) => create(self, config, log_store).await, - LogStoreImpl::Kafka(log_store) => create(self, config, log_store).await, + LogStoreImpl::RaftEngine(log_store) => create(self, config, log_store, plugins).await, + LogStoreImpl::Kafka(log_store) => create(self, config, log_store, plugins).await, } } @@ -335,6 +345,21 @@ impl TestEnv { self.new_mito_engine(config).await } + /// Creates a new engine with specific config and plugins. + pub async fn create_engine_with_plugins( + &mut self, + config: MitoConfig, + plugins: Plugins, + ) -> MitoEngine { + let (log_store, object_store_manager) = self.create_log_and_object_store_manager().await; + + let object_store_manager = Arc::new(object_store_manager); + self.log_store = Some(log_store.clone()); + self.object_store_manager = Some(object_store_manager.clone()); + + self.new_mito_engine_with_plugins(config, plugins).await + } + /// Creates a new engine with specific config and existing logstore and object store manager. pub async fn create_follower_engine(&mut self, config: MitoConfig) -> MitoEngine { self.new_mito_engine(config).await diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 6e9573a9bbea..88be9084234f 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -619,6 +619,7 @@ impl WorkerStarter { file_ref_manager: self.file_ref_manager.clone(), partition_expr_fetcher: self.partition_expr_fetcher, flush_semaphore: self.flush_semaphore, + plugins: self.plugins, }; let handle = common_runtime::spawn_global(async move { worker_thread.run().await; @@ -892,6 +893,8 @@ struct RegionWorkerLoop { partition_expr_fetcher: PartitionExprFetcherRef, /// Semaphore to control flush concurrency. flush_semaphore: Arc, + /// Plugins for flush hooks. + plugins: Plugins, } impl RegionWorkerLoop { diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index 6ad0e037e1b1..1932a31c7a9f 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -166,6 +166,7 @@ impl RegionWorkerLoop { flush_semaphore: self.flush_semaphore.clone(), is_staging: region.is_staging(), partition_expr: region.maybe_staging_partition_expr_str(), + plugins: self.plugins.clone(), } } } From 51cb226bd07118ac8b1e0f8d37c238a93d1ca357 Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Thu, 21 May 2026 07:04:22 +0800 Subject: [PATCH 2/6] feat: update hook definitions --- src/datanode/src/datanode.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 9a2fe3d98213..243193bc5943 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -21,7 +21,9 @@ use std::time::{Duration, Instant}; use common_base::Plugins; use common_error::ext::BoxedError; use common_greptimedb_telemetry::GreptimeDBTelemetryTask; -use common_meta::cache::{LayeredCacheRegistry, SchemaCacheRef, TableSchemaCacheRef}; +use common_meta::cache::{ + LayeredCacheRegistry, SchemaCacheRef, TableSchemaCacheRef, +}; use common_meta::cache_invalidator::CacheInvalidatorRef; use common_meta::datanode::TopicStatsReporter; use common_meta::key::runtime_switch::RuntimeSwitchManager; From f2efe3bb3e38a5204bd354747b3581a9bd557db5 Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Thu, 21 May 2026 09:11:54 +0800 Subject: [PATCH 3/6] chore: fmt --- src/datanode/src/datanode.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 243193bc5943..9a2fe3d98213 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -21,9 +21,7 @@ use std::time::{Duration, Instant}; use common_base::Plugins; use common_error::ext::BoxedError; use common_greptimedb_telemetry::GreptimeDBTelemetryTask; -use common_meta::cache::{ - LayeredCacheRegistry, SchemaCacheRef, TableSchemaCacheRef, -}; +use common_meta::cache::{LayeredCacheRegistry, SchemaCacheRef, TableSchemaCacheRef}; use common_meta::cache_invalidator::CacheInvalidatorRef; use common_meta::datanode::TopicStatsReporter; use common_meta::key::runtime_switch::RuntimeSwitchManager; From e03ee25214a6c67712b3498fc7e18ea1f926d7d1 Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Thu, 21 May 2026 09:46:21 +0800 Subject: [PATCH 4/6] fix: address review comments --- src/mito2/src/compaction/task.rs | 2 +- src/mito2/src/engine/flush_hook.rs | 2 +- src/mito2/src/flush.rs | 38 ++++++++++++++++++++---------- 3 files changed, 27 insertions(+), 15 deletions(-) diff --git a/src/mito2/src/compaction/task.rs b/src/mito2/src/compaction/task.rs index 91014db837d7..4c52d8a9dff0 100644 --- a/src/mito2/src/compaction/task.rs +++ b/src/mito2/src/compaction/task.rs @@ -292,8 +292,8 @@ impl CompactionTaskImpl { .iter() .zip(merge_output.files_to_add.iter()) .map(|(info, meta)| SstFileInfo { - file_meta: meta.clone(), sst_info_ref: info, + file_meta: meta, }) .collect(); hook.on_sst_files_written( diff --git a/src/mito2/src/engine/flush_hook.rs b/src/mito2/src/engine/flush_hook.rs index 9e0528e173f1..2ad3c1d2294e 100644 --- a/src/mito2/src/engine/flush_hook.rs +++ b/src/mito2/src/engine/flush_hook.rs @@ -28,7 +28,7 @@ use crate::sst::parquet::SstInfo; /// Information about a single SST file written during flush. pub struct SstFileInfo<'a> { pub sst_info_ref: &'a SstInfo, - pub file_meta: FileMeta, + pub file_meta: &'a FileMeta, } /// Extension hook for flush operations. diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 03cd96b9c528..ad4ea9ab5084 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -82,18 +82,17 @@ impl Iterator for PkCollectingIter { fn next(&mut self) -> Option { let batch = self.inner.next(); - if let Some(Ok(ref record_batch)) = batch { - let pk_col_idx = record_batch.num_columns().saturating_sub(3); - if let Some(pk_col) = record_batch.columns().get(pk_col_idx) - && let Some(pk_dict) = pk_col - .as_any() - .downcast_ref::>() - && let Some(pk_values) = pk_dict.values().as_any().downcast_ref::() - { - let mut keys = self.primary_keys.lock().unwrap(); - for i in 0..pk_values.len() { - keys.insert(pk_values.value(i).to_vec()); - } + if let Some(Ok(ref record_batch)) = batch + && let Some(pk_col) = + record_batch.column_by_name(store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME) + && let Some(pk_dict) = pk_col + .as_any() + .downcast_ref::>() + && let Some(pk_values) = pk_dict.values().as_any().downcast_ref::() + { + let mut keys = self.primary_keys.lock().unwrap(); + for i in 0..pk_values.len() { + keys.insert(pk_values.value(i).to_vec()); } } batch @@ -401,6 +400,8 @@ impl RegionFlushTask { /// Flushes memtables to level 0 SSTs and updates the manifest. /// Returns the [RegionEdit] to apply. async fn flush_memtables(&self, version_data: &VersionControlData) -> Result { + // We must use the immutable memtables list and entry ids from the `version_data` + // for consistency as others might already modify the version in the `version_control`. let version = &version_data.version; let timer = FLUSH_ELAPSED .with_label_values(&["flush_memtables"]) @@ -457,7 +458,7 @@ impl RegionFlushTask { .zip(file_metas.iter()) .map(|(sst_info, file_meta)| SstFileInfo { sst_info_ref: sst_info, - file_meta: file_meta.clone(), + file_meta, }) .collect(); hook.on_sst_files_written(self.region_id, &version.metadata, &files, &primary_keys) @@ -469,6 +470,7 @@ impl RegionFlushTask { files_to_remove: Vec::new(), timestamp_ms: Some(chrono::Utc::now().timestamp_millis()), compaction_time_window: None, + // The last entry has been flushed. flushed_entry_id: Some(version_data.last_entry_id), flushed_sequence: Some(version_data.committed_sequence), committed_sequence: None, @@ -483,6 +485,7 @@ impl RegionFlushTask { let expected_state = if matches!(self.reason, FlushReason::Downgrading) { RegionLeaderState::Downgrading } else { + // Check if region is in staging mode let current_state = self.manifest_ctx.current_state(); if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) { RegionLeaderState::Staging @@ -490,6 +493,8 @@ impl RegionFlushTask { RegionLeaderState::Writable } }; + // We will leak files if the manifest update fails, but we ignore them for simplicity. We can + // add a cleanup job to remove them later. let manifest_version = self .manifest_ctx .update_manifest(expected_state, action_list, self.is_staging) @@ -527,9 +532,11 @@ impl RegionFlushTask { let mut all_sst_infos = Vec::new(); for mem in memtables { if mem.is_empty() { + // Skip empty memtables. continue; } + // Compact the memtable first, this waits the background compaction to finish. let compact_start = std::time::Instant::now(); if let Err(e) = mem.compact(true) { common_telemetry::error!(e; "Failed to compact memtable before flush"); @@ -537,12 +544,16 @@ impl RegionFlushTask { let compact_cost = compact_start.elapsed(); flush_metrics.compact_memtable += compact_cost; + // Sets `for_flush` flag to true. let mem_ranges = mem.ranges(None, RangesOptions::for_flush())?; let num_mem_ranges = mem_ranges.ranges.len(); + // Aggregate stats from all ranges let num_mem_rows = mem_ranges.num_rows(); let memtable_series_count = mem_ranges.series_count(); let memtable_id = mem.id(); + // Increases series count for each mem range. We consider each mem range has different series so + // the counter may have more series than the actual series count. series_count += memtable_series_count; let flush_start = Instant::now(); @@ -557,6 +568,7 @@ impl RegionFlushTask { for (source_idx, result) in results.into_iter().enumerate() { let (max_sequence, ssts_written, metrics) = result?; if ssts_written.is_empty() { + // No data written. continue; } From 9ac92e50e886cc6ef8b9fa6f53d2aaa889534b12 Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Thu, 21 May 2026 22:46:15 +0800 Subject: [PATCH 5/6] feat: remove primary key collection --- src/mito2/src/compaction/compactor.rs | 38 +++---------- src/mito2/src/compaction/task.rs | 1 - src/mito2/src/engine/flush_hook.rs | 20 +------ src/mito2/src/engine/flush_test.rs | 19 +------ src/mito2/src/flush.rs | 81 +++------------------------ 5 files changed, 20 insertions(+), 139 deletions(-) diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index d498bc3a18bb..ef608ae0fc30 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashSet; use std::num::NonZero; use std::sync::Arc; use std::time::Duration; @@ -39,12 +38,10 @@ use crate::cache::{CacheManager, CacheManagerRef}; use crate::compaction::picker::PickerOutput; use crate::compaction::{CompactionOutput, CompactionSstReaderBuilder, find_dynamic_options}; use crate::config::MitoConfig; -use crate::engine::flush_hook::FlushHookRef; use crate::error; use crate::error::{ EmptyRegionDirSnafu, InvalidPartitionExprSnafu, ObjectStoreNotFoundSnafu, Result, }; -use crate::flush::{SharedPrimaryKeys, wrap_with_pk_collector}; use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; use crate::region::options::RegionOptions; @@ -264,8 +261,6 @@ pub struct MergeOutput { pub compaction_time_window: Option, #[serde(skip)] pub sst_infos: Vec, - #[serde(skip)] - pub primary_keys: Vec>, } impl MergeOutput { @@ -311,7 +306,7 @@ pub trait SstMerger: Send + Sync + 'static { compaction_region: CompactionRegion, output: CompactionOutput, write_opts: WriteOptions, - ) -> Result<(Vec, Vec, Vec>)>; + ) -> Result<(Vec, Vec)>; } /// The production [`SstMerger`] that reads, merges, and writes SST files. @@ -325,7 +320,7 @@ impl SstMerger for DefaultSstMerger { compaction_region: CompactionRegion, output: CompactionOutput, write_opts: WriteOptions, - ) -> Result<(Vec, Vec, Vec>)> { + ) -> Result<(Vec, Vec)> { let region_id = compaction_region.region_id; let storage = compaction_region.region_options.storage.clone(); let index_options = compaction_region @@ -371,19 +366,6 @@ impl SstMerger for DefaultSstMerger { }; let source = builder.build_flat_sst_reader().await?; - let hook: Option = compaction_region.plugins.get(); - let pk_collector: Option = hook - .as_ref() - .map(|_| Arc::new(std::sync::Mutex::new(HashSet::new()))); - let source = if let Some(collector) = &pk_collector { - crate::read::FlatSource::new_iter( - source.schema().clone(), - wrap_with_pk_collector(source.take_iter(), &Some(collector.clone())), - ) - } else { - source - }; - let mut metrics = Metrics::new(WriteType::Compaction); let region_metadata = compaction_region.region_metadata.clone(); let sst_infos = compaction_region @@ -463,10 +445,7 @@ impl SstMerger for DefaultSstMerger { region_id, input_file_names, output_file_names, flat_format, metrics ); metrics.observe(); - let primary_keys: Vec> = pk_collector - .map(|c| c.lock().unwrap().drain().collect()) - .unwrap_or_default(); - Ok((output_files, sst_infos.into_iter().collect(), primary_keys)) + Ok((output_files, sst_infos.into_iter().collect())) } } @@ -536,7 +515,6 @@ where let mut output_files = Vec::with_capacity(tasks.len()); let mut all_sst_infos: Vec = Vec::new(); - let mut all_primary_keys: HashSet> = HashSet::new(); let mut compacted_inputs = Vec::with_capacity( tasks.iter().map(|(inputs, _)| inputs.len()).sum::() + picker_output.expired_ssts.len(), @@ -560,10 +538,9 @@ where while let Some((inputs, handle)) = spawned.pop() { let abort_handle = handle.abort_handle(); match CancellableFuture::new(handle, self.cancel_handle.clone()).await { - Ok(Ok(Ok((files, infos, pks)))) => { + Ok(Ok(Ok((files, infos)))) => { output_files.extend(files); all_sst_infos.extend(infos); - all_primary_keys.extend(pks); compacted_inputs.extend(inputs); } Ok(Ok(Err(e))) => { @@ -624,7 +601,6 @@ where files_to_remove: compacted_inputs, compaction_time_window: Some(compaction_time_window), sst_infos: all_sst_infos, - primary_keys: all_primary_keys.into_iter().collect(), }) } @@ -742,10 +718,10 @@ mod tests { _compaction_region: CompactionRegion, _output: CompactionOutput, _write_opts: WriteOptions, - ) -> Result<(Vec, Vec, Vec>)> { + ) -> Result<(Vec, Vec)> { let idx = self.call_idx.fetch_add(1, Ordering::SeqCst); match self.results.lock().unwrap().get(idx) { - Some(Ok(files)) => Ok((files.clone(), Vec::new(), Vec::new())), + Some(Ok(files)) => Ok((files.clone(), Vec::new())), Some(Err(_)) => error::InvalidMetaSnafu { reason: format!("simulated failure at index {idx}"), } @@ -914,7 +890,7 @@ mod tests { _compaction_region: CompactionRegion, _output: CompactionOutput, _write_opts: WriteOptions, - ) -> Result<(Vec, Vec, Vec>)> { + ) -> Result<(Vec, Vec)> { self.call_idx.fetch_add(1, Ordering::SeqCst); std::future::pending().await } diff --git a/src/mito2/src/compaction/task.rs b/src/mito2/src/compaction/task.rs index 4c52d8a9dff0..0e3725b8e674 100644 --- a/src/mito2/src/compaction/task.rs +++ b/src/mito2/src/compaction/task.rs @@ -300,7 +300,6 @@ impl CompactionTaskImpl { self.compaction_region.region_id, &self.compaction_region.region_metadata, &files, - &merge_output.primary_keys, ) .await; } diff --git a/src/mito2/src/engine/flush_hook.rs b/src/mito2/src/engine/flush_hook.rs index 2ad3c1d2294e..771b83704f13 100644 --- a/src/mito2/src/engine/flush_hook.rs +++ b/src/mito2/src/engine/flush_hook.rs @@ -41,35 +41,19 @@ pub struct SstFileInfo<'a> { /// /// plugins.insert(Arc::new(MyHook) as FlushHookRef); /// ``` -/// -/// To decode primary keys into tag name-value pairs, use: -/// ```ignore -/// use mito_codec::row_converter::build_primary_key_codec; -/// -/// let codec = build_primary_key_codec(region_metadata); -/// for pk_bytes in primary_keys { -/// let decoded = codec.decode(pk_bytes)?; -/// // Dense: Vec<(ColumnId, Value)> -/// // Sparse: SparseValues with column_id -> value mapping -/// } -/// ``` #[async_trait] pub trait FlushHook: Send + Sync { /// Called after SST files are written during flush. /// /// - `files`: per-file metadata (SstInfo + FileMeta) for each SST written. - /// - `primary_keys`: all unique primary keys (encoded bytes) across all files - /// in this flush. Decode with `build_primary_key_codec(region_metadata)`. - /// - `region_metadata`: provides the schema to decode primary keys into - /// tag/label name-value pairs. + /// - `region_metadata`: provides the schema for column type information. async fn on_sst_files_written( &self, region_id: RegionId, region_metadata: &RegionMetadataRef, files: &[SstFileInfo<'_>], - primary_keys: &[Vec], ) { - let _ = (region_id, region_metadata, files, primary_keys); + let _ = (region_id, region_metadata, files); } /// Called after the region manifest is successfully updated. diff --git a/src/mito2/src/engine/flush_test.rs b/src/mito2/src/engine/flush_test.rs index c6493f7e843f..e85b854b1ff5 100644 --- a/src/mito2/src/engine/flush_test.rs +++ b/src/mito2/src/engine/flush_test.rs @@ -671,7 +671,6 @@ async fn test_update_topic_latest_entry_id(factory: Option) { struct MockFlushHook { sst_written_count: AtomicUsize, manifest_updated_count: AtomicUsize, - primary_keys_count: AtomicUsize, notify: Notify, } @@ -680,7 +679,6 @@ impl MockFlushHook { Self { sst_written_count: AtomicUsize::new(0), manifest_updated_count: AtomicUsize::new(0), - primary_keys_count: AtomicUsize::new(0), notify: Notify::new(), } } @@ -697,17 +695,13 @@ impl FlushHook for MockFlushHook { region_id: RegionId, _region_metadata: &RegionMetadataRef, files: &[SstFileInfo<'_>], - primary_keys: &[Vec], ) { self.sst_written_count .fetch_add(files.len(), Ordering::Relaxed); - self.primary_keys_count - .store(primary_keys.len(), Ordering::Relaxed); common_telemetry::info!( - "MockFlushHook::on_sst_files_written: region={}, files={}, primary_keys={}", + "MockFlushHook::on_sst_files_written: region={}, files={}", region_id, files.len(), - primary_keys.len(), ); for (i, file) in files.iter().enumerate() { common_telemetry::info!( @@ -729,11 +723,10 @@ impl FlushHook for MockFlushHook { ) { self.manifest_updated_count.fetch_add(1, Ordering::Relaxed); common_telemetry::info!( - "MockFlushHook::on_manifest_updated: region={}, manifest_version={}, files_added={}, primary_keys_collected={}", + "MockFlushHook::on_manifest_updated: region={}, manifest_version={}, files_added={}", region_id, manifest_version, edit.files_to_add.len(), - self.primary_keys_count.load(Ordering::Relaxed), ); self.notify.notify_one(); } @@ -778,7 +771,6 @@ async fn test_flush_hook() { let sst_count = hook.sst_written_count.load(Ordering::Relaxed); let manifest_count = hook.manifest_updated_count.load(Ordering::Relaxed); - let pk_count = hook.primary_keys_count.load(Ordering::Relaxed); assert!( sst_count > 0, @@ -788,15 +780,10 @@ async fn test_flush_hook() { manifest_count, 1, "Expected exactly 1 manifest update, got {manifest_count}" ); - assert!( - pk_count >= 2, - "Expected at least 2 unique primary keys (tags 'a' and 'b'), got {pk_count}" - ); common_telemetry::info!( - "test_flush_hook passed: sst_count={}, manifest_count={}, pk_count={}", + "test_flush_hook passed: sst_count={}, manifest_count={}", sst_count, manifest_count, - pk_count ); } diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index ad4ea9ab5084..c5428ab8be9c 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -14,18 +14,16 @@ //! Flush related utilities and structs. -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::num::NonZeroU64; +use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex}; use std::time::Instant; use bytes::Bytes; use common_base::Plugins; use common_telemetry::{debug, error, info}; -use datatypes::arrow::array::{Array, BinaryArray, DictionaryArray}; -use datatypes::arrow::datatypes::{SchemaRef, UInt32Type}; -use datatypes::arrow::record_batch::RecordBatch; +use datatypes::arrow::datatypes::SchemaRef; use partition::expr::PartitionExpr; use smallvec::{SmallVec, smallvec}; use snafu::ResultExt; @@ -70,35 +68,6 @@ use crate::sst::parquet::{ use crate::sst::{FlatSchemaOptions, FormatType, to_flat_sst_arrow_schema}; use crate::worker::WorkerListener; -pub(crate) type SharedPrimaryKeys = Arc>>>; - -pub(crate) struct PkCollectingIter { - inner: BoxedRecordBatchIterator, - primary_keys: SharedPrimaryKeys, -} - -impl Iterator for PkCollectingIter { - type Item = Result; - - fn next(&mut self) -> Option { - let batch = self.inner.next(); - if let Some(Ok(ref record_batch)) = batch - && let Some(pk_col) = - record_batch.column_by_name(store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME) - && let Some(pk_dict) = pk_col - .as_any() - .downcast_ref::>() - && let Some(pk_values) = pk_dict.values().as_any().downcast_ref::() - { - let mut keys = self.primary_keys.lock().unwrap(); - for i in 0..pk_values.len() { - keys.insert(pk_values.value(i).to_vec()); - } - } - batch - } -} - /// Global write buffer (memtable) manager. /// /// Tracks write buffer (memtable) usages and decide whether the engine needs to flush. @@ -422,7 +391,6 @@ impl RegionFlushTask { encoded_part_count, flush_metrics, sst_infos, - primary_keys, } = self.do_flush_memtables(version, write_opts).await?; if !file_metas.is_empty() { @@ -461,7 +429,7 @@ impl RegionFlushTask { file_meta, }) .collect(); - hook.on_sst_files_written(self.region_id, &version.metadata, &files, &primary_keys) + hook.on_sst_files_written(self.region_id, &version.metadata, &files) .await; } @@ -527,8 +495,6 @@ impl RegionFlushTask { let mut flush_metrics = Metrics::new(WriteType::Flush); let partition_expr = parse_partition_expr(self.partition_expr.as_deref())?; let hook: Option = self.plugins.get(); - let shared_pks: Option = - hook.as_ref().map(|_| Arc::new(Mutex::new(HashSet::new()))); let mut all_sst_infos = Vec::new(); for mem in memtables { if mem.is_empty() { @@ -562,7 +528,7 @@ impl RegionFlushTask { num_sources, results, } = self - .flush_flat_mem_ranges(version, &write_opts, mem_ranges, shared_pks.clone()) + .flush_flat_mem_ranges(version, &write_opts, mem_ranges) .await?; encoded_part_count += num_encoded; for (source_idx, result) in results.into_iter().enumerate() { @@ -615,10 +581,6 @@ impl RegionFlushTask { ); } - let primary_keys = shared_pks - .map(|pks| pks.lock().unwrap().drain().collect()) - .unwrap_or_default(); - Ok(DoFlushMemtablesResult { file_metas, flushed_bytes, @@ -626,7 +588,6 @@ impl RegionFlushTask { encoded_part_count, flush_metrics, sst_infos: all_sst_infos, - primary_keys, }) } @@ -635,7 +596,6 @@ impl RegionFlushTask { version: &VersionRef, write_opts: &WriteOptions, mem_ranges: MemtableRanges, - pk_collector: Option, ) -> Result { let batch_schema = to_flat_sst_arrow_schema( &version.metadata, @@ -648,7 +608,6 @@ impl RegionFlushTask { mem_ranges, &version.options, field_column_start, - pk_collector.clone(), )?; let mut tasks = Vec::with_capacity(flat_sources.encoded.len() + flat_sources.sources.len()); let num_encoded = flat_sources.encoded.len(); @@ -794,7 +753,6 @@ struct DoFlushMemtablesResult { encoded_part_count: usize, flush_metrics: Metrics, sst_infos: Vec, - primary_keys: Vec>, } struct FlatSources { @@ -802,26 +760,12 @@ struct FlatSources { encoded: SmallVec<[(EncodedRange, SequenceNumber); 4]>, } -pub(crate) fn wrap_with_pk_collector( - iter: BoxedRecordBatchIterator, - pk_collector: &Option, -) -> BoxedRecordBatchIterator { - match pk_collector { - Some(collector) => Box::new(PkCollectingIter { - inner: iter, - primary_keys: collector.clone(), - }), - None => iter, - } -} - /// Returns the max sequence and [FlatSource] for the given memtable. fn memtable_flat_sources( schema: SchemaRef, mem_ranges: MemtableRanges, options: &RegionOptions, field_column_start: usize, - pk_collector: Option, ) -> Result { let MemtableRanges { ranges } = mem_ranges; let mut flat_sources = FlatSources { @@ -846,7 +790,6 @@ fn memtable_flat_sources( field_column_start, iter, ); - let iter = wrap_with_pk_collector(iter, &pk_collector); flat_sources .sources .push((FlatSource::new_iter(schema, iter), max_sequence)); @@ -911,7 +854,6 @@ fn memtable_flat_sources( field_column_start, std::mem::replace(&mut input_iters, Vec::with_capacity(num_ranges)), )?; - let maybe_dedup = wrap_with_pk_collector(maybe_dedup, &pk_collector); flat_sources.sources.push(( FlatSource::new_iter(schema.clone(), maybe_dedup), @@ -944,7 +886,6 @@ fn memtable_flat_sources( field_column_start, input_iters, )?; - let maybe_dedup = wrap_with_pk_collector(maybe_dedup, &pk_collector); flat_sources .sources @@ -1615,7 +1556,6 @@ mod tests { mem_ranges, &options, metadata.primary_key.len(), - None, ) .unwrap(); assert!(flat_sources.encoded.is_empty()); @@ -1642,14 +1582,9 @@ mod tests { ..Default::default() }; - let flat_sources = memtable_flat_sources( - schema, - mem_ranges, - &options, - metadata.primary_key.len(), - None, - ) - .unwrap(); + let flat_sources = + memtable_flat_sources(schema, mem_ranges, &options, metadata.primary_key.len()) + .unwrap(); assert!(flat_sources.encoded.is_empty()); assert_eq!(1, flat_sources.sources.len()); From 4e461adfc329a40af47aaabca3d74db91684f661 Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Fri, 22 May 2026 00:52:07 +0800 Subject: [PATCH 6/6] fix: revert changes on flatsource --- src/mito2/src/read.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index b671152deefc..567754f939c0 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -1163,7 +1163,7 @@ impl FlatSource { } } - pub fn schema(&self) -> &SchemaRef { + pub(crate) fn schema(&self) -> &SchemaRef { &self.schema } @@ -1171,6 +1171,7 @@ impl FlatSource { self.inner.next_batch().await } + #[cfg(test)] pub(crate) fn take_iter(self) -> BoxedRecordBatchIterator { match self.inner { FlatSourceInner::Iter(iter) => iter,