diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index ea543a9b07..834c2e2a59 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -164,5 +164,9 @@ required-features = ["datafusion"] name = "read_delta_partitions_test" required-features = ["datafusion"] +[[bench]] +name = "json_parsing" +harness = false + [package.metadata.cargo-machete] ignored = ["foyer"] diff --git a/crates/core/benches/json_parsing.rs b/crates/core/benches/json_parsing.rs new file mode 100644 index 0000000000..d699fc1743 --- /dev/null +++ b/crates/core/benches/json_parsing.rs @@ -0,0 +1,172 @@ +use bytes::Bytes; +use criterion::{black_box, criterion_group, criterion_main, Criterion, Throughput}; +use std::io::{BufRead, BufReader, Cursor}; +use std::time::Duration; +use tokio::runtime::Runtime; + +use deltalake_core::kernel::Action; +use deltalake_core::logstore::get_actions; +use deltalake_core::DeltaTableError; + +fn generate_commit_log_complex( + num_actions: usize, + with_stats: bool, + with_partition_values: bool, + with_deletion_vector: bool, +) -> Bytes { + let mut log_lines = Vec::new(); + + log_lines.push(r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#.to_string()); + log_lines.push(r#"{"commitInfo":{"timestamp":1234567890}}"#.to_string()); + + for i in 0..num_actions { + let mut add_json = format!( + r#"{{"path":"part-{:05}.parquet","size":{},"modificationTime":1234567890,"dataChange":true"#, + i, + 1000 + i * 100 + ); + + if with_partition_values { + add_json.push_str(r#","partitionValues":{"year":"2024","month":"10","day":"09"}"#); + } else { + add_json.push_str(r#","partitionValues":{}"#); + } + + if with_stats { + add_json.push_str(&format!( + r#","stats":"{{\"numRecords\":{},\"minValues\":{{\"id\":{},\"name\":\"aaa\",\"value\":{}.5}},\"maxValues\":{{\"id\":{},\"name\":\"zzz\",\"value\":{}.99}},\"nullCount\":{{\"id\":0,\"name\":0,\"value\":{}}}}}""#, + 1000 + i * 10, i, i, i + 1000, i + 1000, i % 10 + )); + } + + if with_deletion_vector { + 
add_json.push_str(r#","deletionVector":{"storageType":"u","pathOrInlineDv":"vBn[lx{q8@P<9BNH/isA","offset":1,"sizeInBytes":36,"cardinality":2}"#); + } + + add_json.push_str("}"); + log_lines.push(format!(r#"{{"add":{}}}"#, add_json)); + } + + Bytes::from(log_lines.join("\n")) +} + +// Baseline implementation for comparison +// TODO: this is the version of the main branch for performance comparison +// Remove it after merging the PR +async fn get_actions_baseline( + version: i64, + commit_log_bytes: Bytes, +) -> Result<Vec<Action>, DeltaTableError> { + let reader = BufReader::new(Cursor::new(commit_log_bytes)); + + let mut actions = Vec::new(); + for re_line in reader.lines() { + let line = re_line?; + let lstr = line.as_str(); + let action = serde_json::from_str(lstr).map_err(|e| DeltaTableError::InvalidJsonLog { + json_err: e, + line, + version, + })?; + actions.push(action); + } + Ok(actions) +} + +fn bench_simple_actions(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + let mut group = c.benchmark_group("simple_actions_1000"); + group.throughput(Throughput::Elements(1000)); + group.sample_size(150); + group.measurement_time(Duration::from_secs(10)); + + let commit_log = generate_commit_log_complex(1000, false, false, false); + + group.bench_function("baseline", |b| { + b.iter(|| { + rt.block_on(async { + let result = get_actions_baseline(0, commit_log.clone()).await; + black_box(result.unwrap().len()) + }) + }); + }); + + group.bench_function("new version", |b| { + b.iter(|| { + rt.block_on(async { + let result = get_actions(0, &commit_log).await; + black_box(result.unwrap().len()) + }) + }); + }); + + group.finish(); +} + +fn bench_with_stats(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + let mut group = c.benchmark_group("with_stats_1000"); + group.throughput(Throughput::Elements(1000)); + group.sample_size(150); + group.measurement_time(Duration::from_secs(10)); + + let commit_log = generate_commit_log_complex(1000, true, false, false); + + 
group.bench_function("baseline", |b| { + b.iter(|| { + rt.block_on(async { + let result = get_actions_baseline(0, commit_log.clone()).await; + black_box(result.unwrap().len()) + }) + }); + }); + + group.bench_function("new version", |b| { + b.iter(|| { + rt.block_on(async { + let result = get_actions(0, &commit_log).await; + black_box(result.unwrap().len()) + }) + }); + }); + + group.finish(); +} + +fn bench_full_complexity(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + let mut group = c.benchmark_group("full_complexity_1000"); + group.throughput(Throughput::Elements(1000)); + group.sample_size(150); + group.measurement_time(Duration::from_secs(10)); + + let commit_log = generate_commit_log_complex(1000, true, true, true); + + group.bench_function("baseline", |b| { + b.iter(|| { + rt.block_on(async { + let result = get_actions_baseline(0, commit_log.clone()).await; + black_box(result.unwrap().len()) + }) + }); + }); + + group.bench_function("new version", |b| { + b.iter(|| { + rt.block_on(async { + let result = get_actions(0, &commit_log).await; + black_box(result.unwrap().len()) + }) + }); + }); + + group.finish(); +} + +criterion_group!( + benches, + bench_simple_actions, + bench_with_stats, + bench_full_complexity, +); +criterion_main!(benches); diff --git a/crates/core/src/kernel/snapshot/mod.rs b/crates/core/src/kernel/snapshot/mod.rs index b6b6b8617a..e20c2d1535 100644 --- a/crates/core/src/kernel/snapshot/mod.rs +++ b/crates/core/src/kernel/snapshot/mod.rs @@ -15,7 +15,6 @@ //! //! 
-use std::io::{BufRead, BufReader, Cursor}; use std::sync::{Arc, LazyLock}; use arrow::array::RecordBatch; @@ -37,6 +36,7 @@ use futures::stream::{once, BoxStream}; use futures::{StreamExt, TryStreamExt}; use object_store::path::Path; use object_store::ObjectStore; +use serde_json::Deserializer; use tokio::task::spawn_blocking; use super::{Action, CommitInfo, Metadata, Protocol}; @@ -323,9 +323,10 @@ impl Snapshot { let store = store.clone(); async move { let commit_log_bytes = store.get(&meta.location).await?.bytes().await?; - let reader = BufReader::new(Cursor::new(commit_log_bytes)); - for line in reader.lines() { - let action: Action = serde_json::from_str(line?.as_str())?; + + for result in Deserializer::from_slice(&commit_log_bytes).into_iter::<Action>() + { + let action = result?; if let Action::CommitInfo(commit_info) = action { return Ok::<_, DeltaTableError>(Some(commit_info)); } diff --git a/crates/core/src/kernel/transaction/conflict_checker.rs b/crates/core/src/kernel/transaction/conflict_checker.rs index 1a072db41b..30e27ac4b2 100644 --- a/crates/core/src/kernel/transaction/conflict_checker.rs +++ b/crates/core/src/kernel/transaction/conflict_checker.rs @@ -244,7 +244,7 @@ impl WinningCommitSummary { let commit_log_bytes = log_store.read_commit_entry(winning_commit_version).await?; match commit_log_bytes { Some(bytes) => { - let actions = get_actions(winning_commit_version, bytes).await?; + let actions = get_actions(winning_commit_version, &bytes).await?; let commit_info = actions .iter() .find(|action| matches!(action, Action::CommitInfo(_))) diff --git a/crates/core/src/logstore/mod.rs b/crates/core/src/logstore/mod.rs index d505f0dfaa..27c89de3ac 100644 --- a/crates/core/src/logstore/mod.rs +++ b/crates/core/src/logstore/mod.rs @@ -48,7 +48,6 @@ //! ## Configuration //! 
use std::collections::HashMap; -use std::io::{BufRead, BufReader, Cursor}; use std::sync::{Arc, LazyLock}; use bytes::Bytes; @@ -68,6 +67,7 @@ use regex::Regex; use serde::de::{Error, SeqAccess, Visitor}; use serde::ser::SerializeSeq; use serde::{Deserialize, Serialize}; +use serde_json::Deserializer; use tokio::runtime::RuntimeFlavor; use tokio::task::spawn_blocking; use tracing::{debug, error}; @@ -309,7 +309,7 @@ pub trait LogStore: Send + Sync + AsAny { Err(err) => Err(err), }?; - let actions = crate::logstore::get_actions(next_version, commit_log_bytes).await; + let actions = crate::logstore::get_actions(next_version, &commit_log_bytes).await; Ok(PeekCommit::New(next_version, actions?)) } @@ -566,23 +566,22 @@ pub fn to_uri(root: &Url, location: &Path) -> String { /// Reads a commit and gets list of actions pub async fn get_actions( version: i64, - commit_log_bytes: bytes::Bytes, + commit_log_bytes: &bytes::Bytes, ) -> Result<Vec<Action>, DeltaTableError> { debug!("parsing commit with version {version}..."); - let reader = BufReader::new(Cursor::new(commit_log_bytes)); - - let mut actions = Vec::new(); - for re_line in reader.lines() { - let line = re_line?; - let lstr = line.as_str(); - let action = serde_json::from_str(lstr).map_err(|e| DeltaTableError::InvalidJsonLog { - json_err: e, - line, - version, - })?; - actions.push(action); - } - Ok(actions) + Deserializer::from_slice(commit_log_bytes) + .into_iter::<Action>() + .map(|result| { + result.map_err(|e| { + let line = format!("Error at line {}, column {}", e.line(), e.column()); + DeltaTableError::InvalidJsonLog { + json_err: e, + line, + version, + } + }) + }) + .collect() } // TODO: maybe a bit of a hack, required to `#[derive(Debug)]` for the operation builders @@ -981,4 +980,27 @@ mod datafusion_tests { ); } } + + #[tokio::test] + async fn test_get_actions_malformed_json() { + let malformed_json = bytes::Bytes::from( + r#"{"add": {"path": "test.parquet", "partitionValues": {}, "size": 100, "modificationTime": 
1234567890, "dataChange": true}} +{"invalid json without closing brace"#, + ); + + let result = get_actions(0, &malformed_json).await; + + match result { + Err(DeltaTableError::InvalidJsonLog { + line, + version, + json_err, + }) => { + assert_eq!(version, 0); + assert!(line.contains("line 2")); + assert!(json_err.is_eof()); + } + other => panic!("Expected InvalidJsonLog error, got {:?}", other), + } + } } diff --git a/crates/core/src/operations/load_cdf.rs b/crates/core/src/operations/load_cdf.rs index 798b8abee1..ae5d2c6c51 100644 --- a/crates/core/src/operations/load_cdf.rs +++ b/crates/core/src/operations/load_cdf.rs @@ -128,7 +128,7 @@ impl CdfLoadBuilder { let ts = self.starting_timestamp.unwrap_or(DateTime::UNIX_EPOCH); for v in 0..self.snapshot.version() { if let Ok(Some(bytes)) = self.log_store.read_commit_entry(v).await { - if let Ok(actions) = get_actions(v, bytes).await { + if let Ok(actions) = get_actions(v, &bytes).await { if actions.iter().any(|action| { matches!(action, Action::CommitInfo(CommitInfo { timestamp: Some(t), .. @@ -209,7 +209,7 @@ impl CdfLoadBuilder { .ok_or(DeltaTableError::InvalidVersion(latest_version))?; let latest_version_actions: Vec<Action> = - get_actions(latest_version, latest_snapshot_bytes).await?; + get_actions(latest_version, &latest_snapshot_bytes).await?; let latest_version_commit = latest_version_actions .iter() .find(|a| matches!(a, Action::CommitInfo(_))); @@ -240,7 +240,7 @@ impl CdfLoadBuilder { .await? .ok_or(DeltaTableError::InvalidVersion(version)); - let version_actions: Vec<Action> = get_actions(version, snapshot_bytes?).await?; + let version_actions: Vec<Action> = get_actions(version, &snapshot_bytes?).await?; let mut ts = 0; let mut cdc_actions = vec![]; @@ -953,7 +953,7 @@ pub(crate) mod tests { .read_commit_entry(2) .await? 
.expect("failed to get snapshot bytes"); - let version_actions = get_actions(2, snapshot_bytes).await?; + let version_actions = get_actions(2, &snapshot_bytes).await?; let cdc_actions = version_actions .iter() diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index 636a103b2b..7be7aece55 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -1999,7 +1999,7 @@ mod tests { .await .unwrap() .expect("failed to get snapshot bytes"); - let actions = crate::logstore::get_actions(2, snapshot_bytes) + let actions = crate::logstore::get_actions(2, &snapshot_bytes) .await .unwrap(); @@ -2075,7 +2075,7 @@ mod tests { .await .unwrap() .expect("failed to get snapshot bytes"); - let actions = crate::logstore::get_actions(2, snapshot_bytes) + let actions = crate::logstore::get_actions(2, &snapshot_bytes) .await .unwrap(); @@ -2186,7 +2186,7 @@ mod tests { .await .unwrap() .expect("failed to get snapshot bytes"); - let actions = crate::logstore::get_actions(2, snapshot_bytes) + let actions = crate::logstore::get_actions(2, &snapshot_bytes) .await .unwrap(); diff --git a/crates/core/src/operations/write/mod.rs b/crates/core/src/operations/write/mod.rs index 8e003014ab..a87c86e309 100644 --- a/crates/core/src/operations/write/mod.rs +++ b/crates/core/src/operations/write/mod.rs @@ -1672,7 +1672,7 @@ mod tests { .read_commit_entry(2) .await? .expect("failed to get snapshot bytes"); - let version_actions = get_actions(2, snapshot_bytes).await?; + let version_actions = get_actions(2, &snapshot_bytes).await?; let cdc_actions = version_actions .iter() @@ -1746,7 +1746,7 @@ mod tests { .read_commit_entry(2) .await? .expect("failed to get snapshot bytes"); - let version_actions = get_actions(2, snapshot_bytes).await?; + let version_actions = get_actions(2, &snapshot_bytes).await?; let cdc_actions = version_actions .iter() @@ -1847,7 +1847,7 @@ mod tests { .read_commit_entry(2) .await? 
.expect("failed to get snapshot bytes"); - let version_actions = get_actions(2, snapshot_bytes).await?; + let version_actions = get_actions(2, &snapshot_bytes).await?; let cdc_actions = version_actions .iter()