diff --git a/.cargo/audit.toml b/.cargo/audit.toml index be411a3a14f96..2b2ba124f499d 100644 --- a/.cargo/audit.toml +++ b/.cargo/audit.toml @@ -55,4 +55,8 @@ ignore = [ "RUSTSEC-2024-0421", # gix-worktree-state nonexclusive checkout sets executable files world-writable "RUSTSEC-2025-0001", + # `fast-float`: Segmentation fault due to lack of bound check + "RUSTSEC-2025-0003", + # ssl::select_next_proto use after free + "RUSTSEC-2025-0004", ] diff --git a/src/query/config/src/config.rs b/src/query/config/src/config.rs index 767bdb255b226..87d8fe21d3632 100644 --- a/src/query/config/src/config.rs +++ b/src/query/config/src/config.rs @@ -2701,6 +2701,15 @@ pub struct CacheConfig { )] pub block_meta_count: u64, + /// Max number of **segments** whose block metas will all be cached. + /// Note that a segment may contain multiple block metadata entries. + #[clap( + long = "cache-segment-block-metas-count", + value_name = "VALUE", + default_value = "0" + )] + pub segment_block_metas_count: u64, + /// Max number of cached table statistic meta #[clap( long = "cache-table-meta-statistic-count", @@ -2999,6 +3008,7 @@ mod cache_config_converters { table_meta_snapshot_count: value.table_meta_snapshot_count, table_meta_segment_bytes: value.table_meta_segment_bytes, block_meta_count: value.block_meta_count, + segment_block_metas_count: value.segment_block_metas_count, table_meta_statistic_count: value.table_meta_statistic_count, enable_table_index_bloom: value.enable_table_bloom_index_cache, table_bloom_index_meta_count: value.table_bloom_index_meta_count, @@ -3043,6 +3053,7 @@ mod cache_config_converters { table_data_deserialized_data_bytes: value.table_data_deserialized_data_bytes, table_data_deserialized_memory_ratio: value.table_data_deserialized_memory_ratio, table_meta_segment_count: None, + segment_block_metas_count: value.segment_block_metas_count, } } } diff --git a/src/query/config/src/inner.rs b/src/query/config/src/inner.rs index cf6dc8fb847f0..6aa4045a0f17b 100644 --- a/src/query/config/src/inner.rs +++ b/src/query/config/src/inner.rs @@ -536,6 +536,10 @@ pub struct CacheConfig { /// Max number of cached table block meta pub block_meta_count: u64, + /// Max number of **segments** whose block metas will all be cached. + /// Note that a segment may contain multiple block metadata entries.
+ pub segment_block_metas_count: u64, + /// Max number of cached table segment pub table_meta_statistic_count: u64, @@ -683,6 +687,7 @@ impl Default for CacheConfig { table_meta_snapshot_count: 256, table_meta_segment_bytes: 1073741824, block_meta_count: 0, + segment_block_metas_count: 0, table_meta_statistic_count: 256, enable_table_index_bloom: true, table_bloom_index_meta_count: 3000, diff --git a/src/query/service/tests/it/storages/testdata/configs_table_basic.txt b/src/query/service/tests/it/storages/testdata/configs_table_basic.txt index 7e20e16c1e523..0c0918147e311 100644 --- a/src/query/service/tests/it/storages/testdata/configs_table_basic.txt +++ b/src/query/service/tests/it/storages/testdata/configs_table_basic.txt @@ -15,6 +15,7 @@ DB.Table: 'system'.'configs', Table: configs-table_id:1, ver:0, Engine: SystemCo | 'cache' | 'inverted_index_filter_memory_ratio' | '0' | '' | | 'cache' | 'inverted_index_filter_size' | '2147483648' | '' | | 'cache' | 'inverted_index_meta_count' | '3000' | '' | +| 'cache' | 'segment_block_metas_count' | '0' | '' | | 'cache' | 'table_bloom_index_filter_count' | '0' | '' | | 'cache' | 'table_bloom_index_filter_size' | '2147483648' | '' | | 'cache' | 'table_bloom_index_meta_count' | '3000' | '' | diff --git a/src/query/storages/common/cache/src/caches.rs b/src/query/storages/common/cache/src/caches.rs index 3a4f900f9e8a6..76b8ce7100c9e 100644 --- a/src/query/storages/common/cache/src/caches.rs +++ b/src/query/storages/common/cache/src/caches.rs @@ -35,7 +35,15 @@ use crate::InMemoryLruCache; /// In memory object cache of SegmentInfo pub type CompactSegmentInfoCache = InMemoryLruCache<CompactSegmentInfo>; -pub type BlockMetaCache = InMemoryLruCache<Vec<Arc<BlockMeta>>>; +/// In-memory cache for all the block metadata of individual segments. +/// +/// Note that this cache may be memory-intensive: each item holds ALL the +/// BlockMetas of a segment, and a well-compacted segment may contain +/// 1000 ~ 2000 BlockMetas. +pub type SegmentBlockMetasCache = InMemoryLruCache<Vec<Arc<BlockMeta>>>; + +/// In-memory cache of individual BlockMeta.
+pub type BlockMetaCache = InMemoryLruCache<BlockMeta>; /// In memory object cache of TableSnapshot pub type TableSnapshotCache = InMemoryLruCache<TableSnapshot>; @@ -95,9 +103,9 @@ impl CachedObject<TableSnapshot> for TableSnapshot { } impl CachedObject<Vec<Arc<BlockMeta>>> for Vec<Arc<BlockMeta>> { - type Cache = BlockMetaCache; + type Cache = SegmentBlockMetasCache; fn cache() -> Option<Self::Cache> { - CacheManager::instance().get_block_meta_cache() + CacheManager::instance().get_segment_block_metas_cache() } } @@ -187,6 +195,15 @@ impl From<Vec<Arc<BlockMeta>>> for CacheValue<Vec<Arc<BlockMeta>>> { } } +impl From<BlockMeta> for CacheValue<BlockMeta> { + fn from(value: BlockMeta) -> Self { + CacheValue { + inner: Arc::new(value), + mem_bytes: 0, + } + } +} + impl From<TableSnapshot> for CacheValue<TableSnapshot> { fn from(value: TableSnapshot) -> Self { CacheValue { diff --git a/src/query/storages/common/cache/src/lib.rs b/src/query/storages/common/cache/src/lib.rs index 0a7378134b714..1b04a82e9f24c 100644 --- a/src/query/storages/common/cache/src/lib.rs +++ b/src/query/storages/common/cache/src/lib.rs @@ -26,6 +26,7 @@ pub use cache::Unit; pub use caches::BlockMetaCache; pub use caches::CacheValue; pub use caches::CachedObject; +pub use caches::SegmentBlockMetasCache; pub use caches::SizedColumnArray; pub use manager::CacheManager; pub use providers::DiskCacheError; diff --git a/src/query/storages/common/cache/src/manager.rs b/src/query/storages/common/cache/src/manager.rs index c70755ad84348..a651baacd9bc1 100644 --- a/src/query/storages/common/cache/src/manager.rs +++ b/src/query/storages/common/cache/src/manager.rs @@ -34,6 +34,7 @@ use crate::caches::FileMetaDataCache; use crate::caches::InvertedIndexFileCache; use crate::caches::InvertedIndexMetaCache; use crate::caches::PrunePartitionsCache; +use crate::caches::SegmentBlockMetasCache; use crate::caches::TableSnapshotCache; use crate::caches::TableSnapshotStatisticCache; use crate::InMemoryLruCache; @@ -78,6 +79,7 @@ pub struct CacheManager { parquet_meta_data_cache: CacheSlot, table_data_cache: CacheSlot, in_memory_table_data_cache: CacheSlot, + segment_block_metas_cache: CacheSlot<SegmentBlockMetasCache>, block_meta_cache: CacheSlot<BlockMetaCache>, } @@ -151,6 +153,7 @@ impl CacheManager { table_statistic_cache: CacheSlot::new(None), table_data_cache, in_memory_table_data_cache, + segment_block_metas_cache: CacheSlot::new(None), block_meta_cache: CacheSlot::new(None), })); } else { @@ -201,8 +204,14 @@ impl CacheManager { DEFAULT_PARQUET_META_DATA_CACHE_ITEMS, ); + let segment_block_metas_cache = Self::new_items_cache_slot( + MEMORY_CACHE_SEGMENT_BLOCK_METAS, + config.block_meta_count as usize, + ); + let block_meta_cache = Self::new_items_cache_slot( MEMORY_CACHE_BLOCK_META, + // TODO: replace this config with a dedicated one config.block_meta_count as usize, ); @@ -217,8 +226,9 @@ impl CacheManager { table_statistic_cache, table_data_cache, in_memory_table_data_cache, - block_meta_cache, + segment_block_metas_cache, parquet_meta_data_cache, + block_meta_cache, })); } @@ -270,6 +280,9 @@ impl CacheManager { MEMORY_CACHE_TABLE_SNAPSHOT => { Self::set_items_capacity(&self.table_snapshot_cache, new_capacity, name); } + MEMORY_CACHE_SEGMENT_BLOCK_METAS => { + Self::set_items_capacity(&self.segment_block_metas_cache, new_capacity, name); + } MEMORY_CACHE_BLOCK_META => { Self::set_items_capacity(&self.block_meta_cache, new_capacity, name); } @@ -311,6 +324,10 @@ impl CacheManager { } } + pub fn get_segment_block_metas_cache(&self) -> Option<SegmentBlockMetasCache> { + self.segment_block_metas_cache.get() + } + pub fn get_block_meta_cache(&self) -> Option<BlockMetaCache> { self.block_meta_cache.get() } @@ -426,4 +443,6 @@ const MEMORY_CACHE_BLOOM_INDEX_FILTER: &str = "memory_cache_bloom_index_filter"; const 
MEMORY_CACHE_COMPACT_SEGMENT_INFO: &str = "memory_cache_compact_segment_info"; const MEMORY_CACHE_TABLE_STATISTICS: &str = "memory_cache_table_statistics"; const MEMORY_CACHE_TABLE_SNAPSHOT: &str = "memory_cache_table_snapshot"; +const MEMORY_CACHE_SEGMENT_BLOCK_METAS: &str = "memory_cache_segment_block_metas"; + const MEMORY_CACHE_BLOCK_META: &str = "memory_cache_block_meta"; diff --git a/src/query/storages/fuse/src/operations/replace.rs b/src/query/storages/fuse/src/operations/replace.rs index c05cb21c7d42a..b68a461d6e88a 100644 --- a/src/query/storages/fuse/src/operations/replace.rs +++ b/src/query/storages/fuse/src/operations/replace.rs @@ -29,7 +29,7 @@ use rand::prelude::SliceRandom; use crate::io::BlockBuilder; use crate::io::ReadSettings; use crate::operations::mutation::SegmentIndex; -use crate::operations::replace_into::MergeIntoOperationAggregator; +use crate::operations::replace_into::ReplaceIntoOperationAggregator; use crate::FuseTable; impl FuseTable { @@ -102,7 +102,7 @@ impl FuseTable { let read_settings = ReadSettings::from_ctx(&ctx)?; let mut items = Vec::with_capacity(num_partition); for chunk_of_segment_locations in chunks { - let item = MergeIntoOperationAggregator::try_create( + let item = ReplaceIntoOperationAggregator::try_create( ctx.clone(), on_conflicts.clone(), bloom_filter_column_indexes.clone(), diff --git a/src/query/storages/fuse/src/operations/replace_into/meta/mod.rs b/src/query/storages/fuse/src/operations/replace_into/meta/mod.rs index cf03e3431c840..1260a7c8d2f5b 100644 --- a/src/query/storages/fuse/src/operations/replace_into/meta/mod.rs +++ b/src/query/storages/fuse/src/operations/replace_into/meta/mod.rs @@ -12,6 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod merge_into_operation_meta; +mod replace_into_operation_meta; -pub use merge_into_operation_meta::*; +pub use replace_into_operation_meta::*; diff --git a/src/query/storages/fuse/src/operations/replace_into/meta/merge_into_operation_meta.rs b/src/query/storages/fuse/src/operations/replace_into/meta/replace_into_operation_meta.rs similarity index 76% rename from src/query/storages/fuse/src/operations/replace_into/meta/merge_into_operation_meta.rs rename to src/query/storages/fuse/src/operations/replace_into/meta/replace_into_operation_meta.rs index 080db8f882f69..d6b11a1c9810a 100644 --- a/src/query/storages/fuse/src/operations/replace_into/meta/merge_into_operation_meta.rs +++ b/src/query/storages/fuse/src/operations/replace_into/meta/replace_into_operation_meta.rs @@ -19,14 +19,8 @@ use databend_common_expression::BlockMetaInfoDowncast; use databend_common_expression::DataBlock; use databend_common_expression::Scalar; -// This mod need to be refactored, since it not longer aiming to be -// used in the implementation of `MERGE INTO` statement in the future. -// -// unfortunately, distributed `replace-into` is being implemented in parallel, -// to avoid the potential heavy merge conflicts, the refactoring is postponed. 
- #[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)] -pub enum MergeIntoOperation { +pub enum ReplaceIntoOperation { Delete(Vec), None, } @@ -43,8 +37,8 @@ pub struct DeletionByColumn { pub bloom_hashes: Vec, } -#[typetag::serde(name = "merge_into_operation_meta")] -impl BlockMetaInfo for MergeIntoOperation { +#[typetag::serde(name = "replace_into_operation_meta")] +impl BlockMetaInfo for ReplaceIntoOperation { fn equals(&self, info: &Box) -> bool { Self::downcast_ref_from(info).is_some_and(|other| self == other) } @@ -54,16 +48,16 @@ impl BlockMetaInfo for MergeIntoOperation { } } -impl TryFrom for MergeIntoOperation { +impl TryFrom for ReplaceIntoOperation { type Error = ErrorCode; fn try_from(value: DataBlock) -> Result { let meta = value.get_owned_meta().ok_or_else(|| { ErrorCode::Internal( - "convert MergeIntoOperation from data block failed, no block meta found", + "convert ReplaceIntoOperation from data block failed, no block meta found", ) })?; - MergeIntoOperation::downcast_from(meta).ok_or_else(|| { + ReplaceIntoOperation::downcast_from(meta).ok_or_else(|| { ErrorCode::Internal( "downcast block meta to MutationIntoOperation failed, type mismatch", ) diff --git a/src/query/storages/fuse/src/operations/replace_into/mutator/mod.rs b/src/query/storages/fuse/src/operations/replace_into/mutator/mod.rs index 1763ea1fb74ba..bef03acda7b47 100644 --- a/src/query/storages/fuse/src/operations/replace_into/mutator/mod.rs +++ b/src/query/storages/fuse/src/operations/replace_into/mutator/mod.rs @@ -14,11 +14,11 @@ mod column_hash; mod deletion_accumulator; -mod merge_into_mutator; -mod mutator_replace_into; +mod replace_into_mutator; +mod replace_into_operation_agg; pub use column_hash::row_hash_of_columns; pub use deletion_accumulator::BlockDeletionKeys; pub use deletion_accumulator::DeletionAccumulator; -pub use merge_into_mutator::MergeIntoOperationAggregator; -pub use mutator_replace_into::ReplaceIntoMutator; +pub use replace_into_mutator::ReplaceIntoMutator; +pub use replace_into_operation_agg::ReplaceIntoOperationAggregator; diff --git a/src/query/storages/fuse/src/operations/replace_into/mutator/mutator_replace_into.rs b/src/query/storages/fuse/src/operations/replace_into/mutator/replace_into_mutator.rs similarity index 96% rename from src/query/storages/fuse/src/operations/replace_into/mutator/mutator_replace_into.rs rename to src/query/storages/fuse/src/operations/replace_into/mutator/replace_into_mutator.rs index 1196fc5d63f5e..d2ae4e8dd9dc7 100644 --- a/src/query/storages/fuse/src/operations/replace_into/mutator/mutator_replace_into.rs +++ b/src/query/storages/fuse/src/operations/replace_into/mutator/replace_into_mutator.rs @@ -46,14 +46,14 @@ use databend_storages_common_table_meta::meta::ColumnStatistics; use log::info; use crate::operations::replace_into::meta::DeletionByColumn; -use crate::operations::replace_into::meta::MergeIntoOperation; +use crate::operations::replace_into::meta::ReplaceIntoOperation; use crate::operations::replace_into::meta::UniqueKeyDigest; use crate::operations::replace_into::mutator::column_hash::row_hash_of_columns; use crate::operations::replace_into::mutator::column_hash::RowScalarValue; // Replace is somehow a simplified merge_into, which // - do insertion for "matched" branch -// - update for "not-matched" branch (by sending MergeIntoOperation to downstream) +// - update for "not-matched" branch (by sending ReplaceIntoOperation to downstream) pub struct ReplaceIntoMutator { on_conflict_fields: Vec, table_range_index: HashMap, 
@@ -100,7 +100,7 @@ enum ColumnHash { } impl ReplaceIntoMutator { - pub fn process_input_block(&mut self, data_block: &DataBlock) -> Result { + pub fn process_input_block(&mut self, data_block: &DataBlock) -> Result { // pruning rows by using table level range index // rows that definitely have no conflict will be removed metrics_inc_replace_original_row_number(data_block.num_rows() as u64); @@ -111,10 +111,10 @@ impl ReplaceIntoMutator { if row_number_after_pruning == 0 { info!("(replace-into) all rows are append-only"); - return Ok(MergeIntoOperation::None); + return Ok(ReplaceIntoOperation::None); } - let merge_into_operation = if let Some(partitioner) = &self.partitioner { + let replace_into_operation = if let Some(partitioner) = &self.partitioner { // if table has cluster keys; we partition the input data block by left most column of cluster keys let partitions = partitioner.partition(data_block)?; metrics_inc_replace_partition_number(partitions.len() as u64); @@ -137,12 +137,12 @@ impl ReplaceIntoMutator { } }) .collect(); - MergeIntoOperation::Delete(vs) + ReplaceIntoOperation::Delete(vs) } else { // otherwise, we just build a single delete action - self.build_merge_into_operation(&data_block_may_have_conflicts)? + self.build_replace_into_operation(&data_block_may_have_conflicts)? }; - Ok(merge_into_operation) + Ok(replace_into_operation) } // filter out rows that definitely have no conflict, by using table level range index @@ -171,7 +171,10 @@ impl ReplaceIntoMutator { data_block.clone().filter_with_bitmap(&bitmap) } - fn build_merge_into_operation(&mut self, data_block: &DataBlock) -> Result { + fn build_replace_into_operation( + &mut self, + data_block: &DataBlock, + ) -> Result { let num_rows = data_block.num_rows(); let column_values = on_conflict_key_column_values(&self.on_conflict_fields, data_block); @@ -183,7 +186,7 @@ impl ReplaceIntoMutator { key_hashes, bloom_hashes: vec![], }; - Ok(MergeIntoOperation::Delete(vec![delete_action])) + Ok(ReplaceIntoOperation::Delete(vec![delete_action])) } ColumnHash::Conflict(conflict_row_idx) => { let conflict_description = { diff --git a/src/query/storages/fuse/src/operations/replace_into/mutator/merge_into_mutator.rs b/src/query/storages/fuse/src/operations/replace_into/mutator/replace_into_operation_agg.rs similarity index 92% rename from src/query/storages/fuse/src/operations/replace_into/mutator/merge_into_mutator.rs rename to src/query/storages/fuse/src/operations/replace_into/mutator/replace_into_operation_agg.rs index b39145f7dcde3..0e2cf919db001 100644 --- a/src/query/storages/fuse/src/operations/replace_into/mutator/merge_into_mutator.rs +++ b/src/query/storages/fuse/src/operations/replace_into/mutator/replace_into_operation_agg.rs @@ -44,6 +44,9 @@ use databend_common_metrics::storage::*; use databend_common_sql::evaluator::BlockOperator; use databend_common_sql::executor::physical_plans::OnConflictField; use databend_common_sql::StreamContext; +use databend_storages_common_cache::BlockMetaCache; +use databend_storages_common_cache::CacheAccessor; +use databend_storages_common_cache::CacheManager; use databend_storages_common_cache::LoadParams; use databend_storages_common_index::filters::Filter; use databend_storages_common_index::filters::Xor8Filter; @@ -73,7 +76,7 @@ use crate::operations::mutation::BlockIndex; use crate::operations::mutation::SegmentIndex; use crate::operations::read_block; use crate::operations::replace_into::meta::DeletionByColumn; -use crate::operations::replace_into::meta::MergeIntoOperation; +use 
crate::operations::replace_into::meta::ReplaceIntoOperation; use crate::operations::replace_into::meta::UniqueKeyDigest; use crate::operations::replace_into::mutator::row_hash_of_columns; use crate::operations::replace_into::mutator::DeletionAccumulator; @@ -101,15 +104,17 @@ struct AggregationContext { io_request_semaphore: Arc, // generate stream columns if necessary stream_ctx: Option, + + block_meta_cache: Option, } // Apply MergeIntoOperations to segments -pub struct MergeIntoOperationAggregator { +pub struct ReplaceIntoOperationAggregator { deletion_accumulator: DeletionAccumulator, aggregation_ctx: Arc, } -impl MergeIntoOperationAggregator { +impl ReplaceIntoOperationAggregator { #[allow(clippy::too_many_arguments)] // TODO fix this pub fn try_create( ctx: Arc, @@ -210,21 +215,22 @@ impl MergeIntoOperationAggregator { block_builder, io_request_semaphore, stream_ctx, + block_meta_cache: CacheManager::instance().get_block_meta_cache(), }), }) } } // aggregate mutations (currently, deletion only) -impl MergeIntoOperationAggregator { +impl ReplaceIntoOperationAggregator { #[async_backtrace::framed] - pub async fn accumulate(&mut self, merge_into_operation: MergeIntoOperation) -> Result<()> { + pub async fn accumulate(&mut self, replace_into_operation: ReplaceIntoOperation) -> Result<()> { let aggregation_ctx = &self.aggregation_ctx; metrics_inc_replace_number_accumulated_merge_action(); let start = Instant::now(); - match merge_into_operation { - MergeIntoOperation::Delete(partitions) => { + match replace_into_operation { + ReplaceIntoOperation::Delete(partitions) => { for (segment_index, (path, ver)) in &aggregation_ctx.segment_locations { // segment level let load_param = LoadParams { @@ -280,7 +286,7 @@ impl MergeIntoOperationAggregator { } } } - MergeIntoOperation::None => {} + ReplaceIntoOperation::None => {} } metrics_inc_replace_accumulated_merge_action_time_ms(start.elapsed().as_millis() as u64); @@ -289,9 +295,11 @@ impl MergeIntoOperationAggregator { } // apply the mutations and generate mutation log -impl MergeIntoOperationAggregator { +impl ReplaceIntoOperationAggregator { #[async_backtrace::framed] pub async fn apply(&mut self) -> Result> { + let block_meta_cache = &self.aggregation_ctx.block_meta_cache; + metrics_inc_replace_number_apply_deletion(); // track number of segments and blocks after pruning (per merge action application) @@ -318,7 +326,7 @@ impl MergeIntoOperationAggregator { let mut mutation_log_handlers = Vec::new(); let mut num_rows_mutated = 0; for (segment_idx, block_deletion) in self.deletion_accumulator.deletions.drain() { - let (path, ver) = self + let (segment_path, ver) = self .aggregation_ctx .segment_locations .get(&segment_idx) @@ -330,19 +338,41 @@ impl MergeIntoOperationAggregator { })?; let load_param = LoadParams { - location: path.clone(), + location: segment_path.clone(), len_hint: None, ver: *ver, put_cache: true, }; - let compact_segment_info = aggregation_ctx.segment_reader.read(&load_param).await?; - let segment_info: SegmentInfo = compact_segment_info.try_into()?; + // Retain SegmentInfo to avoid repeatedly extracting it from CompactSegmentInfo later. 
+ let mut opt_segment_info: Option = None; for (block_index, keys) in block_deletion { + let block_cache_key = format!("{segment_path}-{block_index}"); + let block_meta = match block_meta_cache.get(&block_cache_key) { + Some(block_meta) => block_meta, + None => { + let block_meta = if let Some(segment_info) = &opt_segment_info { + segment_info.blocks[block_index].clone() + } else { + let compact_segment_info = + aggregation_ctx.segment_reader.read(&load_param).await?; + let segment_info: SegmentInfo = compact_segment_info.try_into()?; + let block_meta = segment_info.blocks[block_index].clone(); + opt_segment_info = Some(segment_info); + block_meta + }; + // A query node typically processes only a subset of the BlockMeta in a given segment. + // Therefore, even though all BlockMeta of a segment are available here, not all are populated into the cache. + block_meta_cache.insert(block_cache_key, block_meta.as_ref().clone()); + block_meta + } + }; + let permit = acquire_task_permit(aggregation_ctx.io_request_semaphore.clone()).await?; - let block_meta = segment_info.blocks[block_index].clone(); + + // let block_meta = segment_info.blocks[block_index].clone(); let aggregation_ctx = aggregation_ctx.clone(); num_rows_mutated += block_meta.row_count; // self.aggregation_ctx. @@ -616,7 +646,7 @@ impl AggregationContext { if let Some(stats) = column_stats { let max = stats.max(); let min = stats.min(); - std::cmp::min(key_max, max) >= std::cmp::max(key_min,min) + std::cmp::min(key_max, max) >= std::cmp::max(key_min, min) || // coincide overlap (max == key_max && min == key_min) } else { @@ -642,22 +672,22 @@ impl AggregationContext { let reader = reader.clone(); GlobalIORuntime::instance() .spawn(async move { - let column_chunks = merged_io_read_result.columns_chunks()?; - reader.deserialize_chunks( - block_meta_ptr.location.0.as_str(), - block_meta_ptr.row_count as usize, - &block_meta_ptr.compression, - &block_meta_ptr.col_metas, - column_chunks, - &storage_format, - ) - }) + let column_chunks = merged_io_read_result.columns_chunks()?; + reader.deserialize_chunks( + block_meta_ptr.location.0.as_str(), + block_meta_ptr.row_count as usize, + &block_meta_ptr.compression, + &block_meta_ptr.col_metas, + column_chunks, + &storage_format, + ) + }) .await .map_err(|e| { ErrorCode::Internal( "unexpected, failed to join aggregation context read block tasks for replace into.", ) - .add_message_back(e.to_string()) + .add_message_back(e.to_string()) })? 
} diff --git a/src/query/storages/fuse/src/operations/replace_into/processors/mod.rs b/src/query/storages/fuse/src/operations/replace_into/processors/mod.rs index 4fabde1b699b6..8d9cbdecdf442 100644 --- a/src/query/storages/fuse/src/operations/replace_into/processors/mod.rs +++ b/src/query/storages/fuse/src/operations/replace_into/processors/mod.rs @@ -17,7 +17,7 @@ mod processor_broadcast; mod processor_replace_into; mod processor_unbranched_replace_into; -mod transform_merge_into_mutation_aggregator; +mod transform_replace_into_mutation_aggregator; pub use processor_broadcast::BroadcastProcessor; pub use processor_replace_into::ReplaceIntoProcessor; diff --git a/src/query/storages/fuse/src/operations/replace_into/processors/processor_replace_into.rs b/src/query/storages/fuse/src/operations/replace_into/processors/processor_replace_into.rs index bb6e1ae700c70..5ef2f2c24625d 100644 --- a/src/query/storages/fuse/src/operations/replace_into/processors/processor_replace_into.rs +++ b/src/query/storages/fuse/src/operations/replace_into/processors/processor_replace_into.rs @@ -49,11 +49,11 @@ pub struct ReplaceIntoProcessor { // stage data blocks input_port: Arc, - output_port_merge_into_action: Arc, + output_port_replace_into_action: Arc, output_port_append_data: Arc, input_data: Option, - output_data_merge_into_action: Option, + output_data_replace_into_action: Option, output_data_append: Option, target_table_empty: bool, @@ -83,16 +83,16 @@ impl ReplaceIntoProcessor { table_range_idx, )?; let input_port = InputPort::create(); - let output_port_merge_into_action = OutputPort::create(); + let output_port_replace_into_action = OutputPort::create(); let output_port_append_data = OutputPort::create(); Ok(Self { replace_into_mutator, input_port, - output_port_merge_into_action, + output_port_replace_into_action, output_port_append_data, input_data: None, - output_data_merge_into_action: None, + output_data_replace_into_action: None, output_data_append: None, target_table_empty, delete_when, @@ -109,12 +109,12 @@ impl ReplaceIntoProcessor { #[allow(dead_code)] pub fn into_pipe_item(self) -> PipeItem { let input = self.input_port.clone(); - let output_port_merge_into_action = self.output_port_merge_into_action.clone(); + let output_port_replace_into_action = self.output_port_replace_into_action.clone(); let output_port_append_data = self.output_port_append_data.clone(); let processor_ptr = ProcessorPtr::create(Box::new(self)); PipeItem::create(processor_ptr, vec![input], vec![ output_port_append_data, - output_port_merge_into_action, + output_port_replace_into_action, ]) } } @@ -131,10 +131,10 @@ impl Processor for ReplaceIntoProcessor { fn event(&mut self) -> Result { let finished = self.input_port.is_finished() && self.output_data_append.is_none() - && self.output_data_merge_into_action.is_none(); + && self.output_data_replace_into_action.is_none(); if finished { - self.output_port_merge_into_action.finish(); + self.output_port_replace_into_action.finish(); self.output_port_append_data.finish(); return Ok(Event::Finished); } @@ -147,9 +147,9 @@ impl Processor for ReplaceIntoProcessor { } } - if self.output_port_merge_into_action.can_push() { - if let Some(data) = self.output_data_merge_into_action.take() { - self.output_port_merge_into_action.push_data(Ok(data)); + if self.output_port_replace_into_action.can_push() { + if let Some(data) = self.output_data_replace_into_action.take() { + self.output_port_replace_into_action.push_data(Ok(data)); pushed_something = true; } } @@ -162,7 +162,8 @@ impl 
Processor for ReplaceIntoProcessor { } if self.input_port.has_data() { - if self.output_data_append.is_none() && self.output_data_merge_into_action.is_none() + if self.output_data_append.is_none() + && self.output_data_replace_into_action.is_none() { // no pending data (being sent to down streams) self.input_data = Some(self.input_port.pull_data().unwrap()?); @@ -207,12 +208,12 @@ impl Processor for ReplaceIntoProcessor { .collect::>(); data_block = data_block.project(&projections); }; - let merge_into_action = self.replace_into_mutator.process_input_block(&data_block)?; + let replace_into_action = self.replace_into_mutator.process_input_block(&data_block)?; metrics_inc_replace_process_input_block_time_ms(start.elapsed().as_millis() as u64); metrics_inc_replace_block_number_input(1); if !self.target_table_empty { - self.output_data_merge_into_action = - Some(DataBlock::empty_with_meta(Box::new(merge_into_action))); + self.output_data_replace_into_action = + Some(DataBlock::empty_with_meta(Box::new(replace_into_action))); } if all_delete { diff --git a/src/query/storages/fuse/src/operations/replace_into/processors/processor_unbranched_replace_into.rs b/src/query/storages/fuse/src/operations/replace_into/processors/processor_unbranched_replace_into.rs index 3037d649af7d0..037f065cfb8b7 100644 --- a/src/query/storages/fuse/src/operations/replace_into/processors/processor_unbranched_replace_into.rs +++ b/src/query/storages/fuse/src/operations/replace_into/processors/processor_unbranched_replace_into.rs @@ -43,10 +43,10 @@ pub struct UnbranchedReplaceIntoProcessor { // stage data blocks input_port: Arc, - output_port_merge_into_action: Arc, + output_port_replace_into_action: Arc, input_data: Option, - output_data_merge_into_action: Option, + output_data_replace_into_action: Option, target_table_empty: bool, delete_column: Option, @@ -74,14 +74,14 @@ impl UnbranchedReplaceIntoProcessor { table_range_idx, )?; let input_port = InputPort::create(); - let output_port_merge_into_action = OutputPort::create(); + let output_port_replace_into_action = OutputPort::create(); Ok(Self { replace_into_mutator, input_port, - output_port_merge_into_action, + output_port_replace_into_action, input_data: None, - output_data_merge_into_action: None, + output_data_replace_into_action: None, target_table_empty, delete_column, }) @@ -96,10 +96,10 @@ impl UnbranchedReplaceIntoProcessor { #[allow(dead_code)] pub fn into_pipe_item(self) -> PipeItem { let input = self.input_port.clone(); - let output_port_merge_into_action = self.output_port_merge_into_action.clone(); + let output_port_replace_into_action = self.output_port_replace_into_action.clone(); let processor_ptr = ProcessorPtr::create(Box::new(self)); PipeItem::create(processor_ptr, vec![input], vec![ - output_port_merge_into_action, + output_port_replace_into_action, ]) } } @@ -115,17 +115,17 @@ impl Processor for UnbranchedReplaceIntoProcessor { } fn event(&mut self) -> Result { let finished = - self.input_port.is_finished() && self.output_data_merge_into_action.is_none(); + self.input_port.is_finished() && self.output_data_replace_into_action.is_none(); if finished { - self.output_port_merge_into_action.finish(); + self.output_port_replace_into_action.finish(); return Ok(Event::Finished); } let mut pushed_something = false; - if self.output_port_merge_into_action.can_push() { - if let Some(data) = self.output_data_merge_into_action.take() { - self.output_port_merge_into_action.push_data(Ok(data)); + if self.output_port_replace_into_action.can_push() { + if let 
Some(data) = self.output_data_replace_into_action.take() { + self.output_port_replace_into_action.push_data(Ok(data)); pushed_something = true; } } @@ -138,7 +138,7 @@ impl Processor for UnbranchedReplaceIntoProcessor { } if self.input_port.has_data() { - if self.output_data_merge_into_action.is_none() { + if self.output_data_replace_into_action.is_none() { // no pending data (being sent to down streams) self.input_data = Some(self.input_port.pull_data().unwrap()?); Ok(Event::Sync) @@ -163,11 +163,11 @@ impl Processor for UnbranchedReplaceIntoProcessor { .collect::>(); data_block = data_block.project(&projections); } - let merge_into_action = self.replace_into_mutator.process_input_block(&data_block)?; + let replace_into_action = self.replace_into_mutator.process_input_block(&data_block)?; metrics_inc_replace_process_input_block_time_ms(start.elapsed().as_millis() as u64); if !self.target_table_empty { - self.output_data_merge_into_action = - Some(DataBlock::empty_with_meta(Box::new(merge_into_action))); + self.output_data_replace_into_action = + Some(DataBlock::empty_with_meta(Box::new(replace_into_action))); } return Ok(()); } diff --git a/src/query/storages/fuse/src/operations/replace_into/processors/transform_merge_into_mutation_aggregator.rs b/src/query/storages/fuse/src/operations/replace_into/processors/transform_replace_into_mutation_aggregator.rs similarity index 80% rename from src/query/storages/fuse/src/operations/replace_into/processors/transform_merge_into_mutation_aggregator.rs rename to src/query/storages/fuse/src/operations/replace_into/processors/transform_replace_into_mutation_aggregator.rs index be014c6ddee72..9bc03482b7a1e 100644 --- a/src/query/storages/fuse/src/operations/replace_into/processors/transform_merge_into_mutation_aggregator.rs +++ b/src/query/storages/fuse/src/operations/replace_into/processors/transform_replace_into_mutation_aggregator.rs @@ -21,18 +21,18 @@ use databend_common_pipeline_core::PipeItem; use databend_common_pipeline_transforms::processors::AsyncAccumulatingTransform; use databend_common_pipeline_transforms::processors::AsyncAccumulatingTransformer; -use crate::operations::replace_into::meta::MergeIntoOperation; -use crate::operations::replace_into::mutator::MergeIntoOperationAggregator; +use crate::operations::replace_into::meta::ReplaceIntoOperation; +use crate::operations::replace_into::mutator::ReplaceIntoOperationAggregator; #[async_trait::async_trait] -impl AsyncAccumulatingTransform for MergeIntoOperationAggregator { - const NAME: &'static str = "MergeIntoMutationAggregator"; +impl AsyncAccumulatingTransform for ReplaceIntoOperationAggregator { + const NAME: &'static str = "ReplaceIntoMutationAggregator"; #[async_backtrace::framed] async fn transform(&mut self, data: DataBlock) -> Result> { // accumulate mutations - let merge_into_operation = MergeIntoOperation::try_from(data)?; - self.accumulate(merge_into_operation).await?; + let replace_into_operation = ReplaceIntoOperation::try_from(data)?; + self.accumulate(replace_into_operation).await?; // no partial output Ok(None) } @@ -45,7 +45,7 @@ impl AsyncAccumulatingTransform for MergeIntoOperationAggregator { } } -impl MergeIntoOperationAggregator { +impl ReplaceIntoOperationAggregator { pub fn into_pipe_item(self) -> PipeItem { let input = InputPort::create(); let output = OutputPort::create(); diff --git a/src/query/storages/fuse/src/pruning/fuse_pruner.rs b/src/query/storages/fuse/src/pruning/fuse_pruner.rs index 093cbf17ced85..4fd98cece5bb4 100644 --- 
a/src/query/storages/fuse/src/pruning/fuse_pruner.rs +++ b/src/query/storages/fuse/src/pruning/fuse_pruner.rs @@ -26,9 +26,9 @@ use databend_common_expression::SEGMENT_NAME_COL_NAME; use databend_common_functions::BUILTIN_FUNCTIONS; use databend_common_sql::field_default_value; use databend_common_sql::BloomIndexColumns; -use databend_storages_common_cache::BlockMetaCache; use databend_storages_common_cache::CacheAccessor; use databend_storages_common_cache::CacheManager; +use databend_storages_common_cache::SegmentBlockMetasCache; use databend_storages_common_index::RangeIndex; use databend_storages_common_pruner::BlockMetaIndex; use databend_storages_common_pruner::InternalColumnPruner; @@ -195,7 +195,7 @@ pub struct FusePruner { pub push_down: Option, pub inverse_range_index: Option, pub deleted_segments: Vec, - pub block_meta_cache: Option, + pub block_meta_cache: Option, } impl FusePruner { @@ -265,7 +265,7 @@ impl FusePruner { pruning_ctx, inverse_range_index: None, deleted_segments: vec![], - block_meta_cache: CacheManager::instance().get_block_meta_cache(), + block_meta_cache: CacheManager::instance().get_segment_block_metas_cache(), }) } @@ -406,7 +406,7 @@ impl FusePruner { segment: &CompactSegmentInfo, populate_cache: bool, ) -> Result>>> { - if let Some(cache) = CacheManager::instance().get_block_meta_cache() { + if let Some(cache) = CacheManager::instance().get_segment_block_metas_cache() { if let Some(metas) = cache.get(segment_path) { Ok(metas) } else { diff --git a/src/query/storages/system/src/caches_table.rs b/src/query/storages/system/src/caches_table.rs index ead0282a6aefa..35c1138525a6b 100644 --- a/src/query/storages/system/src/caches_table.rs +++ b/src/query/storages/system/src/caches_table.rs @@ -76,6 +76,7 @@ impl SyncSystemTable for CachesTable { let segment_info_cache = cache_manager.get_table_segment_cache(); let bloom_index_filter_cache = cache_manager.get_bloom_index_filter_cache(); let bloom_index_meta_cache = cache_manager.get_bloom_index_meta_cache(); + let segment_block_metas_cache = cache_manager.get_segment_block_metas_cache(); let block_meta_cache = cache_manager.get_block_meta_cache(); let inverted_index_meta_cache = cache_manager.get_inverted_index_meta_cache(); let inverted_index_file_cache = cache_manager.get_inverted_index_file_cache(); @@ -105,6 +106,10 @@ impl SyncSystemTable for CachesTable { Self::append_row(&bloom_index_meta_cache, &local_node, &mut columns); } + if let Some(segment_block_metas_cache) = segment_block_metas_cache { + Self::append_row(&segment_block_metas_cache, &local_node, &mut columns); + } + if let Some(block_meta_cache) = block_meta_cache { Self::append_row(&block_meta_cache, &local_node, &mut columns); } diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0043_set_cache_cap.sql b/tests/sqllogictests/suites/base/09_fuse_engine/09_0043_set_cache_cap.sql index 1b75c19a4703d..2015350673878 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0043_set_cache_cap.sql +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0043_set_cache_cap.sql @@ -7,11 +7,22 @@ use db_09_0041; # By default, memory_cache_block_meta is disabled, # let's enable it by setting a non-zero capacity statement ok -call system$set_cache_capacity('memory_cache_block_meta', 1000); +call system$set_cache_capacity('memory_cache_block_meta', 1); # check cache "memory_cache_block_meta" exists query II -select count()>=1 from system.caches where name = 'memory_cache_block_meta' and capacity = 1000; +select count()>=1 from system.caches where 
name = 'memory_cache_block_meta' and capacity = 1; +---- +1 + + +statement ok +call system$set_cache_capacity('memory_cache_segment_block_metas', 3); + +# check cache "memory_cache_segment_block_metas" exists + +query II +select count()>=1 from system.caches where name = 'memory_cache_segment_block_metas' and capacity = 3; ---- 1 diff --git a/tests/sqllogictests/suites/stage/formats/parquet/options/null_if.test b/tests/sqllogictests/suites/stage/formats/parquet/options/null_if.test index 338efaac0c318..2b85e0e94b2c3 100644 --- a/tests/sqllogictests/suites/stage/formats/parquet/options/null_if.test +++ b/tests/sqllogictests/suites/stage/formats/parquet/options/null_if.test @@ -23,10 +23,15 @@ insert into string values (''), ('null'),('128') statement ok remove @data/unload/parquet/null_if/ -query +statement ok copy into @data/unload/parquet/null_if from string + +query +select a from @data/unload/parquet/null_if order by a ---- -3 40 384 +(empty) +128 +null statement ok drop file format if exists parquet_null_if diff --git a/tests/sqllogictests/suites/stage/formats/parquet/options/parquet_missing_uuid.test b/tests/sqllogictests/suites/stage/formats/parquet/options/parquet_missing_uuid.test index 2e0f634eea6c6..5906e218fcf42 100644 --- a/tests/sqllogictests/suites/stage/formats/parquet/options/parquet_missing_uuid.test +++ b/tests/sqllogictests/suites/stage/formats/parquet/options/parquet_missing_uuid.test @@ -7,10 +7,13 @@ create table t_uuid(id string default uuid(), a int) statement ok remove @data/parquet/unload/uuid -query +statement ok copy into @data/parquet/unload/uuid/ from (select 1 as a) file_format = (type = parquet) + +query +select a from @data/parquet/unload/uuid/ ---- -1 1 377 +1 query error column id doesn't exist copy into t_uuid from @data/parquet/unload/uuid file_format = (type = parquet) RETURN_FAILED_ONLY=TRUE @@ -19,10 +22,9 @@ query select * from t_uuid ---- -query +# prepare data for copying into t_uuid +statement ok copy into @data/parquet/unload/uuid/ from (select 1 as a) file_format = (type = parquet) ----- -1 1 377 statement ok truncate table t_uuid diff --git a/tests/suites/0_stateless/05_hints/05_0001_set_var.result b/tests/suites/0_stateless/05_hints/05_0001_set_var.result index 8a477a8c51d7c..e8a36ce0e93d2 100644 --- a/tests/suites/0_stateless/05_hints/05_0001_set_var.result +++ b/tests/suites/0_stateless/05_hints/05_0001_set_var.result @@ -22,5 +22,5 @@ Asia/Shanghai America/Toronto 2022-02-02 03:00:00 2022-02-01 14:00:00 -1 13 415 +1 13 Asia/Shanghai diff --git a/tests/suites/0_stateless/05_hints/05_0001_set_var.sh b/tests/suites/0_stateless/05_hints/05_0001_set_var.sh index 084e1b24a2818..51950b1abd457 100755 --- a/tests/suites/0_stateless/05_hints/05_0001_set_var.sh +++ b/tests/suites/0_stateless/05_hints/05_0001_set_var.sh @@ -37,6 +37,6 @@ echo "drop database set_var;" | $BENDSQL_CLIENT_CONNECT echo "drop stage if exists s2" | $BENDSQL_CLIENT_CONNECT echo "create stage s2" | $BENDSQL_CLIENT_CONNECT -echo "copy /*+SET_VAR(timezone='Asia/Shanghai') */ into @s2 from (select timezone()); " | $BENDSQL_CLIENT_CONNECT +echo "copy /*+SET_VAR(timezone='Asia/Shanghai') */ into @s2 from (select timezone()); " | $BENDSQL_CLIENT_CONNECT | cut -d$'\t' -f1,2 echo "select * from @s2 " | $BENDSQL_CLIENT_CONNECT echo "drop stage s2" | $BENDSQL_CLIENT_CONNECT diff --git a/tests/suites/0_stateless/18_rbac/18_0002_ownership_cover.result b/tests/suites/0_stateless/18_rbac/18_0002_ownership_cover.result index 5f853c770b115..1570311b2ead6 100644 --- 
a/tests/suites/0_stateless/18_rbac/18_0002_ownership_cover.result +++ b/tests/suites/0_stateless/18_rbac/18_0002_ownership_cover.result @@ -1,7 +1,7 @@ === test db/table === 200 === test stage === -1 8 401 +1 8 0 === test udf === 2 diff --git a/tests/suites/0_stateless/18_rbac/18_0002_ownership_cover.sh b/tests/suites/0_stateless/18_rbac/18_0002_ownership_cover.sh index 16881d0741566..d62bceea8891b 100755 --- a/tests/suites/0_stateless/18_rbac/18_0002_ownership_cover.sh +++ b/tests/suites/0_stateless/18_rbac/18_0002_ownership_cover.sh @@ -41,7 +41,7 @@ echo "select * from d_0002.t" | $TEST_USER_CONNECT ## stage echo "=== test stage ===" echo 'create stage hello' | $TEST_USER_CONNECT -echo 'COPY INTO @hello from (select number from numbers(1)) FILE_FORMAT = (type = parquet)' | $TEST_USER_CONNECT +echo 'COPY INTO @hello from (select number from numbers(1)) FILE_FORMAT = (type = parquet)' | $TEST_USER_CONNECT | cut -d$'\t' -f1,2 echo 'select * from @hello' | $TEST_USER_CONNECT ## udf diff --git a/tests/suites/0_stateless/18_rbac/18_0007_privilege_access.result b/tests/suites/0_stateless/18_rbac/18_0007_privilege_access.result index 08fbf309c29b9..383393f1f624b 100644 --- a/tests/suites/0_stateless/18_rbac/18_0007_privilege_access.result +++ b/tests/suites/0_stateless/18_rbac/18_0007_privilege_access.result @@ -95,7 +95,7 @@ Error: APIError: QueryFailed: [1063]Permission denied: privilege READ is require Error: APIError: QueryFailed: [1063]Permission denied: No privilege on database root_db for user b. Error: APIError: QueryFailed: [1063]Permission denied: No privilege on table root_table for user b. Error: APIError: QueryFailed: [1063]Permission denied: No privilege on table root_table for user b. -1 1 377 +1 1 Error: APIError: QueryFailed: [1063]Permission denied: privilege [Select] is required on 'default'.'default'.'t1' for user 'b'@'%' with roles [public] Error: APIError: QueryFailed: [1063]Permission denied: privilege [Read] is required on STAGE s3 for user 'b'@'%' with roles [public]. Note: Please ensure that your current role have the appropriate permissions to create a new Database|Table|UDF|Stage. 
Error: APIError: QueryFailed: [1063]Permission denied: privilege [Select] is required on 'default'.'default'.'t' for user 'b'@'%' with roles [public] diff --git a/tests/suites/0_stateless/18_rbac/18_0007_privilege_access.sh b/tests/suites/0_stateless/18_rbac/18_0007_privilege_access.sh index ebbc772282056..e0c15ed787364 100755 --- a/tests/suites/0_stateless/18_rbac/18_0007_privilege_access.sh +++ b/tests/suites/0_stateless/18_rbac/18_0007_privilege_access.sh @@ -236,7 +236,7 @@ echo "grant insert, delete on default.t to b" | $BENDSQL_CLIENT_CONNECT echo "grant select on system.* to b" | $BENDSQL_CLIENT_CONNECT echo "create stage s3;" | $BENDSQL_CLIENT_CONNECT -echo "copy into '@s3/a b' from (select 2);" | $BENDSQL_CLIENT_CONNECT | $RM_UUID +echo "copy into '@s3/a b' from (select 2);" | $BENDSQL_CLIENT_CONNECT | $RM_UUID | cut -d$'\t' -f1,2 # need err echo "insert into t select * from t1" | $USER_B_CONNECT diff --git a/tests/suites/0_stateless/20+_others/20_0015_compact_hook_stas_issue_13947.result b/tests/suites/0_stateless/20+_others/20_0015_compact_hook_stas_issue_13947.result index c82a86e894121..2c43c9afdad11 100644 --- a/tests/suites/0_stateless/20+_others/20_0015_compact_hook_stas_issue_13947.result +++ b/tests/suites/0_stateless/20+_others/20_0015_compact_hook_stas_issue_13947.result @@ -1,4 +1,3 @@ -2 10 386 expects .stats.write_progress.rows be 2 expects .error be null 2 diff --git a/tests/suites/0_stateless/20+_others/20_0015_compact_hook_stas_issue_13947.sh b/tests/suites/0_stateless/20+_others/20_0015_compact_hook_stas_issue_13947.sh index 0bd510417b0cc..de3229261079d 100755 --- a/tests/suites/0_stateless/20+_others/20_0015_compact_hook_stas_issue_13947.sh +++ b/tests/suites/0_stateless/20+_others/20_0015_compact_hook_stas_issue_13947.sh @@ -4,11 +4,11 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) . 
"$CURDIR"/../../../shell_env.sh # set up -cat < /dev/null +create or replace database i13947; use i13947; -create stage test_stage; -create table tmp(id int); +create or replace stage test_stage; +create or replace table tmp(id int); insert into tmp values(1); insert into tmp values(2); copy into @test_stage from (select * from tmp); diff --git a/tests/suites/1_stateful/00_stage/00_0001_copy_into_stage.result b/tests/suites/1_stateful/00_stage/00_0001_copy_into_stage.result index b5fae37f1ce82..36cd956783ae2 100755 --- a/tests/suites/1_stateful/00_stage/00_0001_copy_into_stage.result +++ b/tests/suites/1_stateful/00_stage/00_0001_copy_into_stage.result @@ -1,4 +1,4 @@ -20 160 160 -20 450 799 +20 160 +20 450 2 -20 160 160 +20 160 diff --git a/tests/suites/1_stateful/00_stage/00_0001_copy_into_stage.sh b/tests/suites/1_stateful/00_stage/00_0001_copy_into_stage.sh index 2bdcece326be8..c71a6e129ce00 100755 --- a/tests/suites/1_stateful/00_stage/00_0001_copy_into_stage.sh +++ b/tests/suites/1_stateful/00_stage/00_0001_copy_into_stage.sh @@ -17,13 +17,14 @@ for i in `seq 1 10`;do echo "insert into test_table (id,name,age) values(1,'2',3), (4, '5', 6);" | $BENDSQL_CLIENT_CONNECT done - -echo "copy into @s2 from test_table FILE_FORMAT = (type = CSV);" | $BENDSQL_CLIENT_CONNECT -echo "copy into @s2 from (select name, age, id from test_table limit 100) FILE_FORMAT = (type = 'PARQUET');" | $BENDSQL_CLIENT_CONNECT +# The last column `output_bytes` is excluded to avoid flakiness +echo "copy into @s2 from test_table FILE_FORMAT = (type = CSV);" | $BENDSQL_CLIENT_CONNECT | cut -d$'\t' -f1,2 +echo "copy into @s2 from (select name, age, id from test_table limit 100) FILE_FORMAT = (type = 'PARQUET');" | $BENDSQL_CLIENT_CONNECT | cut -d$'\t' -f1,2 echo "list @s2;" | $BENDSQL_CLIENT_CONNECT | wc -l | sed 's/ //g' -echo "copy into @s2 from test_table FILE_FORMAT = (type = CSV) MAX_FILE_SIZE = 10;" | $BENDSQL_CLIENT_CONNECT +# The last column `output_bytes` is excluded to avoid flakiness +echo "copy into @s2 from test_table FILE_FORMAT = (type = CSV) MAX_FILE_SIZE = 10;" | $BENDSQL_CLIENT_CONNECT | cut -d$'\t' -f1,2 lines=`echo "list @s2;" | $BENDSQL_CLIENT_CONNECT | wc -l` diff --git a/tests/suites/1_stateful/00_stage/00_0012_stage_priv.result b/tests/suites/1_stateful/00_stage/00_0012_stage_priv.result index 82ee35e8fa93e..975a9b57c3d2c 100644 --- a/tests/suites/1_stateful/00_stage/00_0012_stage_priv.result +++ b/tests/suites/1_stateful/00_stage/00_0012_stage_priv.result @@ -1,11 +1,11 @@ ==== check internal stage write priv === Error: APIError: QueryFailed: [1063]Permission denied: privilege [Write] is required on STAGE s2 for user 'u1'@'%' with roles [public]. Note: Please ensure that your current role have the appropriate permissions to create a new Database|Table|UDF|Stage. Error: APIError: QueryFailed: [1063]Permission denied: privilege [Select] is required on 'default'.'default'.'test_table' for user 'u1'@'%' with roles [public] -20 160 160 +20 160 1 ==== check external stage priv === Error: APIError: QueryFailed: [1063]Permission denied: privilege [Write] is required on STAGE s1 for user 'u1'@'%' with roles [public]. Note: Please ensure that your current role have the appropriate permissions to create a new Database|Table|UDF|Stage. -20 160 160 +20 160 Error: APIError: QueryFailed: [1063]Permission denied: privilege [Read] is required on STAGE s1 for user 'u1'@'%' with roles [public]. Note: Please ensure that your current role have the appropriate permissions to create a new Database|Table|UDF|Stage. 
csv/data_UUID_0000_00000000.csv 20 0 NULL NULL ==== check internal stage read priv === @@ -17,7 +17,7 @@ Error: APIError: QueryFailed: [1063]Permission denied: privilege [Write] is requ Error: APIError: QueryFailed: [1063]Permission denied: privilege [Read] is required on STAGE presign_stage for user 'u1'@'%' with roles [public]. Note: Please ensure that your current role have the appropriate permissions to create a new Database|Table|UDF|Stage. 000 Error: APIError: QueryFailed: [1063]Permission denied: privilege [Write] is required on STAGE s3 for user 'u1'@'%' with roles [public]. Note: Please ensure that your current role have the appropriate permissions to create a new Database|Table|UDF|Stage. -1 1 377 +1 1 Error: APIError: QueryFailed: [1063]Permission denied: privilege [Read] is required on STAGE s3 for user 'u1'@'%' with roles [public]. Note: Please ensure that your current role have the appropriate permissions to create a new Database|Table|UDF|Stage. Error: APIError: QueryFailed: [1063]Permission denied: privilege READ is required on stage s3 for user 'u1'@'%' Error: APIError: QueryFailed: [1063]Permission denied: privilege READ is required on stage s3 for user 'u1'@'%' diff --git a/tests/suites/1_stateful/00_stage/00_0012_stage_priv.sh b/tests/suites/1_stateful/00_stage/00_0012_stage_priv.sh index 2996ffc4bbca6..c8d4d6de3e6f1 100755 --- a/tests/suites/1_stateful/00_stage/00_0012_stage_priv.sh +++ b/tests/suites/1_stateful/00_stage/00_0012_stage_priv.sh @@ -33,17 +33,17 @@ echo "create stage s1 url = 'fs:///$STAGE_DIR/' FILE_FORMAT = (type = CSV)" | $B echo "create user u1 identified by 'password';" | $BENDSQL_CLIENT_CONNECT echo "grant insert on default.test_table to u1;" | $BENDSQL_CLIENT_CONNECT echo "==== check internal stage write priv ===" -echo "copy into @s2 from test_table FILE_FORMAT = (type = CSV);" | $TEST_USER_CONNECT | $RM_UUID +echo "copy into @s2 from test_table FILE_FORMAT = (type = CSV);" | $TEST_USER_CONNECT | $RM_UUID | cut -d$'\t' -f1,2 echo "grant Write on stage s2 to 'u1'" | $BENDSQL_CLIENT_CONNECT -echo "copy into @s2 from test_table FILE_FORMAT = (type = CSV);" | $TEST_USER_CONNECT | $RM_UUID +echo "copy into @s2 from test_table FILE_FORMAT = (type = CSV);" | $TEST_USER_CONNECT | $RM_UUID | cut -d$'\t' -f1,2 echo "grant select on default.test_table to u1;" | $BENDSQL_CLIENT_CONNECT -echo "copy into @s2 from test_table FILE_FORMAT = (type = CSV);" | $TEST_USER_CONNECT | $RM_UUID +echo "copy into @s2 from test_table FILE_FORMAT = (type = CSV);" | $TEST_USER_CONNECT | $RM_UUID | cut -d$'\t' -f1,2 echo "list @s2;" | $BENDSQL_CLIENT_CONNECT | wc -l | sed 's/ //g' echo "==== check external stage priv ===" -echo "copy into @s1/csv/ from test_table FILE_FORMAT = (type = CSV);" | $TEST_USER_CONNECT | $RM_UUID +echo "copy into @s1/csv/ from test_table FILE_FORMAT = (type = CSV);" | $TEST_USER_CONNECT | $RM_UUID | cut -d$'\t' -f1,2 echo "grant write on stage s1 to 'u1'" | $BENDSQL_CLIENT_CONNECT -echo "copy into @s1/csv/ from test_table FILE_FORMAT = (type = CSV);" | $TEST_USER_CONNECT | $RM_UUID +echo "copy into @s1/csv/ from test_table FILE_FORMAT = (type = CSV);" | $TEST_USER_CONNECT | $RM_UUID | cut -d$'\t' -f1,2 echo "copy into test_table from @s1/csv/ FILE_FORMAT = (type = CSV skip_header = 0) force=true;" | $TEST_USER_CONNECT | $RM_UUID echo "grant read on stage s1 to 'u1'" | $BENDSQL_CLIENT_CONNECT echo "copy into test_table from @s1/csv/ FILE_FORMAT = (type = CSV skip_header = 0) force=true;" | $TEST_USER_CONNECT | $RM_UUID @@ -80,7 +80,7 @@ echo "create 
stage s3;" | $BENDSQL_CLIENT_CONNECT echo "remove @s3;" | $TEST_USER_CONNECT echo "grant write on stage s3 to u1" | $BENDSQL_CLIENT_CONNECT echo "remove @s3;" | $TEST_USER_CONNECT -echo "copy into '@s3/a b' from (select 2);" | $TEST_USER_CONNECT | $RM_UUID +echo "copy into '@s3/a b' from (select 2);" | $TEST_USER_CONNECT | $RM_UUID | cut -d$'\t' -f1,2 echo "grant select on system.* to u1" | $BENDSQL_CLIENT_CONNECT diff --git a/tests/suites/1_stateful/00_stage/00_0012_stage_with_connection.result b/tests/suites/1_stateful/00_stage/00_0012_stage_with_connection.result index 307f687b4598d..3657c218544e7 100644 --- a/tests/suites/1_stateful/00_stage/00_0012_stage_with_connection.result +++ b/tests/suites/1_stateful/00_stage/00_0012_stage_with_connection.result @@ -7,7 +7,7 @@ >>>> create stage my_stage url= 's3://testbucket/admin/tempdata/' connection = (connection_name='my_conn'); >>>> remove @my_stage; >>>> copy into @my_stage/a.csv from my_table -3 13 387 +3 13 >>>> select * from @my_stage order by a; 1 2 diff --git a/tests/suites/1_stateful/00_stage/00_0012_stage_with_connection.sh b/tests/suites/1_stateful/00_stage/00_0012_stage_with_connection.sh index c84f419137915..8b4a588e02047 100755 --- a/tests/suites/1_stateful/00_stage/00_0012_stage_with_connection.sh +++ b/tests/suites/1_stateful/00_stage/00_0012_stage_with_connection.sh @@ -12,7 +12,7 @@ stmt "drop connection if exists my_conn;" stmt "create connection my_conn storage_type = 's3' access_key_id ='minioadmin' secret_access_key ='minioadmin' endpoint_url='${STORAGE_S3_ENDPOINT_URL}'" stmt "create stage my_stage url= 's3://testbucket/admin/tempdata/' connection = (connection_name='my_conn');" stmt "remove @my_stage;" -stmt "copy into @my_stage/a.csv from my_table" +stmt "copy into @my_stage/a.csv from my_table" | cut -d$'\t' -f1,2 query "select * from @my_stage order by a;" stmt "drop table if exists my_table;" diff --git a/tests/suites/1_stateful/00_stage/00_0015_unload_output.result b/tests/suites/1_stateful/00_stage/00_0015_unload_output.result index 59d4f0e99c345..99b2e680a05bc 100644 --- a/tests/suites/1_stateful/00_stage/00_0015_unload_output.result +++ b/tests/suites/1_stateful/00_stage/00_0015_unload_output.result @@ -28,35 +28,37 @@ a/bc/data_UUID_0000_00000009.csv 2 1 10 20 20 <<<< <<<< ->>>> copy /*+ set_var(max_threads=1) */ into @s1/a/bc from (select * from t1) max_file_size=1 detailed_output=true -a/bc/data_UUID_0000_00000000.parquet 378 1 -a/bc/data_UUID_0000_00000001.parquet 378 1 -a/bc/data_UUID_0000_00000002.parquet 378 1 -a/bc/data_UUID_0000_00000003.parquet 378 1 -a/bc/data_UUID_0000_00000004.parquet 378 1 -a/bc/data_UUID_0000_00000005.parquet 378 1 -a/bc/data_UUID_0000_00000006.parquet 378 1 -a/bc/data_UUID_0000_00000007.parquet 378 1 -a/bc/data_UUID_0000_00000008.parquet 378 1 -a/bc/data_UUID_0000_00000009.parquet 378 1 +a/bc/data_UUID_0000_00000000.parquet 1 +a/bc/data_UUID_0000_00000001.parquet 1 +a/bc/data_UUID_0000_00000002.parquet 1 +a/bc/data_UUID_0000_00000003.parquet 1 +a/bc/data_UUID_0000_00000004.parquet 1 +a/bc/data_UUID_0000_00000005.parquet 1 +a/bc/data_UUID_0000_00000006.parquet 1 +a/bc/data_UUID_0000_00000007.parquet 1 +a/bc/data_UUID_0000_00000008.parquet 1 +a/bc/data_UUID_0000_00000009.parquet 1 +10 50 +<<<< +>>>> copy into @s1/a/bc from (select * from t1) max_file_size=1 detailed_output=false >>>> unload path >>>> copy /*+ set_var(max_threads=1) */ into @s1 from (select 1) detailed_output=true -data_UUID_0000_00000000.parquet 377 1 +data_UUID_0000_00000000.parquet 1 <<<< >>>> copy /*+ 
set_var(max_threads=1) */ into @s1/ from (select 1) detailed_output=true -data_UUID_0000_00000000.parquet 377 1 +data_UUID_0000_00000000.parquet 1 <<<< >>>> copy /*+ set_var(max_threads=1) */ into @s1/a from (select 1) detailed_output=true -a/data_UUID_0000_00000000.parquet 377 1 +a/data_UUID_0000_00000000.parquet 1 <<<< >>>> copy /*+ set_var(max_threads=1) */ into @s1/a/ from (select 1) detailed_output=true -a/data_UUID_0000_00000000.parquet 377 1 +a/data_UUID_0000_00000000.parquet 1 <<<< >>>> copy /*+ set_var(max_threads=1) */ into @s1/a/bc from (select 1) detailed_output=true -a/bc/data_UUID_0000_00000000.parquet 377 1 +a/bc/data_UUID_0000_00000000.parquet 1 <<<< >>>> copy /*+ set_var(max_threads=1) */ into @s1/a/data_ from (select 1) detailed_output=true -a/data_UUID_0000_00000000.parquet 377 1 +a/data_UUID_0000_00000000.parquet 1 <<<< >>>> drop stage if exists s1 >>>> drop table if exists t1 diff --git a/tests/suites/1_stateful/00_stage/00_0015_unload_output.sh b/tests/suites/1_stateful/00_stage/00_0015_unload_output.sh index d589f61a152ee..dc221cde4932f 100755 --- a/tests/suites/1_stateful/00_stage/00_0015_unload_output.sh +++ b/tests/suites/1_stateful/00_stage/00_0015_unload_output.sh @@ -21,17 +21,21 @@ query "copy /*+ set_var(max_threads=1) */ into @s1/a/bc from (select * from t1) query "copy into @s1/a/bc from (select * from t1) file_format = (type=csv) max_file_size=1 detailed_output=false" -query "copy /*+ set_var(max_threads=1) */ into @s1/a/bc from (select * from t1) max_file_size=1 detailed_output=true" | $RM_UUID | sort +query "copy /*+ set_var(max_threads=1) */ into @s1/a/bc from (select * from t1) max_file_size=1 detailed_output=true" | $RM_UUID | tail -n +2 | sort | cut -d$'\t' -f1,3 -query "copy into @s1/a/bc from (select * from t1) max_file_size=1 detailed_output=false" | $MYSQL +# when option `detailed_output` is set to false, the result-set will have the following 3 columns: +# `rows_unloaded, input_bytes, output_bytes` +# https://docs.databend.com/sql/sql-commands/dml/dml-copy-into-location#detailed_output +# the last column `output_bytes` will be ignored, to avoid flakiness +query "copy into @s1/a/bc from (select * from t1) max_file_size=1 detailed_output=false" | sort | cut -d$'\t' -f1,2 echo ">>>> unload path" -query "copy /*+ set_var(max_threads=1) */ into @s1 from (select 1) detailed_output=true" | $RM_UUID -query "copy /*+ set_var(max_threads=1) */ into @s1/ from (select 1) detailed_output=true" | $RM_UUID -query "copy /*+ set_var(max_threads=1) */ into @s1/a from (select 1) detailed_output=true" | $RM_UUID -query "copy /*+ set_var(max_threads=1) */ into @s1/a/ from (select 1) detailed_output=true" | $RM_UUID -query "copy /*+ set_var(max_threads=1) */ into @s1/a/bc from (select 1) detailed_output=true" | $RM_UUID -query "copy /*+ set_var(max_threads=1) */ into @s1/a/data_ from (select 1) detailed_output=true" | $RM_UUID +query "copy /*+ set_var(max_threads=1) */ into @s1 from (select 1) detailed_output=true" | $RM_UUID | cut -d$'\t' -f1,3 +query "copy /*+ set_var(max_threads=1) */ into @s1/ from (select 1) detailed_output=true" | $RM_UUID | cut -d$'\t' -f1,3 +query "copy /*+ set_var(max_threads=1) */ into @s1/a from (select 1) detailed_output=true" | $RM_UUID | cut -d$'\t' -f1,3 +query "copy /*+ set_var(max_threads=1) */ into @s1/a/ from (select 1) detailed_output=true" | $RM_UUID | cut -d$'\t' -f1,3 +query "copy /*+ set_var(max_threads=1) */ into @s1/a/bc from (select 1) detailed_output=true" | $RM_UUID | cut -d$'\t' -f1,3 +query "copy /*+ 
set_var(max_threads=1) */ into @s1/a/data_ from (select 1) detailed_output=true" | $RM_UUID | cut -d$'\t' -f1,3 stmt "drop stage if exists s1" stmt "drop table if exists t1" diff --git a/tests/suites/1_stateful/05_formats/05_05_parquet/05_05_01_parquet_load_unload.result b/tests/suites/1_stateful/05_formats/05_05_parquet/05_05_01_parquet_load_unload.result index 91d1e47915cd9..da4ed68a234a9 100755 --- a/tests/suites/1_stateful/05_formats/05_05_parquet/05_05_01_parquet_load_unload.result +++ b/tests/suites/1_stateful/05_formats/05_05_parquet/05_05_01_parquet_load_unload.result @@ -28,14 +28,14 @@ a"b 1 ['a"b'] {"k":"v"} 2044-05-06 10:25:02.868894 10.01 ('a',5) ['{"k":"v"}'] [ NULL 2 ['a'b'] [1] 2044-05-06 10:25:02.868894 -10.01 ('b',10) ['[1]'] [('b',10)] <<<< >>>> copy into @s1/unload1/ from test_load_unload -2 362 2986 +2 362 >>>> truncate table test_load_unload >>>> copy into test_load_unload from @s1/unload1.parquet force=true; unload1.parquet 2 0 NULL NULL begin diff select end diff >>>> copy into @s1/unload2/ from test_load_unload -2 362 2986 +2 362 begin diff parquet end diff >>>> truncate table test_load_unload diff --git a/tests/suites/1_stateful/05_formats/05_05_parquet/05_05_01_parquet_load_unload.sh b/tests/suites/1_stateful/05_formats/05_05_parquet/05_05_01_parquet_load_unload.sh index 18e985039626e..07f7e29947e66 100755 --- a/tests/suites/1_stateful/05_formats/05_05_parquet/05_05_01_parquet_load_unload.sh +++ b/tests/suites/1_stateful/05_formats/05_05_parquet/05_05_01_parquet_load_unload.sh @@ -43,7 +43,7 @@ # unload1 query "select * from test_load_unload" query "select * from test_load_unload" > /tmp/test_load_unload/select1.txt - stmt "copy into @s1/unload1/ from test_load_unload" + stmt "copy into @s1/unload1/ from test_load_unload" | cut -d$'\t' -f1,2 mv `ls /tmp/test_load_unload/unload1/*` /tmp/test_load_unload/unload1.parquet # reload with copy into table @@ -57,7 +57,7 @@ diff /tmp/test_load_unload/select1.txt /tmp/test_load_unload/select2.txt echo "end diff" - stmt "copy into @s1/unload2/ from test_load_unload" + stmt "copy into @s1/unload2/ from test_load_unload" | cut -d$'\t' -f1,2 mv `ls /tmp/test_load_unload/unload2/*` /tmp/test_load_unload/unload2.parquet diff --git a/tests/suites/1_stateful/08_select_stage/08_00_parquet/08_00_00_basic.result b/tests/suites/1_stateful/08_select_stage/08_00_parquet/08_00_00_basic.result index 31509fcbee6c2..52fcc4224b98e 100755 --- a/tests/suites/1_stateful/08_select_stage/08_00_parquet/08_00_00_basic.result +++ b/tests/suites/1_stateful/08_select_stage/08_00_parquet/08_00_00_basic.result @@ -1,8 +1,8 @@ --- named internal stage -2 45 769 +2 45 1 2 3 4 5 6 -2 45 769 +2 45 --- external stage 1 2 3 4 5 6 @@ -10,6 +10,6 @@ 1 2 3 4 5 6 --- variant named internal stage -2 70 725 +2 70 1 [1,2,3] 2 {"k":"v"} diff --git a/tests/suites/1_stateful/08_select_stage/08_00_parquet/08_00_00_basic.sh b/tests/suites/1_stateful/08_select_stage/08_00_parquet/08_00_00_basic.sh index 42ef18e72a514..c31d14c2e5ba4 100755 --- a/tests/suites/1_stateful/08_select_stage/08_00_parquet/08_00_00_basic.sh +++ b/tests/suites/1_stateful/08_select_stage/08_00_parquet/08_00_00_basic.sh @@ -10,13 +10,13 @@ echo "insert into t1 (id,name,age) values(1,'2',3), (4, '5', 6);" | $BENDSQL_CLI echo '--- named internal stage' echo "drop stage if exists s1;" | $BENDSQL_CLIENT_CONNECT echo "create stage s1 FILE_FORMAT = (type = PARQUET);" | $BENDSQL_CLIENT_CONNECT -echo "copy into @s1 from t1;" | $BENDSQL_CLIENT_CONNECT +echo "copy into @s1 from t1;" | $BENDSQL_CLIENT_CONNECT | 
cut -d$'\t' -f1,2 echo "select * from @s1;" | $BENDSQL_CLIENT_CONNECT DATADIR_PATH="/tmp/08_00_00" rm -rf ${DATADIR_PATH} DATADIR="fs://$DATADIR_PATH/" -echo "copy into '${DATADIR}' from t1 FILE_FORMAT = (type = PARQUET);" | $BENDSQL_CLIENT_CONNECT +echo "copy into '${DATADIR}' from t1 FILE_FORMAT = (type = PARQUET);" | $BENDSQL_CLIENT_CONNECT | cut -d$'\t' -f1,2 #echo '--- uri' #echo "select * from '${DATADIR}';" | $BENDSQL_CLIENT_CONNECT @@ -38,7 +38,7 @@ echo "insert into t2 (id,data) values(1,'[1,2,3]'),(2,'{\"k\":\"v\"}');" | $BEND echo '--- variant named internal stage' echo "drop stage if exists s4;" | $BENDSQL_CLIENT_CONNECT echo "create stage s4 FILE_FORMAT = (type = PARQUET);" | $BENDSQL_CLIENT_CONNECT -echo "copy into @s4 from t2;" | $BENDSQL_CLIENT_CONNECT +echo "copy into @s4 from t2;" | $BENDSQL_CLIENT_CONNECT | cut -d$'\t' -f1,2 echo "select * from @s4;" | $BENDSQL_CLIENT_CONNECT rm -rf ${DATADIR_PATH} diff --git a/tests/suites/1_stateful/08_select_stage/08_00_parquet/08_00_06_transform.result b/tests/suites/1_stateful/08_select_stage/08_00_parquet/08_00_06_transform.result index 3fe2915437f05..735f9cd1e4b22 100755 --- a/tests/suites/1_stateful/08_select_stage/08_00_parquet/08_00_06_transform.result +++ b/tests/suites/1_stateful/08_select_stage/08_00_parquet/08_00_06_transform.result @@ -1,4 +1,4 @@ -2 18 572 +2 18 --- copy from uri with transform 2 5 diff --git a/tests/suites/1_stateful/08_select_stage/08_00_parquet/08_00_06_transform.sh b/tests/suites/1_stateful/08_select_stage/08_00_parquet/08_00_06_transform.sh index 3b79c69a1b2c2..2795bcb3e3934 100755 --- a/tests/suites/1_stateful/08_select_stage/08_00_parquet/08_00_06_transform.sh +++ b/tests/suites/1_stateful/08_select_stage/08_00_parquet/08_00_06_transform.sh @@ -10,7 +10,7 @@ echo "insert into t1 (id, age) values(1,3), (4, 6);" | $BENDSQL_CLIENT_CONNECT DATADIR_PATH="/tmp/08_00_06" rm -rf ${DATADIR_PATH} DATADIR="fs://$DATADIR_PATH/" -echo "copy into '${DATADIR}' from t1 FILE_FORMAT = (type = PARQUET);" | $BENDSQL_CLIENT_CONNECT +echo "copy into '${DATADIR}' from t1 FILE_FORMAT = (type = PARQUET);" | $BENDSQL_CLIENT_CONNECT | cut -d$'\t' -f1,2 touch ${DATADIR_PATH}/transform.csv
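The test changes above all apply one pattern: the byte-size column reported by COPY INTO depends on the environment, so only the stable columns are kept in the expected results. A minimal sketch of that pattern, with the summary rows simulated by printf instead of produced by bendsql (the sample values are taken from the expected-result diffs above and the column meanings from the detailed_output comment in 00_0015_unload_output.sh):

# Stand-in for a `copy into @stage ... detailed_output=false` summary row
# (rows_unloaded, input_bytes, output_bytes): keep fields 1-2, drop the flaky output_bytes.
printf '2\t45\t769\n' | cut -d$'\t' -f1,2
# Stand-in for a `detailed_output=true` row (file_name, file_size, row_count):
# keep the file name and row count, drop the size.
printf 'a/bc/data_0000_00000000.parquet\t378\t1\n' | cut -d$'\t' -f1,3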