diff --git a/quickwit/quickwit-indexing/src/actors/log_publisher_impl.rs b/quickwit/quickwit-indexing/src/actors/log_publisher_impl.rs
index ba35cb97620..02a4bca1661 100644
--- a/quickwit/quickwit-indexing/src/actors/log_publisher_impl.rs
+++ b/quickwit/quickwit-indexing/src/actors/log_publisher_impl.rs
@@ -39,8 +39,16 @@ impl Handler<DisconnectMergePlanner> for Publisher {
         _: DisconnectMergePlanner,
         _ctx: &ActorContext<Self>,
     ) -> Result<(), ActorExitStatus> {
+        // Clear both Tantivy and Parquet planner mailboxes. Each Publisher
+        // instance only has one of these set (depending on which pipeline it
+        // serves), but clearing both is safe and avoids needing separate
+        // disconnect message types.
         info!("disconnecting merge planner mailbox");
         self.merge_planner_mailbox_opt = None;
+        #[cfg(feature = "metrics")]
+        {
+            self.parquet_merge_planner_mailbox_opt = None;
+        }
         Ok(())
     }
 }
diff --git a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/mod.rs b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/mod.rs
index 96022da9f8c..9666c6eccb4 100644
--- a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/mod.rs
+++ b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/mod.rs
@@ -27,6 +27,7 @@ mod parquet_doc_processor;
 mod parquet_indexer;
 mod parquet_merge_executor;
 pub(crate) mod parquet_merge_messages;
+mod parquet_merge_pipeline;
 mod parquet_merge_planner;
 mod parquet_merge_split_downloader;
 mod parquet_packager;
@@ -50,6 +51,7 @@ pub use parquet_doc_processor::{
 pub use parquet_indexer::{ParquetIndexer, ParquetIndexerCounters, ParquetSplitBatch};
 pub use parquet_merge_executor::ParquetMergeExecutor;
 pub use parquet_merge_messages::{ParquetMergeScratch, ParquetMergeTask, ParquetNewSplits};
+pub use parquet_merge_pipeline::{ParquetMergePipeline, ParquetMergePipelineParams};
 pub use parquet_merge_planner::ParquetMergePlanner;
 pub use parquet_merge_split_downloader::ParquetMergeSplitDownloader;
 pub use parquet_packager::{ParquetBatchForPackager, ParquetPackager, ParquetPackagerCounters};
diff --git a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_pipeline.rs b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_pipeline.rs
new file mode 100644
index 00000000000..e20037c34d9
--- /dev/null
+++ b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/parquet_merge_pipeline.rs
@@ -0,0 +1,621 @@
+// Copyright 2021-Present Datadog, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Parquet merge pipeline supervisor.
+//!
+//! Spawns and supervises the merge actor chain:
+//!
+//! ```text
+//! ParquetMergePlanner → MergeSchedulerService → ParquetMergeSplitDownloader
+//!     → ParquetMergeExecutor → ParquetUploader → Sequencer → Publisher
+//!                                                                │
+//!                                            (feedback to Planner)
+//! ```
+//!
+//! Follows the same pattern as the Tantivy [`MergePipeline`] but without
+//! `doc_mapper` or Packager (Parquet merges are schema-agnostic and produce
+//! ready-to-upload files).
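+//!
+//! Illustrative spawning sketch (mirrors the unit tests at the bottom of this
+//! file; `params` is a [`ParquetMergePipelineParams`] assembled by the caller):
+//!
+//! ```ignore
+//! let pipeline = ParquetMergePipeline::new(params, None, universe.spawn_ctx());
+//! let (mailbox, handle) = universe.spawn_builder().spawn(pipeline);
+//! // Drain pending merges and shut down gracefully.
+//! mailbox.send_message(FinishPendingMergesAndShutdownPipeline).await?;
+//! let (exit_status, _observable_state) = handle.join().await;
+//! ```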
+
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+
+use async_trait::async_trait;
+use quickwit_actors::{
+    Actor, ActorContext, ActorExitStatus, ActorHandle, HEARTBEAT, Handler, Health, Inbox, Mailbox,
+    QueueCapacity, SpawnContext, Supervisable,
+};
+use quickwit_common::KillSwitch;
+use quickwit_common::pubsub::EventBroker;
+use quickwit_common::temp_dir::TempDirectory;
+use quickwit_parquet_engine::merge::policy::ParquetMergePolicy;
+use quickwit_parquet_engine::split::ParquetSplitMetadata;
+use quickwit_proto::metastore::MetastoreServiceClient;
+use quickwit_storage::Storage;
+use tokio::sync::Semaphore;
+use tracing::{debug, error, info, instrument};
+
+use super::parquet_merge_executor::ParquetMergeExecutor;
+use super::parquet_merge_planner::{ParquetMergePlanner, RunFinalizeMergePolicyAndQuit};
+use super::parquet_merge_split_downloader::ParquetMergeSplitDownloader;
+use super::{METRICS_PUBLISHER_NAME, ParquetUploader};
+use crate::actors::pipeline_shared::wait_duration_before_retry;
+use crate::actors::publisher::DisconnectMergePlanner;
+use crate::actors::{MergeSchedulerService, Publisher, Sequencer, UploaderType};
+
+/// Limits concurrent Parquet merge pipeline spawns to avoid overwhelming the
+/// metastore. This is a separate semaphore from the Tantivy merge pipeline's.
+static SPAWN_PIPELINE_SEMAPHORE: Semaphore = Semaphore::const_new(10);
+
+pub const SUPERVISE_LOOP_INTERVAL: Duration = Duration::from_secs(1);
+
+/// Holds actor handles for health-checking and lifecycle management.
+/// When `None` in the parent pipeline, no actors are running (pre-spawn or
+/// post-terminate). The supervisor checks these on each `SuperviseLoop` tick.
+struct ParquetMergePipelineHandles {
+    merge_planner: ActorHandle<ParquetMergePlanner>,
+    merge_split_downloader: ActorHandle<ParquetMergeSplitDownloader>,
+    merge_executor: ActorHandle<ParquetMergeExecutor>,
+    merge_uploader: ActorHandle<ParquetUploader>,
+    merge_publisher: ActorHandle<Publisher>,
+    next_check_for_progress: Instant,
+}
+
+impl ParquetMergePipelineHandles {
+    /// Rate-limits progress checks to once per `HEARTBEAT` interval.
+    /// Without this, every supervision tick would check progress, which
+    /// is wasteful — actors only need to demonstrate liveness periodically.
+    fn should_check_for_progress(&mut self) -> bool {
+        let now = Instant::now();
+        let check_for_progress = now > self.next_check_for_progress;
+        if check_for_progress {
+            self.next_check_for_progress = now + *HEARTBEAT;
+        }
+        check_for_progress
+    }
+}
+
+#[derive(Debug)]
+struct SuperviseLoop;
+
+#[derive(Clone, Copy, Debug, Default)]
+struct Spawn {
+    retry_count: usize,
+}
+
+/// Parquet merge pipeline supervisor.
+///
+/// Spawns and supervises the merge actor chain. On actor failure, the entire
+/// pipeline is killed and respawned after a backoff delay. On graceful
+/// shutdown, in-flight merges drain to completion.
+pub struct ParquetMergePipeline {
+    params: ParquetMergePipelineParams,
+    /// The planner's mailbox and inbox are created once and recycled across
+    /// pipeline restarts. This lets the publisher's feedback loop survive a
+    /// respawn — messages sent to the old planner's mailbox are delivered to
+    /// the new planner instance.
+    merge_planner_mailbox: Mailbox<ParquetMergePlanner>,
+    merge_planner_inbox: Inbox<ParquetMergePlanner>,
+    handles_opt: Option<ParquetMergePipelineHandles>,
+    /// Child kill switch — killing this kills all actors in the pipeline
+    /// without affecting the supervisor itself.
+    kill_switch: KillSwitch,
+    /// Increments on each spawn. Used for log correlation.
+    generation: usize,
+    num_spawn_attempts: usize,
+    /// Immature splits passed to the planner on first spawn. On subsequent
+    /// spawns (after crash/respawn), the planner starts empty and picks up
+    /// new splits from the feedback loop.
+    initial_immature_splits_opt: Option<Vec<ParquetSplitMetadata>>,
+    shutdown_initiated: bool,
+}
+
+#[async_trait]
+impl Actor for ParquetMergePipeline {
+    type ObservableState = ();
+
+    fn observable_state(&self) {}
+
+    fn name(&self) -> String {
+        "ParquetMergePipeline".to_string()
+    }
+
+    /// Kicks off the pipeline by handling `Spawn` (which spawns all actors)
+    /// followed by `SuperviseLoop` (which starts the periodic health checks).
+    async fn initialize(&mut self, ctx: &ActorContext<Self>) -> Result<(), ActorExitStatus> {
+        self.handle(Spawn::default(), ctx).await?;
+        self.handle(SuperviseLoop, ctx).await?;
+        Ok(())
+    }
+}
+
+impl ParquetMergePipeline {
+    pub fn new(
+        params: ParquetMergePipelineParams,
+        initial_immature_splits_opt: Option<Vec<ParquetSplitMetadata>>,
+        spawn_ctx: &SpawnContext,
+    ) -> Self {
+        let (merge_planner_mailbox, merge_planner_inbox) = spawn_ctx
+            .create_mailbox::<ParquetMergePlanner>(
+                "ParquetMergePlanner",
+                QueueCapacity::Bounded(1),
+            );
+        Self {
+            params,
+            handles_opt: None,
+            kill_switch: KillSwitch::default(),
+            generation: 0,
+            num_spawn_attempts: 0,
+            merge_planner_inbox,
+            merge_planner_mailbox,
+            initial_immature_splits_opt,
+            shutdown_initiated: false,
+        }
+    }
+
+    pub fn merge_planner_mailbox(&self) -> &Mailbox<ParquetMergePlanner> {
+        &self.merge_planner_mailbox
+    }
+
+    fn supervisables(&self) -> Vec<&dyn Supervisable> {
+        let Some(handles) = &self.handles_opt else {
+            return Vec::new();
+        };
+        vec![
+            &handles.merge_planner,
+            &handles.merge_split_downloader,
+            &handles.merge_executor,
+            &handles.merge_uploader,
+            &handles.merge_publisher,
+        ]
+    }
+
+    /// Consolidates health from all supervised actors into a single verdict.
+    /// Any single actor failure makes the whole pipeline unhealthy (triggers
+    /// terminate + respawn). All actors exiting with Success means the pipeline
+    /// completed (e.g., after shutdown drain).
+    fn healthcheck(&self, check_for_progress: bool) -> Health {
+        let mut healthy_actors: Vec<&str> = Vec::new();
+        let mut failure_or_unhealthy_actors: Vec<&str> = Vec::new();
+        let mut success_actors: Vec<&str> = Vec::new();
+
+        for supervisable in self.supervisables() {
+            match supervisable.check_health(check_for_progress) {
+                Health::Healthy => {
+                    healthy_actors.push(supervisable.name());
+                }
+                Health::FailureOrUnhealthy => {
+                    failure_or_unhealthy_actors.push(supervisable.name());
+                }
+                Health::Success => {
+                    success_actors.push(supervisable.name());
+                }
+            }
+        }
+        if !failure_or_unhealthy_actors.is_empty() {
+            error!(
+                generation = self.generation,
+                healthy_actors = ?healthy_actors,
+                failed_or_unhealthy_actors = ?failure_or_unhealthy_actors,
+                success_actors = ?success_actors,
+                "parquet merge pipeline failed"
+            );
+            return Health::FailureOrUnhealthy;
+        }
+        if healthy_actors.is_empty() {
+            info!(
+                generation = self.generation,
+                "parquet merge pipeline completed successfully"
+            );
+            return Health::Success;
+        }
+        debug!(
+            generation = self.generation,
+            healthy_actors = ?healthy_actors,
+            success_actors = ?success_actors,
+            "parquet merge pipeline is running and healthy"
+        );
+        Health::Healthy
+    }
+
+    #[instrument(name="spawn_parquet_merge_pipeline", level="info", skip_all, fields(generation=self.generation))]
+    async fn spawn_pipeline(&mut self, ctx: &ActorContext<Self>) -> anyhow::Result<()> {
+        let _spawn_permit = ctx
+            .protect_future(SPAWN_PIPELINE_SEMAPHORE.acquire())
+            .await
+            .expect("semaphore should not be closed");
+
+        self.num_spawn_attempts += 1;
+        self.kill_switch = ctx.kill_switch().child();
+
+        info!(
+            generation = self.generation,
+            root_dir = %self.params.indexing_directory.path().display(),
+            "spawning parquet merge pipeline"
+        );
+
+        let immature_splits = self.initial_immature_splits_opt.take().unwrap_or_default();
+
+        // Spawn actors bottom-up: each actor's constructor needs a mailbox
+        // for the actor below it in the chain, so we start from the publisher
+        // (bottom) and work up to the planner (top).
+
+        // 1. Merge publisher — publishes merged splits to the metastore and feeds back
+        //    ParquetNewSplits to the planner for further merging.
+        let merge_publisher = Publisher::new(
+            METRICS_PUBLISHER_NAME,
+            QueueCapacity::Unbounded,
+            self.params.metastore.clone(),
+            None, // No Tantivy planner
+            None, // No source
+        )
+        .set_parquet_merge_planner_mailbox(self.merge_planner_mailbox.clone());
+
+        let (merge_publisher_mailbox, merge_publisher_handle) = ctx
+            .spawn_actor()
+            .set_kill_switch(self.kill_switch.clone())
+            .spawn(merge_publisher);
+
+        // 2. Sequencer — ensures merged splits are published in the order they were uploaded,
+        //    even if uploads complete out of order.
+        let sequencer = Sequencer::new(merge_publisher_mailbox.clone());
+        let (sequencer_mailbox, _sequencer_handle) = ctx
+            .spawn_actor()
+            .set_kill_switch(self.kill_switch.clone())
+            .spawn(sequencer);
+
+        // 3. Merge uploader
+        let merge_uploader = ParquetUploader::new(
+            UploaderType::MergeUploader,
+            self.params.metastore.clone(),
+            self.params.storage.clone(),
+            sequencer_mailbox,
+            self.params.max_concurrent_split_uploads,
+        );
+        let (merge_uploader_mailbox, merge_uploader_handle) = ctx
+            .spawn_actor()
+            .set_kill_switch(self.kill_switch.clone())
+            .spawn(merge_uploader);
+
+        // 4. Merge executor
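+        //    Merges the downloaded splits and sends the result to the uploader
+        //    (see the actor chain in the module docs).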
+        let merge_executor = ParquetMergeExecutor::new(merge_uploader_mailbox);
+        let (merge_executor_mailbox, merge_executor_handle) = ctx
+            .spawn_actor()
+            .set_kill_switch(self.kill_switch.clone())
+            .spawn(merge_executor);
+
+        // 5. Merge split downloader
+        let merge_split_downloader = ParquetMergeSplitDownloader::new(
+            self.params.indexing_directory.clone(),
+            self.params.storage.clone(),
+            merge_executor_mailbox,
+        );
+        let (merge_split_downloader_mailbox, merge_split_downloader_handle) = ctx
+            .spawn_actor()
+            .set_kill_switch(self.kill_switch.clone())
+            .spawn(merge_split_downloader);
+
+        // 6. Merge planner — uses recycled mailbox/inbox so the publisher's feedback loop (which
+        //    holds a clone of the planner mailbox) survives pipeline restarts without needing to
+        //    be re-wired.
+        let merge_planner = ParquetMergePlanner::new(
+            immature_splits,
+            self.params.merge_policy.clone(),
+            merge_split_downloader_mailbox,
+            self.params.merge_scheduler_service.clone(),
+        );
+        let (_, merge_planner_handle) = ctx
+            .spawn_actor()
+            .set_kill_switch(self.kill_switch.clone())
+            .set_mailboxes(
+                self.merge_planner_mailbox.clone(),
+                self.merge_planner_inbox.clone(),
+            )
+            .spawn(merge_planner);
+
+        self.generation += 1;
+        self.handles_opt = Some(ParquetMergePipelineHandles {
+            merge_planner: merge_planner_handle,
+            merge_split_downloader: merge_split_downloader_handle,
+            merge_executor: merge_executor_handle,
+            merge_uploader: merge_uploader_handle,
+            merge_publisher: merge_publisher_handle,
+            next_check_for_progress: Instant::now() + *HEARTBEAT,
+        });
+        Ok(())
+    }
+
+    /// Kills all actors in the pipeline immediately. Used when the health check
+    /// detects a failure — we tear everything down and schedule a respawn.
+    async fn terminate(&mut self) {
+        self.kill_switch.kill();
+        if let Some(handles) = self.handles_opt.take() {
+            tokio::join!(
+                handles.merge_planner.kill(),
+                handles.merge_split_downloader.kill(),
+                handles.merge_executor.kill(),
+                handles.merge_uploader.kill(),
+                handles.merge_publisher.kill(),
+            );
+        }
+    }
+
+    async fn perform_health_check(
+        &mut self,
+        ctx: &ActorContext<Self>,
+    ) -> Result<(), ActorExitStatus> {
+        let Some(handles) = self.handles_opt.as_mut() else {
+            return Ok(());
+        };
+        let check_for_progress = handles.should_check_for_progress();
+        let health = self.healthcheck(check_for_progress);
+        match health {
+            Health::Healthy => {}
+            Health::FailureOrUnhealthy => {
+                self.terminate().await;
+                ctx.schedule_self_msg(*HEARTBEAT, Spawn { retry_count: 0 });
+            }
+            Health::Success => {
+                info!("parquet merge pipeline success, shutting down");
+                return Err(ActorExitStatus::Success);
+            }
+        }
+        Ok(())
+    }
+}
+
+#[async_trait]
+impl Handler<SuperviseLoop> for ParquetMergePipeline {
+    type Reply = ();
+
+    async fn handle(
+        &mut self,
+        supervise_loop_token: SuperviseLoop,
+        ctx: &ActorContext<Self>,
+    ) -> Result<(), ActorExitStatus> {
+        self.perform_health_check(ctx).await?;
+        ctx.schedule_self_msg(SUPERVISE_LOOP_INTERVAL, supervise_loop_token);
+        Ok(())
+    }
+}
+
+/// Instructs the merge pipeline to finish pending merges and shut down.
+///
+/// Reuses the same message type as the Tantivy merge pipeline for
+/// consistency in the IndexingService shutdown path.
+pub use crate::actors::FinishPendingMergesAndShutdownPipeline;
+
+#[async_trait]
+impl Handler<FinishPendingMergesAndShutdownPipeline> for ParquetMergePipeline {
+    type Reply = ();
+
+    async fn handle(
+        &mut self,
+        _: FinishPendingMergesAndShutdownPipeline,
+        _ctx: &ActorContext<Self>,
+    ) -> Result<(), ActorExitStatus> {
+        info!("shutdown parquet merge pipeline initiated");
+        // Prevent respawn on failure from this point forward.
+        self.shutdown_initiated = true;
+        if let Some(handles) = &self.handles_opt {
+            // Two-phase graceful shutdown:
+            //
+            // Phase 1: Break the feedback loop so completed merges don't
+            // trigger new merge planning. Without this, the pipeline would
+            // never drain — each completed merge feeds back new splits that
+            // trigger more merges.
+            let _ = handles
+                .merge_publisher
+                .mailbox()
+                .send_message(DisconnectMergePlanner)
+                .await;
+
+            // Phase 2: Run the finalize policy (merges cold-window stragglers
+            // with a lower merge factor), then the planner exits. Downstream
+            // actors drain naturally as their inboxes empty.
+            let _ = handles
+                .merge_planner
+                .mailbox()
+                .send_message(RunFinalizeMergePolicyAndQuit)
+                .await;
+        }
+        Ok(())
+    }
+}
+
+#[async_trait]
+impl Handler<Spawn> for ParquetMergePipeline {
+    type Reply = ();
+
+    async fn handle(
+        &mut self,
+        spawn: Spawn,
+        ctx: &ActorContext<Self>,
+    ) -> Result<(), ActorExitStatus> {
+        // Don't respawn after graceful shutdown was requested.
+        if self.shutdown_initiated {
+            return Ok(());
+        }
+        // Don't spawn if actors are already running (duplicate Spawn message).
+        if self.handles_opt.is_some() {
+            return Ok(());
+        }
+        if let Err(spawn_error) = self.spawn_pipeline(ctx).await {
+            let retry_delay = wait_duration_before_retry(spawn.retry_count);
+            error!(
+                error = ?spawn_error,
+                retry_count = spawn.retry_count,
+                retry_delay = ?retry_delay,
+                "error while spawning parquet merge pipeline, retrying"
+            );
+            ctx.schedule_self_msg(
+                retry_delay,
+                Spawn {
+                    retry_count: spawn.retry_count + 1,
+                },
+            );
+        }
+        Ok(())
+    }
+}
+
+/// Parameters for spawning a `ParquetMergePipeline`.
+///
+/// Constructed by the IndexingService from `IndexConfig` + node-level settings.
+/// All actors in the pipeline share these resources via `Arc`/`Clone`.
+#[derive(Clone)]
+pub struct ParquetMergePipelineParams {
+    /// Root temp directory for scratch files (downloads, merge output).
+    pub indexing_directory: TempDirectory,
+    /// Metastore client for staging/publishing merged splits and for
+    /// re-seeding the planner with immature splits on respawn.
+    pub metastore: MetastoreServiceClient,
+    /// Object storage for downloading input splits and uploading merge output.
+    pub storage: Arc<dyn Storage>,
+    /// Determines which splits to merge and when. Read from the index's
+    /// `parquet_merge_policy` config section.
+    pub merge_policy: Arc<dyn ParquetMergePolicy>,
+    /// Node-wide merge scheduler — shared with Tantivy merge pipelines for
+    /// global concurrency control via a single semaphore.
+    pub merge_scheduler_service: Mailbox<MergeSchedulerService>,
+    pub max_concurrent_split_uploads: usize,
+    pub event_broker: EventBroker,
+}
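+
+// Illustrative wiring only: a caller such as the IndexingService referenced
+// above would assemble the params from its own resources before spawning the
+// pipeline. The bindings below are placeholders; `make_pipeline_params` in the
+// tests shows a runnable variant.
+//
+//     let params = ParquetMergePipelineParams {
+//         indexing_directory,
+//         metastore,
+//         storage,
+//         merge_policy,
+//         merge_scheduler_service,
+//         max_concurrent_split_uploads,
+//         event_broker,
+//     };
+//     let pipeline = ParquetMergePipeline::new(params, None, spawn_ctx);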
+
+#[cfg(test)]
+mod tests {
+    use std::time::Duration;
+
+    use quickwit_actors::{ActorExitStatus, Universe};
+    use quickwit_common::temp_dir::TempDirectory;
+    use quickwit_parquet_engine::merge::policy::{
+        ConstWriteAmplificationParquetMergePolicy, ParquetMergePolicyConfig,
+    };
+    use quickwit_parquet_engine::split::{ParquetSplitId, ParquetSplitMetadata, TimeRange};
+    use quickwit_proto::metastore::{MetastoreServiceClient, MockMetastoreService};
+
+    use super::*;
+
+    fn make_pipeline_params(universe: &Universe) -> ParquetMergePipelineParams {
+        let mock_metastore = MockMetastoreService::new();
+        let storage = Arc::new(quickwit_storage::RamStorage::default());
+        let merge_policy = Arc::new(ConstWriteAmplificationParquetMergePolicy::new(
+            ParquetMergePolicyConfig {
+                merge_factor: 2,
+                max_merge_factor: 2,
+                max_merge_ops: 5,
+                target_split_size_bytes: 256 * 1024 * 1024,
+                maturation_period: Duration::from_secs(3600),
+                max_finalize_merge_operations: 3,
+            },
+        ));
+        ParquetMergePipelineParams {
+            indexing_directory: TempDirectory::for_test(),
+            metastore: MetastoreServiceClient::from_mock(mock_metastore),
+            storage,
+            merge_policy,
+            merge_scheduler_service: universe.get_or_spawn_one(),
+            max_concurrent_split_uploads: 4,
+            event_broker: EventBroker::default(),
+        }
+    }
+
+    fn make_split(split_id: &str) -> ParquetSplitMetadata {
+        ParquetSplitMetadata::metrics_builder()
+            .split_id(ParquetSplitId::new(split_id))
+            .index_uid("test-index:00000000000000000000000001")
+            .partition_id(0)
+            .time_range(TimeRange::new(1000, 2000))
+            .num_rows(100)
+            .size_bytes(1_000_000)
+            .sort_fields("metric_name|host|timestamp_secs/V2")
+            .window_start_secs(0)
+            .window_duration_secs(3600)
+            .build()
+    }
+
+    #[tokio::test]
+    async fn test_pipeline_spawns_and_supervises() {
+        let universe = Universe::with_accelerated_time();
+        let params = make_pipeline_params(&universe);
+
+        let pipeline = ParquetMergePipeline::new(params, None, universe.spawn_ctx());
+        let (_pipeline_mailbox, pipeline_handle) = universe.spawn_builder().spawn(pipeline);
+
+        // Give the pipeline time to initialize and spawn actors.
+        universe.sleep(Duration::from_secs(2)).await;
+
+        let observation = pipeline_handle.process_pending_and_observe().await;
+        assert_eq!(
+            observation.obs_type,
+            quickwit_actors::ObservationType::Alive
+        );
+
+        universe.assert_quit().await;
+    }
+
+    #[tokio::test]
+    async fn test_pipeline_shutdown_drain() {
+        let universe = Universe::with_accelerated_time();
+        let params = make_pipeline_params(&universe);
+
+        let pipeline = ParquetMergePipeline::new(params, None, universe.spawn_ctx());
+        let (pipeline_mailbox, pipeline_handle) = universe.spawn_builder().spawn(pipeline);
+
+        // Let it initialize.
+        universe.sleep(Duration::from_secs(2)).await;
+
+        // Initiate shutdown.
+        pipeline_mailbox
+            .send_message(FinishPendingMergesAndShutdownPipeline)
+            .await
+            .unwrap();
+
+        // The pipeline should eventually exit with Success.
+        let (exit_status, _) = pipeline_handle.join().await;
+        assert!(
+            matches!(exit_status, ActorExitStatus::Success),
+            "expected Success exit, got {:?}",
+            exit_status
+        );
+
+        universe.assert_quit().await;
+    }
+
+    #[tokio::test]
+    async fn test_pipeline_accepts_initial_splits() {
+        let universe = Universe::with_accelerated_time();
+        let params = make_pipeline_params(&universe);
+
+        let initial_splits = Some(vec![make_split("s0"), make_split("s1")]);
+        let pipeline = ParquetMergePipeline::new(params, initial_splits, universe.spawn_ctx());
+        let planner_mailbox = pipeline.merge_planner_mailbox().clone();
+        let (pipeline_mailbox, pipeline_handle) = universe.spawn_builder().spawn(pipeline);
+
+        // Let it initialize.
+        universe.sleep(Duration::from_secs(2)).await;
+
+        // The planner mailbox should be accessible.
+        assert!(!planner_mailbox.is_disconnected());
+
+        // Gracefully shut down before asserting quit.
+        pipeline_mailbox
+            .send_message(FinishPendingMergesAndShutdownPipeline)
+            .await
+            .unwrap();
+        let (exit_status, _) = pipeline_handle.join().await;
+        assert!(matches!(exit_status, ActorExitStatus::Success));
+
+        universe.assert_quit().await;
+    }
+}
diff --git a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/publisher_impl.rs b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/publisher_impl.rs
index 7b8815a0700..dc56d129578 100644
--- a/quickwit/quickwit-indexing/src/actors/metrics_pipeline/publisher_impl.rs
+++ b/quickwit/quickwit-indexing/src/actors/metrics_pipeline/publisher_impl.rs
@@ -87,6 +87,20 @@ impl Handler for Publisher {
         info!("publish-metrics-splits");
         suggest_truncate(ctx, &self.source_mailbox_opt, checkpoint_delta_opt).await;

+        // Feedback loop: notify the merge planner about all newly published
+        // splits — both ingest outputs and merge outputs — so it can plan
+        // further compaction. Infinite loops are prevented by the merge
+        // policy's maturity checks (max_merge_ops, target_split_size_bytes,
+        // maturation_period), not by filtering here. This matches the Tantivy
+        // publisher, which sends NewSplits unconditionally.
+        if let Some(planner_mailbox) = &self.parquet_merge_planner_mailbox_opt
+            && !new_splits.is_empty()
+        {
+            let _ = ctx
+                .send_message(planner_mailbox, super::ParquetNewSplits { new_splits })
+                .await;
+        }
+
         if split_ids.is_empty() {
             self.counters.num_empty_splits += 1;
         } else if replaced_split_ids.is_empty() {
diff --git a/quickwit/quickwit-indexing/src/actors/publisher.rs b/quickwit/quickwit-indexing/src/actors/publisher.rs
index 26dd6694490..8c664202d42 100644
--- a/quickwit/quickwit-indexing/src/actors/publisher.rs
+++ b/quickwit/quickwit-indexing/src/actors/publisher.rs
@@ -40,6 +40,9 @@ pub struct Publisher {
     pub(crate) queue_capacity: QueueCapacity,
     pub(crate) metastore: MetastoreServiceClient,
     pub(crate) merge_planner_mailbox_opt: Option<Mailbox<MergePlanner>>,
+    #[cfg(feature = "metrics")]
+    pub(crate) parquet_merge_planner_mailbox_opt:
+        Option<Mailbox<ParquetMergePlanner>>,
     pub(crate) source_mailbox_opt: Option<Mailbox<SourceActor>>,
     pub(crate) counters: PublisherCounters,
 }
@@ -57,10 +60,24 @@ impl Publisher {
             queue_capacity,
             metastore,
             merge_planner_mailbox_opt,
+            #[cfg(feature = "metrics")]
+            parquet_merge_planner_mailbox_opt: None,
             source_mailbox_opt,
             counters: PublisherCounters::default(),
         }
     }
+
+    /// Sets the Parquet merge planner mailbox for merge feedback.
+    /// Builder-style setter so the `metrics`-only mailbox can be attached
+    /// without changing the shared `Publisher::new` signature.
+    #[cfg(feature = "metrics")]
+    pub fn set_parquet_merge_planner_mailbox(
+        mut self,
+        mailbox: Mailbox<ParquetMergePlanner>,
+    ) -> Self {
+        self.parquet_merge_planner_mailbox_opt = Some(mailbox);
+        self
+    }
 }

 pub(crate) fn serialize_checkpoint_delta(