Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/mito2/src/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,7 @@ impl CompactionScheduler {
file_purger: None,
ttl: Some(ttl),
max_parallelism,
plugins: self.plugins.clone(),
};

let picker_output = {
Expand Down
31 changes: 21 additions & 10 deletions src/mito2/src/compaction/compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::num::NonZero;
use std::sync::Arc;
use std::time::Duration;

use common_base::Plugins;
use common_base::cancellation::{CancellableFuture, CancellationHandle};
use common_meta::key::SchemaMetadataManagerRef;
use common_telemetry::{debug, info, warn};
Expand Down Expand Up @@ -53,8 +54,8 @@ use crate::sst::file_purger::LocalFilePurger;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::location::region_dir_from_table_dir;
use crate::sst::parquet::WriteOptions;
use crate::sst::parquet::metadata::extract_primary_key_range;
use crate::sst::parquet::{SstInfo, WriteOptions};
use crate::sst::version::{SstVersion, SstVersionRef};

/// Region version for compaction that does not hold memtables.
Expand Down Expand Up @@ -106,6 +107,8 @@ pub struct CompactionRegion {
/// The parallel is inside this compaction task, not across different compaction tasks.
/// It can be different windows of the same compaction task or something like this.
pub max_parallelism: usize,

pub(crate) plugins: Plugins,
}

/// OpenCompactionRegionRequest represents the request to open a compaction region.
Expand Down Expand Up @@ -230,6 +233,7 @@ pub async fn open_compaction_region(
file_purger: Some(file_purger),
ttl: Some(ttl),
max_parallelism: req.max_parallelism,
plugins: Plugins::new(),
})
}

Expand All @@ -250,11 +254,13 @@ impl CompactionRegion {
}

/// `[MergeOutput]` represents the output of merging SST files.
#[derive(Default, Clone, Debug, Serialize, Deserialize)]
#[derive(Default, Debug, Serialize, Deserialize)]
pub struct MergeOutput {
pub files_to_add: Vec<FileMeta>,
pub files_to_remove: Vec<FileMeta>,
pub compaction_time_window: Option<i64>,
#[serde(skip)]
pub sst_infos: Vec<SstInfo>,
}

impl MergeOutput {
Expand Down Expand Up @@ -300,7 +306,7 @@ pub trait SstMerger: Send + Sync + 'static {
compaction_region: CompactionRegion,
output: CompactionOutput,
write_opts: WriteOptions,
) -> Result<Vec<FileMeta>>;
) -> Result<(Vec<FileMeta>, Vec<SstInfo>)>;
}

/// The production [`SstMerger`] that reads, merges, and writes SST files.
Expand All @@ -314,7 +320,7 @@ impl SstMerger for DefaultSstMerger {
compaction_region: CompactionRegion,
output: CompactionOutput,
write_opts: WriteOptions,
) -> Result<Vec<FileMeta>> {
) -> Result<(Vec<FileMeta>, Vec<SstInfo>)> {
let region_id = compaction_region.region_id;
let storage = compaction_region.region_options.storage.clone();
let index_options = compaction_region
Expand Down Expand Up @@ -359,6 +365,7 @@ impl SstMerger for DefaultSstMerger {
merge_mode,
};
let source = builder.build_flat_sst_reader().await?;

let mut metrics = Metrics::new(WriteType::Compaction);
let region_metadata = compaction_region.region_metadata.clone();
let sst_infos = compaction_region
Expand Down Expand Up @@ -400,7 +407,7 @@ impl SstMerger for DefaultSstMerger {
};

let output_files = sst_infos
.into_iter()
.iter()
.map(|sst_info| {
let pk_range = sst_info
.file_metadata
Expand Down Expand Up @@ -438,7 +445,7 @@ impl SstMerger for DefaultSstMerger {
region_id, input_file_names, output_file_names, flat_format, metrics
);
metrics.observe();
Ok(output_files)
Ok((output_files, sst_infos.into_iter().collect()))
}
}

Expand Down Expand Up @@ -507,6 +514,7 @@ where
}

let mut output_files = Vec::with_capacity(tasks.len());
let mut all_sst_infos: Vec<SstInfo> = Vec::new();
let mut compacted_inputs = Vec::with_capacity(
tasks.iter().map(|(inputs, _)| inputs.len()).sum::<usize>()
+ picker_output.expired_ssts.len(),
Expand All @@ -530,8 +538,9 @@ where
while let Some((inputs, handle)) = spawned.pop() {
let abort_handle = handle.abort_handle();
match CancellableFuture::new(handle, self.cancel_handle.clone()).await {
Ok(Ok(Ok(files))) => {
Ok(Ok(Ok((files, infos)))) => {
output_files.extend(files);
all_sst_infos.extend(infos);
compacted_inputs.extend(inputs);
}
Ok(Ok(Err(e))) => {
Expand Down Expand Up @@ -591,6 +600,7 @@ where
files_to_add: output_files,
files_to_remove: compacted_inputs,
compaction_time_window: Some(compaction_time_window),
sst_infos: all_sst_infos,
})
}

Expand Down Expand Up @@ -679,6 +689,7 @@ mod tests {
file_purger: None,
ttl: None,
max_parallelism: 1,
plugins: Plugins::new(),
}
}

Expand Down Expand Up @@ -707,10 +718,10 @@ mod tests {
_compaction_region: CompactionRegion,
_output: CompactionOutput,
_write_opts: WriteOptions,
) -> Result<Vec<FileMeta>> {
) -> Result<(Vec<FileMeta>, Vec<SstInfo>)> {
let idx = self.call_idx.fetch_add(1, Ordering::SeqCst);
match self.results.lock().unwrap().get(idx) {
Some(Ok(files)) => Ok(files.clone()),
Some(Ok(files)) => Ok((files.clone(), Vec::new())),
Some(Err(_)) => error::InvalidMetaSnafu {
reason: format!("simulated failure at index {idx}"),
}
Expand Down Expand Up @@ -879,7 +890,7 @@ mod tests {
_compaction_region: CompactionRegion,
_output: CompactionOutput,
_write_opts: WriteOptions,
) -> Result<Vec<FileMeta>> {
) -> Result<(Vec<FileMeta>, Vec<SstInfo>)> {
self.call_idx.fetch_add(1, Ordering::SeqCst);
std::future::pending().await
}
Expand Down
39 changes: 39 additions & 0 deletions src/mito2/src/compaction/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::compaction::LocalCompactionState;
use crate::compaction::compactor::{CompactionRegion, Compactor, MergeOutput};
use crate::compaction::memory_manager::{CompactionMemoryGuard, CompactionMemoryManager};
use crate::compaction::picker::{CompactionTask, PickerOutput};
use crate::engine::flush_hook::{FlushHookRef, SstFileInfo};
use crate::error::{CompactRegionSnafu, CompactionMemoryExhaustedSnafu};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_MEMORY_WAIT, COMPACTION_STAGE_ELAPSED};
Expand Down Expand Up @@ -282,6 +283,42 @@ impl CompactionTaskImpl {
);
}
}

async fn invoke_sst_hook(&self, merge_output: &MergeOutput) {
let hook: Option<FlushHookRef> = self.compaction_region.plugins.get();
if let Some(hook) = hook {
let files: Vec<SstFileInfo<'_>> = merge_output
.sst_infos
.iter()
.zip(merge_output.files_to_add.iter())
.map(|(info, meta)| SstFileInfo {
sst_info_ref: info,
file_meta: meta,
})
.collect();
hook.on_sst_files_written(
self.compaction_region.region_id,
&self.compaction_region.region_metadata,
&files,
)
.await;
}
}

async fn invoke_manifest_hook(&self, edit: &RegionEdit) {
let hook: Option<FlushHookRef> = self.compaction_region.plugins.get();
if let Some(hook) = hook {
let manifest_version = self
.compaction_region
.manifest_ctx
.manifest_manager
.read()
.await
.last_version();
hook.on_manifest_updated(self.compaction_region.region_id, edit, manifest_version)
.await;
}
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -320,6 +357,7 @@ impl CompactionTask for CompactionTaskImpl {
.await
{
Ok(Ok(merge_output)) => {
self.invoke_sst_hook(&merge_output).await;
// Stop accepting cancellation once we are about to publish the compaction edit.
if !self.state.mark_commit_started() {
let senders = std::mem::take(&mut self.waiters);
Expand All @@ -330,6 +368,7 @@ impl CompactionTask for CompactionTaskImpl {
} else {
match self.update_manifest(merge_output).await {
Ok(edit) => {
self.invoke_manifest_hook(&edit).await;
let senders = std::mem::take(&mut self.waiters);
BackgroundNotify::CompactionFinished(CompactionFinished {
region_id: self.compaction_region.region_id,
Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ mod drop_test;
mod edit_region_test;
#[cfg(test)]
mod filter_deleted_test;
pub mod flush_hook;
#[cfg(test)]
mod flush_test;
#[cfg(test)]
Expand Down
70 changes: 70 additions & 0 deletions src/mito2/src/engine/flush_hook.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Flush hook extension point for SST and manifest operations.

use std::sync::Arc;

use async_trait::async_trait;
use store_api::ManifestVersion;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;

use crate::manifest::action::RegionEdit;
use crate::sst::file::FileMeta;
use crate::sst::parquet::SstInfo;

/// Information about a single SST file written during flush.
pub struct SstFileInfo<'a> {
pub sst_info_ref: &'a SstInfo,
pub file_meta: &'a FileMeta,
}

/// Extension hook for flush operations.
///
/// Implementations can be registered via the `Plugins` system:
/// ```ignore
/// use std::sync::Arc;
/// use common_base::Plugins;
/// use mito2::engine::flush_hook::{FlushHook, FlushHookRef};
///
/// plugins.insert(Arc::new(MyHook) as FlushHookRef);
/// ```
#[async_trait]
pub trait FlushHook: Send + Sync {
/// Called after SST files are written during flush.
///
/// - `files`: per-file metadata (SstInfo + FileMeta) for each SST written.
/// - `region_metadata`: provides the schema for column type information.
async fn on_sst_files_written(
&self,
region_id: RegionId,
region_metadata: &RegionMetadataRef,
files: &[SstFileInfo<'_>],
) {
let _ = (region_id, region_metadata, files);
}

/// Called after the region manifest is successfully updated.
async fn on_manifest_updated(
&self,
region_id: RegionId,
edit: &RegionEdit,
manifest_version: ManifestVersion,
) {
let _ = (region_id, edit, manifest_version);
}
}

pub type FlushHookRef = Arc<dyn FlushHook>;
Loading
Loading