diff --git a/src/query/storages/fuse/fuse/src/operations/fuse_sink.rs b/src/query/storages/fuse/fuse/src/operations/fuse_sink.rs index e8ac226e50079..e2a649c704a2a 100644 --- a/src/query/storages/fuse/fuse/src/operations/fuse_sink.rs +++ b/src/query/storages/fuse/fuse/src/operations/fuse_sink.rs @@ -13,7 +13,9 @@ // limitations under the License. use std::any::Any; +use std::sync::mpsc::channel; use std::sync::Arc; +use std::thread; use async_trait::async_trait; use common_arrow::parquet::compression::CompressionOptions; @@ -51,6 +53,27 @@ struct BloomIndexState { location: Location, } +impl BloomIndexState { + pub fn try_create(block: DataBlock, location: Location) -> Result { + // write index + let bloom_index = BlockFilter::try_create(&[&block])?; + let index_block = bloom_index.filter_block; + let mut data = Vec::with_capacity(100 * 1024); + let index_block_schema = &bloom_index.filter_schema; + let (size, _) = serialize_data_blocks_with_compression( + vec![index_block], + index_block_schema, + &mut data, + CompressionOptions::Uncompressed, + )?; + Ok(Self { + data, + size, + location, + }) + } +} + enum State { None, NeedSerialize(DataBlock), @@ -168,33 +191,48 @@ impl Processor for FuseTableSink { let (block_location, block_id) = self.meta_locations.gen_block_location(); - let bloom_index_state = { - // write index - let bloom_index = BlockFilter::try_create(&[&block])?; - let index_block = bloom_index.filter_block; - let location = self.meta_locations.block_bloom_index_location(&block_id); - let mut data = Vec::with_capacity(100 * 1024); - let index_block_schema = &bloom_index.filter_schema; - let (size, _) = serialize_data_blocks_with_compression( - vec![index_block], - index_block_schema, - &mut data, - CompressionOptions::Uncompressed, - )?; - BloomIndexState { - data, - size, - location, + let (bloom_tx, bloom_rx) = channel(); + let (statistics_tx, statistics_rx) = channel(); + + let bloom_block = block.clone(); + let bloom_location = self.meta_locations.block_bloom_index_location(&block_id); + let _bloom_handler = thread::spawn(move || { + let bloom_index_state = + BloomIndexState::try_create(bloom_block, bloom_location); + if bloom_tx.send(bloom_index_state).is_err() { + return Err(ErrorCode::Internal("send bloom_index_state failed")); } - }; + Ok(()) + }); + + let statistics_block = block.clone(); + let statistics_location = block_location.0; + let _statistics_handler = thread::spawn(move || { + let block_statistics = BlockStatistics::from( + &statistics_block, + statistics_location, + cluster_stats, + ); + if statistics_tx.send(block_statistics).is_err() { + return Err(ErrorCode::Internal("send block_statistics failed")); + } + Ok(()) + }); - let block_statistics = - BlockStatistics::from(&block, block_location.0, cluster_stats)?; // we need a configuration of block size threshold here let mut data = Vec::with_capacity(100 * 1024 * 1024); let schema = block.schema().clone(); let (size, meta_data) = serialize_data_blocks(vec![block], &schema, &mut data)?; + let bloom_index_state = match bloom_rx.recv() { + Ok(bloom_index_state) => bloom_index_state?, + Err(_) => return Err(ErrorCode::Internal("build bloom_index_state failed")), + }; + let block_statistics = match statistics_rx.recv() { + Ok(block_statistics) => block_statistics?, + Err(_) => return Err(ErrorCode::Internal("build block_statistics failed")), + }; + self.state = State::Serialized { data, size,