
Commit 2213e14

feat: schema mismatch alignment
1 parent 7ddd888 commit 2213e14

File tree

4 files changed: +205 −119 lines


influxdb3/src/commands/serve.rs (+2 −4)

@@ -181,7 +181,7 @@ pub struct Config {
     #[clap(
         long = "gen1-duration",
         env = "INFLUXDB3_GEN1_DURATION",
-        default_value = "10m",
+        default_value = "1m",
         action
     )]
     pub gen1_duration: Gen1Duration,
@@ -577,9 +577,7 @@ pub async fn command(config: Config) -> Result<()> {

     info!("setting up background mem check for query buffer");
     background_buffer_checker(
-        // config.force_snapshot_mem_threshold.bytes(),
-        734003200,
-        // 536870912,
+        config.force_snapshot_mem_threshold.as_num_bytes(),
         &write_buffer_impl,
     )
     .await;
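For readers unfamiliar with the config pattern in the hunk above: the flag has a long name, an env-var fallback, and a default value, and the hard-coded snapshot threshold is replaced by whatever the operator configures. A minimal, standalone sketch of that clap pattern, assuming clap 4 with the derive and env features enabled and using a plain String in place of the repo's Gen1Duration type:

use clap::Parser;

#[derive(Debug, Parser)]
struct Config {
    /// Width of a gen1 file; accepts values such as "1m" or "10m".
    #[clap(
        long = "gen1-duration",
        env = "INFLUXDB3_GEN1_DURATION",
        default_value = "1m"
    )]
    gen1_duration: String,
}

fn main() {
    // Resolution order: CLI flag, then INFLUXDB3_GEN1_DURATION, then the default "1m".
    let config = Config::parse();
    println!("gen1 duration: {}", config.gen1_duration);
}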

influxdb3_write/src/paths.rs (−1)

@@ -75,7 +75,6 @@ impl ParquetFilePath {
             chunk_idx = sub_chunk_index.unwrap(),
             ext = PARQUET_FILE_EXTENSION
         ))
-
     } else {
         ObjPath::from(format!(
             "{host_prefix}/dbs/{db_name}-{db_id}/{table_name}-{table_id}/{date_string}/{wal_seq:010}.{ext}",

influxdb3_write/src/write_buffer/queryable_buffer.rs (+192 −48)
@@ -1,16 +1,11 @@
-use crate::paths::ParquetFilePath;
 use crate::persister::Persister;
 use crate::write_buffer::persisted_files::PersistedFiles;
 use crate::write_buffer::table_buffer::TableBuffer;
 use crate::{ChunkFilter, ParquetFile, ParquetFileId, PersistedSnapshot};
 use crate::{chunk::BufferChunk, write_buffer::table_buffer::SnaphotChunkIter};
+use crate::{paths::ParquetFilePath, write_buffer::table_buffer::array_ref_nulls_for_type};
 use anyhow::Context;
-use arrow::{
-    array::{AsArray, UInt64Array},
-    compute::take,
-    datatypes::TimestampNanosecondType,
-    record_batch::RecordBatch,
-};
+use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
 use data_types::{
     ChunkId, ChunkOrder, PartitionHashId, PartitionId, PartitionKey, TimestampMinMax,
@@ -24,7 +19,10 @@ use influxdb3_cache::parquet_cache::{CacheRequest, ParquetCacheOracle};
 use influxdb3_cache::{distinct_cache::DistinctCacheProvider, last_cache::LastCacheProvider};
 use influxdb3_catalog::catalog::{Catalog, DatabaseSchema, TableDefinition};
 use influxdb3_id::{DbId, TableId};
-use influxdb3_wal::{CatalogOp, SnapshotDetails, WalContents, WalFileNotifier, WalOp, WriteBatch};
+use influxdb3_wal::{
+    CatalogOp, SnapshotDetails, WalContents, WalFileNotifier, WalFileSequenceNumber, WalOp,
+    WriteBatch,
+};
 use iox_query::QueryChunk;
 use iox_query::chunk_statistics::{NoColumnRanges, create_chunk_statistics};
 use iox_query::exec::Executor;
@@ -36,9 +34,9 @@ use parking_lot::RwLock;
 use parquet::format::FileMetaData;
 use schema::Schema;
 use schema::sort::SortKey;
-use std::sync::Arc;
+use std::any::Any;
 use std::time::Duration;
-use std::{any::Any, collections::BTreeMap};
+use std::{iter::Peekable, slice::Iter, sync::Arc};
 use tokio::sync::oneshot::{self, Receiver};
 use tokio::task::JoinSet;

@@ -217,6 +215,7 @@ impl QueryableBuffer {

                 let persist_job = PersistJob {
                     database_id: *database_id,
+                    database_name: Arc::clone(&db_schema.name),
                     table_id: *table_id,
                     table_name: Arc::clone(&table_name),
                     chunk_time: chunk.chunk_time,
@@ -231,14 +230,13 @@ impl QueryableBuffer {
                         None,
                     ),
                     // these clones are cheap and done one at a time
-                    batch: chunk.record_batch.clone(),
+                    batch: vec![chunk.record_batch.clone()],
                     schema: chunk.schema.clone(),
                     timestamp_min_max: chunk.timestamp_min_max,
                     sort_key: sort_key.clone(),
                 };
                 persisting_chunks.push(persist_job);
-                snapshot_chunks.push_back(chunk);
-                // snapshot_chunks.add_one(chunk);
+                snapshot_chunks.push(chunk);
                 debug!(">>> finished with chunk");
             }
         }
@@ -322,6 +320,9 @@ impl QueryableBuffer {
             )));

             sort_dedupe_parallel(
+                Arc::from(persister.node_identifier_prefix()),
+                wal_file_number,
+                Arc::clone(&catalog),
                 persist_jobs,
                 &persister,
                 executor,
@@ -421,7 +422,11 @@ impl QueryableBuffer {
     }
 }

+#[allow(clippy::too_many_arguments)]
 async fn sort_dedupe_parallel(
+    host_prefix: Arc<str>,
+    wal_file_number: WalFileSequenceNumber,
+    catalog: Arc<Catalog>,
     persist_jobs: Vec<PersistJob>,
     persister: &Arc<Persister>,
     executor: Arc<Executor>,
@@ -430,10 +435,16 @@ async fn sort_dedupe_parallel(
     persisted_files: Arc<PersistedFiles>,
     persisted_snapshot: Arc<Mutex<PersistedSnapshot>>,
 ) {
-    // if gen1 duration is 1m we should combine upto 10 of them
-    // to create a single parquet file
+    let iterator = PersistJobGroupedIterator::new(
+        &persist_jobs,
+        Arc::clone(&host_prefix),
+        wal_file_number,
+        Arc::clone(&catalog),
+        10,
+    );
+
     let mut set = JoinSet::new();
-    for persist_job in persist_jobs {
+    for persist_job in iterator {
         let persister = Arc::clone(persister);
         let executor = Arc::clone(&executor);
         let persisted_snapshot = Arc::clone(&persisted_snapshot);
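The removed comment in the hunk above states the intent: with a 1m gen1 duration, up to 10 chunks of the same table are combined into a single parquet file, which is what PersistJobGroupedIterator now does via a peekable iterator. A minimal, standalone sketch of that grouping pattern, using hypothetical (key, chunk_time) tuples instead of the repo's PersistJob type:

fn group_by_key<T: Copy + PartialEq>(items: &[(T, i64)], chunk_size: usize) -> Vec<Vec<(T, i64)>> {
    let mut groups = Vec::new();
    let mut iter = items.iter().peekable();
    while let Some(first) = iter.next() {
        let mut group = vec![*first];
        // keep consuming while the next item has the same key and the group
        // has not yet reached chunk_size
        while group.len() < chunk_size {
            match iter.peek() {
                Some(next) if next.0 == first.0 => group.push(*iter.next().unwrap()),
                _ => break,
            }
        }
        groups.push(group);
    }
    groups
}

fn main() {
    // (table id, chunk time): three 1-minute chunks for table 1, one for table 2
    let jobs = [(1, 0), (1, 60), (1, 120), (2, 0)];
    let grouped = group_by_key(&jobs, 10);
    assert_eq!(grouped.len(), 2); // one grouped persist job per table
}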
@@ -560,34 +571,7 @@ async fn sort_dedupe_serial(
             }
         }

-        persisted_snapshot
-            .add_parquet_file(database_id, table_id, parquet_file)
-    }
-}
-
-#[derive(Debug)]
-struct MinMax {
-    min: i64,
-    max: i64,
-}
-
-impl MinMax {
-    fn new(min: i64, max: i64) -> Self {
-        // this doesn't check if min < max, a lot of the times
-        // it's good to start with i64::MAX for min and i64::MIN
-        // for max in loops so this type unlike TimestampMinMax
-        // doesn't check this pre-condition
-        Self { min, max }
-    }
-
-    fn update(&mut self, other: i64) {
-        self.min = other.min(self.min);
-        self.max = other.max(self.max);
-    }
-
-    fn to_ts_min_max(&self) -> TimestampMinMax {
-        // at this point min < max
-        TimestampMinMax::new(self.min, self.max)
+        persisted_snapshot.add_parquet_file(database_id, table_id, parquet_file)
     }
 }

@@ -768,16 +752,176 @@ impl BufferState {
 #[derive(Debug)]
 struct PersistJob {
     database_id: DbId,
+    database_name: Arc<str>,
     table_id: TableId,
     table_name: Arc<str>,
     chunk_time: i64,
     path: ParquetFilePath,
-    batch: RecordBatch,
+    batch: Vec<RecordBatch>,
     schema: Schema,
     timestamp_min_max: TimestampMinMax,
     sort_key: SortKey,
 }

+struct PersistJobGroupedIterator<'a> {
+    iter: Peekable<Iter<'a, PersistJob>>,
+    host_prefix: Arc<str>,
+    wal_file_number: WalFileSequenceNumber,
+    catalog: Arc<Catalog>,
+    chunk_size: usize,
+}
+
+impl<'a> PersistJobGroupedIterator<'a> {
+    fn new(
+        data: &'a [PersistJob],
+        host_prefix: Arc<str>,
+        wal_file_number: WalFileSequenceNumber,
+        catalog: Arc<Catalog>,
+        chunk_size: usize,
+    ) -> Self {
+        PersistJobGroupedIterator {
+            iter: data.iter().peekable(),
+            host_prefix: Arc::clone(&host_prefix),
+            wal_file_number,
+            catalog,
+            chunk_size,
+        }
+    }
+}
+
+impl Iterator for PersistJobGroupedIterator<'_> {
+    // The item is a grouped persist job. It has exactly the same fields as
+    // PersistJob, the only difference being that each job now carries a vec
+    // of batches, so the type is reused for now. For clarity it might be
+    // better to have a separate type to represent this state.
+    type Item = PersistJob;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        let current_data = self.iter.next()?;
+        let current_table_id = &current_data.table_id;
+
+        let mut ts_min_max = current_data.timestamp_min_max;
+
+        let mut all_batches = Vec::with_capacity(self.chunk_size);
+        let mut all_schemas = Vec::with_capacity(self.chunk_size);
+        all_batches.extend_from_slice(&current_data.batch);
+        all_schemas.push(current_data.schema.clone());
+
+        let mut min_chunk_time = current_data.chunk_time;
+        // currently this naively assumes all batches are the same shape, but
+        // they may not be - in that case we should use the most recent table
+        // defn to add null arrays for batches with missing cols.
+        while all_batches.len() < self.chunk_size {
+            if let Some(next_data) = self.iter.peek() {
+                if next_data.table_id == *current_table_id {
+                    let next = self.iter.next().unwrap();
+                    ts_min_max = ts_min_max.union(&next.timestamp_min_max);
+                    min_chunk_time = min_chunk_time.min(next.chunk_time);
+                    all_batches.extend_from_slice(&next.batch);
+                    all_schemas.push(next.schema.clone());
+                } else {
+                    break;
+                }
+            } else {
+                break;
+            }
+        }
+
+        // most recent table defn
+        let table_defn = self
+            .catalog
+            .db_schema_by_id(&current_data.database_id)?
+            .table_definition_by_id(&current_data.table_id)?;
+
+        let expected_schema = table_defn.schema.clone();
+        let batches_with_schema_mismatch: Vec<(usize, RecordBatch)> = all_batches
+            .iter()
+            .cloned()
+            .enumerate()
+            // TODO: check if these are in order..
+            .filter(|(idx, _)| {
+                let schema = &all_schemas[*idx];
+                for field_1 in expected_schema.iter() {
+                    let mut found_field = false;
+                    for field_2 in schema.iter() {
+                        if field_1.1.name() == field_2.1.name() {
+                            found_field = true;
+                            break;
+                        }
+                    }
+
+                    if !found_field {
+                        return true;
+                    }
+                }
+                false
+            })
+            .collect();
+
+        if !batches_with_schema_mismatch.is_empty() {
+            // we need to add the missing fields - as schema changes are additive,
+            // a mismatch means a new column has been added to the table but the
+            // batches are missing it.
+            for (idx, batch) in &batches_with_schema_mismatch {
+                let mut cols = vec![];
+                let new_schema = &table_defn.schema;
+                // pick its current iox schema to add the columns (making null arrays for missing ones)
+                let outdated_batch_schema = &all_schemas[*idx];
+                debug!(
+                    ?outdated_batch_schema,
+                    ">>> outdated batch schema when aligning mismatched schema"
+                );
+                for col_idx_with_field_details in new_schema.iter().enumerate() {
+                    let (col_idx, (influx_col_type, field)) = col_idx_with_field_details;
+                    let batch_field = outdated_batch_schema.field_by_name(field.name());
+                    let len = batch.columns()[0].len();
+                    if batch_field.is_some() {
+                        let col = Arc::clone(&batch.columns()[col_idx]);
+                        cols.push(col);
+                    } else {
+                        let null_array_col = array_ref_nulls_for_type(influx_col_type, len);
+                        cols.push(null_array_col);
+                    }
+                }
+
+                let new_arrow_schema = new_schema.as_arrow();
+                debug!(
+                    ?new_arrow_schema,
+                    ">>> new arrow schema for batch when aligning mismatched schema"
+                );
+                let new_rec_batch = RecordBatch::try_new(new_arrow_schema, cols).expect(
+                    "record batch to be created with new schema after fixing schema mismatch",
+                );
+
+                let _ = std::mem::replace(&mut all_batches[*idx], new_rec_batch);
+            }
+        }
+
+        Some(PersistJob {
+            database_id: current_data.database_id,
+            database_name: Arc::clone(&current_data.database_name),
+            table_id: current_data.table_id,
+            path: ParquetFilePath::new(
+                &self.host_prefix,
+                &current_data.database_name,
+                current_data.database_id.as_u32(),
+                &current_data.table_name,
+                current_data.table_id.as_u32(),
+                min_chunk_time,
+                self.wal_file_number,
+                None,
+            ),
+            table_name: Arc::clone(&current_data.table_name),
+            chunk_time: min_chunk_time,
+            batch: all_batches,
+            schema: current_data.schema.clone(),
+            timestamp_min_max: ts_min_max,
+            sort_key: current_data.sort_key.clone(),
+        })
+    }
+}
+
 pub(crate) struct SortDedupePersistSummary {
     pub file_size_bytes: u64,
     pub file_meta_data: FileMetaData,
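The alignment step above is the heart of the commit: schema changes are additive, so a batch written before a column existed is padded with an all-null column of the right type before the grouped batches are persisted together. A self-contained sketch of that idea using only the arrow crate, with arrow's new_null_array standing in for the repo's array_ref_nulls_for_type and made-up column names:

use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array, new_null_array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), arrow::error::ArrowError> {
    // batch produced before the "tag" column existed
    let old_schema = Arc::new(Schema::new(vec![Field::new("value", DataType::Int64, false)]));
    let old_batch = RecordBatch::try_new(
        old_schema,
        vec![Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef],
    )?;

    // most recent table schema: "tag" was added later (nullable, as additive changes are)
    let new_schema = Arc::new(Schema::new(vec![
        Field::new("value", DataType::Int64, false),
        Field::new("tag", DataType::Utf8, true),
    ]));

    // reuse columns the batch already has; fill missing ones with nulls
    let len = old_batch.num_rows();
    let cols: Vec<ArrayRef> = new_schema
        .fields()
        .iter()
        .map(|field| match old_batch.column_by_name(field.name()) {
            Some(col) => Arc::clone(col),
            None => new_null_array(field.data_type(), len),
        })
        .collect();

    let aligned = RecordBatch::try_new(new_schema, cols)?;
    assert_eq!(aligned.num_columns(), 2);
    Ok(())
}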
@@ -799,7 +943,7 @@ async fn sort_dedupe_persist(
 ) -> Result<SortDedupePersistSummary, anyhow::Error> {
     // Dedupe and sort using the COMPACT query built into
     // iox_query
-    let row_count = persist_job.batch.num_rows();
+    let row_count = persist_job.batch.iter().map(|batch| batch.num_rows()).sum();
     info!(
         "Persisting {} rows for db id {} and table id {} and chunk {} to file {}",
         row_count,
@@ -818,7 +962,7 @@ async fn sort_dedupe_persist(
     );

     let chunks: Vec<Arc<dyn QueryChunk>> = vec![Arc::new(BufferChunk {
-        batches: vec![persist_job.batch],
+        batches: persist_job.batch,
         schema: persist_job.schema.clone(),
         stats: Arc::new(chunk_stats),
         partition_id: TransitionPartitionId::from_parts(
@@ -904,7 +1048,7 @@ mod tests {
     use parquet_file::storage::{ParquetStorage, StorageId};
     use std::num::NonZeroUsize;

-    #[tokio::test]
+    #[test_log::test(tokio::test)]
     async fn snapshot_works_with_not_all_columns_in_buffer() {
         let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
         let metrics = Arc::new(metric::Registry::default());
