diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 5cf3c444a8a5..4c83064371bb 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -515,6 +515,7 @@ impl CompactionScheduler { file_purger: None, ttl: Some(ttl), max_parallelism, + plugins: self.plugins.clone(), }; let picker_output = { diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index d78b2cc8fe66..ef608ae0fc30 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -16,6 +16,7 @@ use std::num::NonZero; use std::sync::Arc; use std::time::Duration; +use common_base::Plugins; use common_base::cancellation::{CancellableFuture, CancellationHandle}; use common_meta::key::SchemaMetadataManagerRef; use common_telemetry::{debug, info, warn}; @@ -53,8 +54,8 @@ use crate::sst::file_purger::LocalFilePurger; use crate::sst::index::intermediate::IntermediateManager; use crate::sst::index::puffin_manager::PuffinManagerFactory; use crate::sst::location::region_dir_from_table_dir; -use crate::sst::parquet::WriteOptions; use crate::sst::parquet::metadata::extract_primary_key_range; +use crate::sst::parquet::{SstInfo, WriteOptions}; use crate::sst::version::{SstVersion, SstVersionRef}; /// Region version for compaction that does not hold memtables. @@ -106,6 +107,8 @@ pub struct CompactionRegion { /// The parallel is inside this compaction task, not across different compaction tasks. /// It can be different windows of the same compaction task or something like this. pub max_parallelism: usize, + + pub(crate) plugins: Plugins, } /// OpenCompactionRegionRequest represents the request to open a compaction region. @@ -230,6 +233,7 @@ pub async fn open_compaction_region( file_purger: Some(file_purger), ttl: Some(ttl), max_parallelism: req.max_parallelism, + plugins: Plugins::new(), }) } @@ -250,11 +254,13 @@ impl CompactionRegion { } /// `[MergeOutput]` represents the output of merging SST files. -#[derive(Default, Clone, Debug, Serialize, Deserialize)] +#[derive(Default, Debug, Serialize, Deserialize)] pub struct MergeOutput { pub files_to_add: Vec, pub files_to_remove: Vec, pub compaction_time_window: Option, + #[serde(skip)] + pub sst_infos: Vec, } impl MergeOutput { @@ -300,7 +306,7 @@ pub trait SstMerger: Send + Sync + 'static { compaction_region: CompactionRegion, output: CompactionOutput, write_opts: WriteOptions, - ) -> Result>; + ) -> Result<(Vec, Vec)>; } /// The production [`SstMerger`] that reads, merges, and writes SST files. @@ -314,7 +320,7 @@ impl SstMerger for DefaultSstMerger { compaction_region: CompactionRegion, output: CompactionOutput, write_opts: WriteOptions, - ) -> Result> { + ) -> Result<(Vec, Vec)> { let region_id = compaction_region.region_id; let storage = compaction_region.region_options.storage.clone(); let index_options = compaction_region @@ -359,6 +365,7 @@ impl SstMerger for DefaultSstMerger { merge_mode, }; let source = builder.build_flat_sst_reader().await?; + let mut metrics = Metrics::new(WriteType::Compaction); let region_metadata = compaction_region.region_metadata.clone(); let sst_infos = compaction_region @@ -400,7 +407,7 @@ impl SstMerger for DefaultSstMerger { }; let output_files = sst_infos - .into_iter() + .iter() .map(|sst_info| { let pk_range = sst_info .file_metadata @@ -438,7 +445,7 @@ impl SstMerger for DefaultSstMerger { region_id, input_file_names, output_file_names, flat_format, metrics ); metrics.observe(); - Ok(output_files) + Ok((output_files, sst_infos.into_iter().collect())) } } @@ -507,6 +514,7 @@ where } let mut output_files = Vec::with_capacity(tasks.len()); + let mut all_sst_infos: Vec = Vec::new(); let mut compacted_inputs = Vec::with_capacity( tasks.iter().map(|(inputs, _)| inputs.len()).sum::() + picker_output.expired_ssts.len(), @@ -530,8 +538,9 @@ where while let Some((inputs, handle)) = spawned.pop() { let abort_handle = handle.abort_handle(); match CancellableFuture::new(handle, self.cancel_handle.clone()).await { - Ok(Ok(Ok(files))) => { + Ok(Ok(Ok((files, infos)))) => { output_files.extend(files); + all_sst_infos.extend(infos); compacted_inputs.extend(inputs); } Ok(Ok(Err(e))) => { @@ -591,6 +600,7 @@ where files_to_add: output_files, files_to_remove: compacted_inputs, compaction_time_window: Some(compaction_time_window), + sst_infos: all_sst_infos, }) } @@ -679,6 +689,7 @@ mod tests { file_purger: None, ttl: None, max_parallelism: 1, + plugins: Plugins::new(), } } @@ -707,10 +718,10 @@ mod tests { _compaction_region: CompactionRegion, _output: CompactionOutput, _write_opts: WriteOptions, - ) -> Result> { + ) -> Result<(Vec, Vec)> { let idx = self.call_idx.fetch_add(1, Ordering::SeqCst); match self.results.lock().unwrap().get(idx) { - Some(Ok(files)) => Ok(files.clone()), + Some(Ok(files)) => Ok((files.clone(), Vec::new())), Some(Err(_)) => error::InvalidMetaSnafu { reason: format!("simulated failure at index {idx}"), } @@ -879,7 +890,7 @@ mod tests { _compaction_region: CompactionRegion, _output: CompactionOutput, _write_opts: WriteOptions, - ) -> Result> { + ) -> Result<(Vec, Vec)> { self.call_idx.fetch_add(1, Ordering::SeqCst); std::future::pending().await } diff --git a/src/mito2/src/compaction/task.rs b/src/mito2/src/compaction/task.rs index e48866b275c6..0e3725b8e674 100644 --- a/src/mito2/src/compaction/task.rs +++ b/src/mito2/src/compaction/task.rs @@ -27,6 +27,7 @@ use crate::compaction::LocalCompactionState; use crate::compaction::compactor::{CompactionRegion, Compactor, MergeOutput}; use crate::compaction::memory_manager::{CompactionMemoryGuard, CompactionMemoryManager}; use crate::compaction::picker::{CompactionTask, PickerOutput}; +use crate::engine::flush_hook::{FlushHookRef, SstFileInfo}; use crate::error::{CompactRegionSnafu, CompactionMemoryExhaustedSnafu}; use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_MEMORY_WAIT, COMPACTION_STAGE_ELAPSED}; @@ -282,6 +283,42 @@ impl CompactionTaskImpl { ); } } + + async fn invoke_sst_hook(&self, merge_output: &MergeOutput) { + let hook: Option = self.compaction_region.plugins.get(); + if let Some(hook) = hook { + let files: Vec> = merge_output + .sst_infos + .iter() + .zip(merge_output.files_to_add.iter()) + .map(|(info, meta)| SstFileInfo { + sst_info_ref: info, + file_meta: meta, + }) + .collect(); + hook.on_sst_files_written( + self.compaction_region.region_id, + &self.compaction_region.region_metadata, + &files, + ) + .await; + } + } + + async fn invoke_manifest_hook(&self, edit: &RegionEdit) { + let hook: Option = self.compaction_region.plugins.get(); + if let Some(hook) = hook { + let manifest_version = self + .compaction_region + .manifest_ctx + .manifest_manager + .read() + .await + .last_version(); + hook.on_manifest_updated(self.compaction_region.region_id, edit, manifest_version) + .await; + } + } } #[async_trait::async_trait] @@ -320,6 +357,7 @@ impl CompactionTask for CompactionTaskImpl { .await { Ok(Ok(merge_output)) => { + self.invoke_sst_hook(&merge_output).await; // Stop accepting cancellation once we are about to publish the compaction edit. if !self.state.mark_commit_started() { let senders = std::mem::take(&mut self.waiters); @@ -330,6 +368,7 @@ impl CompactionTask for CompactionTaskImpl { } else { match self.update_manifest(merge_output).await { Ok(edit) => { + self.invoke_manifest_hook(&edit).await; let senders = std::mem::take(&mut self.waiters); BackgroundNotify::CompactionFinished(CompactionFinished { region_id: self.compaction_region.region_id, diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index ec38dec105af..8aa154b085ee 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -40,6 +40,7 @@ mod drop_test; mod edit_region_test; #[cfg(test)] mod filter_deleted_test; +pub mod flush_hook; #[cfg(test)] mod flush_test; #[cfg(test)] diff --git a/src/mito2/src/engine/flush_hook.rs b/src/mito2/src/engine/flush_hook.rs new file mode 100644 index 000000000000..771b83704f13 --- /dev/null +++ b/src/mito2/src/engine/flush_hook.rs @@ -0,0 +1,70 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Flush hook extension point for SST and manifest operations. + +use std::sync::Arc; + +use async_trait::async_trait; +use store_api::ManifestVersion; +use store_api::metadata::RegionMetadataRef; +use store_api::storage::RegionId; + +use crate::manifest::action::RegionEdit; +use crate::sst::file::FileMeta; +use crate::sst::parquet::SstInfo; + +/// Information about a single SST file written during flush. +pub struct SstFileInfo<'a> { + pub sst_info_ref: &'a SstInfo, + pub file_meta: &'a FileMeta, +} + +/// Extension hook for flush operations. +/// +/// Implementations can be registered via the `Plugins` system: +/// ```ignore +/// use std::sync::Arc; +/// use common_base::Plugins; +/// use mito2::engine::flush_hook::{FlushHook, FlushHookRef}; +/// +/// plugins.insert(Arc::new(MyHook) as FlushHookRef); +/// ``` +#[async_trait] +pub trait FlushHook: Send + Sync { + /// Called after SST files are written during flush. + /// + /// - `files`: per-file metadata (SstInfo + FileMeta) for each SST written. + /// - `region_metadata`: provides the schema for column type information. + async fn on_sst_files_written( + &self, + region_id: RegionId, + region_metadata: &RegionMetadataRef, + files: &[SstFileInfo<'_>], + ) { + let _ = (region_id, region_metadata, files); + } + + /// Called after the region manifest is successfully updated. + async fn on_manifest_updated( + &self, + region_id: RegionId, + edit: &RegionEdit, + manifest_version: ManifestVersion, + ) { + let _ = (region_id, edit, manifest_version); + } +} + +pub type FlushHookRef = Arc; diff --git a/src/mito2/src/engine/flush_test.rs b/src/mito2/src/engine/flush_test.rs index b86e75c72a0a..e85b854b1ff5 100644 --- a/src/mito2/src/engine/flush_test.rs +++ b/src/mito2/src/engine/flush_test.rs @@ -15,21 +15,28 @@ //! Flush tests for mito engine. use std::sync::Arc; -use std::sync::atomic::{AtomicI64, Ordering}; +use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering}; use std::time::Duration; use api::v1::Rows; +use async_trait::async_trait; +use common_base::Plugins; use common_recordbatch::RecordBatches; use common_time::util::current_time_millis; use common_wal::options::WAL_OPTIONS_KEY; use rstest::rstest; use rstest_reuse::{self, apply}; +use store_api::ManifestVersion; +use store_api::metadata::RegionMetadataRef; use store_api::region_engine::RegionEngine; use store_api::region_request::{RegionFlushRequest, RegionRequest}; use store_api::storage::{RegionId, ScanRequest}; +use tokio::sync::Notify; use crate::config::MitoConfig; +use crate::engine::flush_hook::{FlushHook, FlushHookRef, SstFileInfo}; use crate::engine::listener::{FlushListener, StallListener}; +use crate::manifest::action::RegionEdit; use crate::test_util::{ CreateRequestBuilder, LogStoreFactory, MockWriteBufferManager, TestEnv, build_rows, build_rows_for_key, flush_region, kafka_log_store_factory, multiple_log_store_factories, @@ -660,3 +667,123 @@ async fn test_update_topic_latest_entry_id(factory: Option) { .unwrap(); assert_eq!(region.topic_latest_entry_id.load(Ordering::Relaxed), 1); } + +struct MockFlushHook { + sst_written_count: AtomicUsize, + manifest_updated_count: AtomicUsize, + notify: Notify, +} + +impl MockFlushHook { + fn new() -> Self { + Self { + sst_written_count: AtomicUsize::new(0), + manifest_updated_count: AtomicUsize::new(0), + notify: Notify::new(), + } + } + + async fn wait_for_manifest_update(&self) { + self.notify.notified().await; + } +} + +#[async_trait] +impl FlushHook for MockFlushHook { + async fn on_sst_files_written( + &self, + region_id: RegionId, + _region_metadata: &RegionMetadataRef, + files: &[SstFileInfo<'_>], + ) { + self.sst_written_count + .fetch_add(files.len(), Ordering::Relaxed); + common_telemetry::info!( + "MockFlushHook::on_sst_files_written: region={}, files={}", + region_id, + files.len(), + ); + for (i, file) in files.iter().enumerate() { + common_telemetry::info!( + " file[{}]: file_id={}, num_rows={}, num_series={}, file_size={}", + i, + file.sst_info_ref.file_id, + file.sst_info_ref.num_rows, + file.sst_info_ref.num_series, + file.sst_info_ref.file_size, + ); + } + } + + async fn on_manifest_updated( + &self, + region_id: RegionId, + edit: &RegionEdit, + manifest_version: ManifestVersion, + ) { + self.manifest_updated_count.fetch_add(1, Ordering::Relaxed); + common_telemetry::info!( + "MockFlushHook::on_manifest_updated: region={}, manifest_version={}, files_added={}", + region_id, + manifest_version, + edit.files_to_add.len(), + ); + self.notify.notify_one(); + } +} + +#[tokio::test] +async fn test_flush_hook() { + common_telemetry::init_default_ut_logging(); + let mut env = TestEnv::new().await; + + let hook = Arc::new(MockFlushHook::new()); + let plugins = Plugins::new(); + plugins.insert(hook.clone() as FlushHookRef); + + let engine = env + .create_engine_with_plugins(MitoConfig::default(), plugins) + .await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + let column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows_for_key("a", 0, 10, 0), + }; + put_rows(&engine, region_id, rows).await; + + let rows2 = Rows { + schema: column_schemas.clone(), + rows: build_rows_for_key("b", 0, 10, 0), + }; + put_rows(&engine, region_id, rows2).await; + + flush_region(&engine, region_id, None).await; + + hook.wait_for_manifest_update().await; + + let sst_count = hook.sst_written_count.load(Ordering::Relaxed); + let manifest_count = hook.manifest_updated_count.load(Ordering::Relaxed); + + assert!( + sst_count > 0, + "Expected at least 1 SST file, got {sst_count}" + ); + assert_eq!( + manifest_count, 1, + "Expected exactly 1 manifest update, got {manifest_count}" + ); + + common_telemetry::info!( + "test_flush_hook passed: sst_count={}, manifest_count={}", + sst_count, + manifest_count, + ); +} diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 16260327f5a9..c5428ab8be9c 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -21,6 +21,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::Instant; use bytes::Bytes; +use common_base::Plugins; use common_telemetry::{debug, error, info}; use datatypes::arrow::datatypes::SchemaRef; use partition::expr::PartitionExpr; @@ -36,6 +37,7 @@ use crate::access_layer::{ }; use crate::cache::CacheManagerRef; use crate::config::MitoConfig; +use crate::engine::flush_hook::{FlushHookRef, SstFileInfo}; use crate::error::{ Error, FlushRegionSnafu, JoinSnafu, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, Result, @@ -275,6 +277,8 @@ pub(crate) struct RegionFlushTask { /// /// This is used to generate the file meta. pub(crate) partition_expr: Option, + /// Plugins for flush hooks. + pub(crate) plugins: Plugins, } impl RegionFlushTask { @@ -386,6 +390,7 @@ impl RegionFlushTask { series_count, encoded_part_count, flush_metrics, + sst_infos, } = self.do_flush_memtables(version, write_opts).await?; if !file_metas.is_empty() { @@ -414,6 +419,20 @@ impl RegionFlushTask { ); flush_metrics.observe(); + let hook: Option = self.plugins.get(); + if let Some(hook) = &hook { + let files: Vec> = sst_infos + .iter() + .zip(file_metas.iter()) + .map(|(sst_info, file_meta)| SstFileInfo { + sst_info_ref: sst_info, + file_meta, + }) + .collect(); + hook.on_sst_files_written(self.region_id, &version.metadata, &files) + .await; + } + let edit = RegionEdit { files_to_add: file_metas, files_to_remove: Vec::new(), @@ -444,17 +463,22 @@ impl RegionFlushTask { }; // We will leak files if the manifest update fails, but we ignore them for simplicity. We can // add a cleanup job to remove them later. - let version = self + let manifest_version = self .manifest_ctx .update_manifest(expected_state, action_list, self.is_staging) .await?; info!( - "Successfully update manifest version to {version}, region: {}, is_staging: {}, reason: {}", + "Successfully update manifest version to {manifest_version}, region: {}, is_staging: {}, reason: {}", self.region_id, self.is_staging, self.reason.as_str() ); + if let Some(hook) = &hook { + hook.on_manifest_updated(self.region_id, &edit, manifest_version) + .await; + } + Ok(edit) } @@ -470,6 +494,8 @@ impl RegionFlushTask { let mut encoded_part_count = 0; let mut flush_metrics = Metrics::new(WriteType::Flush); let partition_expr = parse_partition_expr(self.partition_expr.as_deref())?; + let hook: Option = self.plugins.get(); + let mut all_sst_infos = Vec::new(); for mem in memtables { if mem.is_empty() { // Skip empty memtables. @@ -523,7 +549,7 @@ impl RegionFlushTask { flush_metrics = flush_metrics.merge(metrics); - for sst_info in ssts_written { + for sst_info in &ssts_written { flushed_bytes += sst_info.file_size; let pk_range = sst_info .file_metadata @@ -537,6 +563,9 @@ impl RegionFlushTask { pk_range, )); } + if hook.is_some() { + all_sst_infos.extend(ssts_written); + } } common_telemetry::debug!( @@ -558,6 +587,7 @@ impl RegionFlushTask { series_count, encoded_part_count, flush_metrics, + sst_infos: all_sst_infos, }) } @@ -626,7 +656,7 @@ impl RegionFlushTask { fn new_file_meta( region_id: RegionId, max_sequence: u64, - sst_info: SstInfo, + sst_info: &SstInfo, partition_expr: Option, primary_key_range: Option<(Bytes, Bytes)>, ) -> FileMeta { @@ -722,6 +752,7 @@ struct DoFlushMemtablesResult { series_count: usize, encoded_part_count: usize, flush_metrics: Metrics, + sst_infos: Vec, } struct FlatSources { @@ -1370,6 +1401,7 @@ mod tests { flush_semaphore: Arc::new(Semaphore::new(2)), is_staging: false, partition_expr: None, + plugins: Plugins::new(), }; task.push_sender(OptionOutputTx::from(output_tx)); scheduler @@ -1414,6 +1446,7 @@ mod tests { flush_semaphore: Arc::new(Semaphore::new(2)), is_staging: false, partition_expr: None, + plugins: Plugins::new(), }) .collect(); // Schedule first task. @@ -1600,6 +1633,7 @@ mod tests { flush_semaphore: Arc::new(Semaphore::new(2)), is_staging: false, partition_expr: None, + plugins: Plugins::new(), }) .collect(); // Schedule first task. diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs index a69f08693c9d..c471efb43073 100644 --- a/src/mito2/src/manifest/manager.rs +++ b/src/mito2/src/manifest/manager.rs @@ -693,7 +693,7 @@ impl RegionManifestManager { version } - fn last_version(&self) -> ManifestVersion { + pub fn last_version(&self) -> ManifestVersion { self.last_version.load(Ordering::Relaxed) } diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index c4ca7cc7b3f0..792b954a6b88 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -298,10 +298,20 @@ impl TestEnv { } pub(crate) async fn new_mito_engine(&self, config: MitoConfig) -> MitoEngine { + self.new_mito_engine_with_plugins(config, Plugins::new()) + .await + } + + pub(crate) async fn new_mito_engine_with_plugins( + &self, + config: MitoConfig, + plugins: Plugins, + ) -> MitoEngine { async fn create( zelf: &TestEnv, config: MitoConfig, log_store: Arc, + plugins: Plugins, ) -> MitoEngine { let data_home = zelf.data_home().display().to_string(); MitoEngine::new( @@ -312,15 +322,15 @@ impl TestEnv { zelf.schema_metadata_manager.clone(), zelf.file_ref_manager.clone(), zelf.partition_expr_fetcher.clone(), - Plugins::new(), + plugins, ) .await .unwrap() } match self.log_store.as_ref().unwrap().clone() { - LogStoreImpl::RaftEngine(log_store) => create(self, config, log_store).await, - LogStoreImpl::Kafka(log_store) => create(self, config, log_store).await, + LogStoreImpl::RaftEngine(log_store) => create(self, config, log_store, plugins).await, + LogStoreImpl::Kafka(log_store) => create(self, config, log_store, plugins).await, } } @@ -335,6 +345,21 @@ impl TestEnv { self.new_mito_engine(config).await } + /// Creates a new engine with specific config and plugins. + pub async fn create_engine_with_plugins( + &mut self, + config: MitoConfig, + plugins: Plugins, + ) -> MitoEngine { + let (log_store, object_store_manager) = self.create_log_and_object_store_manager().await; + + let object_store_manager = Arc::new(object_store_manager); + self.log_store = Some(log_store.clone()); + self.object_store_manager = Some(object_store_manager.clone()); + + self.new_mito_engine_with_plugins(config, plugins).await + } + /// Creates a new engine with specific config and existing logstore and object store manager. pub async fn create_follower_engine(&mut self, config: MitoConfig) -> MitoEngine { self.new_mito_engine(config).await diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 6e9573a9bbea..88be9084234f 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -619,6 +619,7 @@ impl WorkerStarter { file_ref_manager: self.file_ref_manager.clone(), partition_expr_fetcher: self.partition_expr_fetcher, flush_semaphore: self.flush_semaphore, + plugins: self.plugins, }; let handle = common_runtime::spawn_global(async move { worker_thread.run().await; @@ -892,6 +893,8 @@ struct RegionWorkerLoop { partition_expr_fetcher: PartitionExprFetcherRef, /// Semaphore to control flush concurrency. flush_semaphore: Arc, + /// Plugins for flush hooks. + plugins: Plugins, } impl RegionWorkerLoop { diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index 6ad0e037e1b1..1932a31c7a9f 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -166,6 +166,7 @@ impl RegionWorkerLoop { flush_semaphore: self.flush_semaphore.clone(), is_staging: region.is_staging(), partition_expr: region.maybe_staging_partition_expr_str(), + plugins: self.plugins.clone(), } } }