4 changes: 4 additions & 0 deletions crates/core/Cargo.toml
@@ -164,5 +164,9 @@ required-features = ["datafusion"]
 name = "read_delta_partitions_test"
 required-features = ["datafusion"]
 
+[[bench]]
+name = "json_parsing"
+harness = false
+
 [package.metadata.cargo-machete]
 ignored = ["foyer"]
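With harness = false, Cargo skips the default libtest harness so Criterion's criterion_group!/criterion_main! macros can provide the benchmark entry point. Assuming the crate's package name is deltalake-core, the new target runs with cargo bench -p deltalake-core --bench json_parsing.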
172 changes: 172 additions & 0 deletions crates/core/benches/json_parsing.rs
@@ -0,0 +1,172 @@
use bytes::Bytes;
use criterion::{black_box, criterion_group, criterion_main, Criterion, Throughput};
use std::io::{BufRead, BufReader, Cursor};
use std::time::Duration;
use tokio::runtime::Runtime;

use deltalake_core::kernel::Action;
use deltalake_core::logstore::get_actions;
use deltalake_core::DeltaTableError;

fn generate_commit_log_complex(
    num_actions: usize,
    with_stats: bool,
    with_partition_values: bool,
    with_deletion_vector: bool,
) -> Bytes {
    let mut log_lines = Vec::new();

    log_lines.push(r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#.to_string());
    log_lines.push(r#"{"commitInfo":{"timestamp":1234567890}}"#.to_string());

    for i in 0..num_actions {
        let mut add_json = format!(
            r#"{{"path":"part-{:05}.parquet","size":{},"modificationTime":1234567890,"dataChange":true"#,
            i,
            1000 + i * 100
        );

        if with_partition_values {
            add_json.push_str(r#","partitionValues":{"year":"2024","month":"10","day":"09"}"#);
        } else {
            add_json.push_str(r#","partitionValues":{}"#);
        }

        if with_stats {
            add_json.push_str(&format!(
                r#","stats":"{{\"numRecords\":{},\"minValues\":{{\"id\":{},\"name\":\"aaa\",\"value\":{}.5}},\"maxValues\":{{\"id\":{},\"name\":\"zzz\",\"value\":{}.99}},\"nullCount\":{{\"id\":0,\"name\":0,\"value\":{}}}}}""#,
                1000 + i * 10, i, i, i + 1000, i + 1000, i % 10
            ));
        }

        if with_deletion_vector {
            add_json.push_str(r#","deletionVector":{"storageType":"u","pathOrInlineDv":"vBn[lx{q8@P<9BNH/isA","offset":1,"sizeInBytes":36,"cardinality":2}"#);
        }

        add_json.push_str("}");
        log_lines.push(format!(r#"{{"add":{}}}"#, add_json));
    }

    Bytes::from(log_lines.join("\n"))
}
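// For illustration, with all flags off the first generated add line is:
//   {"add":{"path":"part-00000.parquet","size":1000,"modificationTime":1234567890,"dataChange":true,"partitionValues":{}}}
// The stats and deletionVector fields are appended when the corresponding flags are set.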

// Baseline implementation for comparison.
// TODO: this is the version from the main branch, kept for performance comparison.
// Remove it after merging the PR.
async fn get_actions_baseline(
    version: i64,
    commit_log_bytes: Bytes,
) -> Result<Vec<Action>, DeltaTableError> {
    let reader = BufReader::new(Cursor::new(commit_log_bytes));

    let mut actions = Vec::new();
    for re_line in reader.lines() {
        let line = re_line?;
        let lstr = line.as_str();
        let action = serde_json::from_str(lstr).map_err(|e| DeltaTableError::InvalidJsonLog {
            json_err: e,
            line,
            version,
        })?;
        actions.push(action);
    }
    Ok(actions)
}
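// Note: the baseline allocates a fresh String for every line via BufRead::lines()
// before handing it to serde_json::from_str; the new get_actions instead
// deserializes Actions directly from the shared byte buffer.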

fn bench_simple_actions(c: &mut Criterion) {
    let rt = Runtime::new().unwrap();
    let mut group = c.benchmark_group("simple_actions_1000");
    group.throughput(Throughput::Elements(1000));
    group.sample_size(150);
    group.measurement_time(Duration::from_secs(10));

    let commit_log = generate_commit_log_complex(1000, false, false, false);

    group.bench_function("baseline", |b| {
        b.iter(|| {
            rt.block_on(async {
                let result = get_actions_baseline(0, commit_log.clone()).await;
                black_box(result.unwrap().len())
            })
        });
    });

    group.bench_function("new version", |b| {
        b.iter(|| {
            rt.block_on(async {
                let result = get_actions(0, &commit_log).await;
                black_box(result.unwrap().len())
            })
        });
    });

    group.finish();
}

fn bench_with_stats(c: &mut Criterion) {
    let rt = Runtime::new().unwrap();
    let mut group = c.benchmark_group("with_stats_1000");
    group.throughput(Throughput::Elements(1000));
    group.sample_size(150);
    group.measurement_time(Duration::from_secs(10));

    let commit_log = generate_commit_log_complex(1000, true, false, false);

    group.bench_function("baseline", |b| {
        b.iter(|| {
            rt.block_on(async {
                let result = get_actions_baseline(0, commit_log.clone()).await;
                black_box(result.unwrap().len())
            })
        });
    });

    group.bench_function("new version", |b| {
        b.iter(|| {
            rt.block_on(async {
                let result = get_actions(0, &commit_log).await;
                black_box(result.unwrap().len())
            })
        });
    });

    group.finish();
}

fn bench_full_complexity(c: &mut Criterion) {
    let rt = Runtime::new().unwrap();
    let mut group = c.benchmark_group("full_complexity_1000");
    group.throughput(Throughput::Elements(1000));
    group.sample_size(150);
    group.measurement_time(Duration::from_secs(10));

    let commit_log = generate_commit_log_complex(1000, true, true, true);

    group.bench_function("baseline", |b| {
        b.iter(|| {
            rt.block_on(async {
                let result = get_actions_baseline(0, commit_log.clone()).await;
                black_box(result.unwrap().len())
            })
        });
    });

    group.bench_function("new version", |b| {
        b.iter(|| {
            rt.block_on(async {
                let result = get_actions(0, &commit_log).await;
                black_box(result.unwrap().len())
            })
        });
    });

    group.finish();
}

criterion_group!(
    benches,
    bench_simple_actions,
    bench_with_stats,
    bench_full_complexity,
);
criterion_main!(benches);
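Throughput::Elements(1000) tells Criterion that each iteration processes the 1,000 generated add actions, so reports include an elements-per-second rate alongside wall time. The three groups isolate parsing cost for plain adds, adds with embedded stats strings, and adds with stats, partition values, and deletion vectors combined.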
9 changes: 5 additions & 4 deletions crates/core/src/kernel/snapshot/mod.rs
@@ -15,7 +15,6 @@
 //!
 //!
 
-use std::io::{BufRead, BufReader, Cursor};
 use std::sync::{Arc, LazyLock};
 
 use arrow::array::RecordBatch;
@@ -37,6 +36,7 @@ use futures::stream::{once, BoxStream};
 use futures::{StreamExt, TryStreamExt};
 use object_store::path::Path;
 use object_store::ObjectStore;
+use serde_json::Deserializer;
 use tokio::task::spawn_blocking;
 
 use super::{Action, CommitInfo, Metadata, Protocol};
@@ -323,9 +323,10 @@ impl Snapshot {
         let store = store.clone();
         async move {
             let commit_log_bytes = store.get(&meta.location).await?.bytes().await?;
-            let reader = BufReader::new(Cursor::new(commit_log_bytes));
-            for line in reader.lines() {
-                let action: Action = serde_json::from_str(line?.as_str())?;
+
+            for result in Deserializer::from_slice(&commit_log_bytes).into_iter::<Action>()
+            {
+                let action = result?;
                 if let Action::CommitInfo(commit_info) = action {
                     return Ok::<_, DeltaTableError>(Some(commit_info));
                 }
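Because the serde_json StreamDeserializer is lazy, yielding one Action per next() call, this scan can return at the first CommitInfo without parsing the rest of the commit; the replaced line loop also stopped early, but paid a String allocation for every line it did read.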
2 changes: 1 addition & 1 deletion crates/core/src/kernel/transaction/conflict_checker.rs
@@ -244,7 +244,7 @@ impl WinningCommitSummary {
         let commit_log_bytes = log_store.read_commit_entry(winning_commit_version).await?;
         match commit_log_bytes {
             Some(bytes) => {
-                let actions = get_actions(winning_commit_version, bytes).await?;
+                let actions = get_actions(winning_commit_version, &bytes).await?;
                 let commit_info = actions
                     .iter()
                     .find(|action| matches!(action, Action::CommitInfo(_)))
56 changes: 39 additions & 17 deletions crates/core/src/logstore/mod.rs
@@ -48,7 +48,6 @@
 //! ## Configuration
 //!
 use std::collections::HashMap;
-use std::io::{BufRead, BufReader, Cursor};
 use std::sync::{Arc, LazyLock};
 
 use bytes::Bytes;
@@ -68,6 +67,7 @@ use regex::Regex;
 use serde::de::{Error, SeqAccess, Visitor};
 use serde::ser::SerializeSeq;
 use serde::{Deserialize, Serialize};
+use serde_json::Deserializer;
 use tokio::runtime::RuntimeFlavor;
 use tokio::task::spawn_blocking;
 use tracing::{debug, error};
@@ -309,7 +309,7 @@ pub trait LogStore: Send + Sync + AsAny {
             Err(err) => Err(err),
         }?;
 
-        let actions = crate::logstore::get_actions(next_version, commit_log_bytes).await;
+        let actions = crate::logstore::get_actions(next_version, &commit_log_bytes).await;
         Ok(PeekCommit::New(next_version, actions?))
     }
 
@@ -566,23 +566,22 @@ pub fn to_uri(root: &Url, location: &Path) -> String {
 /// Reads a commit and gets list of actions
 pub async fn get_actions(
     version: i64,
-    commit_log_bytes: bytes::Bytes,
+    commit_log_bytes: &bytes::Bytes,
 ) -> Result<Vec<Action>, DeltaTableError> {
Comment on lines 567 to 570

Contributor: I wonder why this function is async. There's nothing async inside of it. Not your fault, as the base function was also async, but probably some legacy tech debt? I can imagine there was a world where this function took an async bytes stream instead of all the bytes.

Contributor: Per git blame, this function was implemented two years ago as async, even though there was never any async code in it.

fvaleye (Collaborator, Author) commented Oct 9, 2025: Yes, I kept the function async. Removing async would be a minor breaking change, as it would also require removing .await from the callers. Let's see what @roeap and @rtyler think about this!

Collaborator: More generally speaking, I see most call sites disappearing short term, since log replay now produces record batches that we extract data from (i.e. LogFileView et al.), avoiding copies whenever possible. The exception is the call in commit_infos. Since we are passing in Bytes, I see no reason why we should be doing IO in this function, and with that, little reason for it to be async; maybe in a follow-up we can make it sync.

Collaborator: Also, let's see when we have fully kernelized conflict resolution. There might be a few surprises lurking 😆.

fvaleye (Collaborator, Author): > Also, let's see when we have fully kernelized conflict resolution. There might be a few surprises lurking 😆.

Yay! Let's keep it like this for now and make it sync later. I will create an issue for tracking this need.
     debug!("parsing commit with version {version}...");
-    let reader = BufReader::new(Cursor::new(commit_log_bytes));
-
-    let mut actions = Vec::new();
-    for re_line in reader.lines() {
-        let line = re_line?;
-        let lstr = line.as_str();
-        let action = serde_json::from_str(lstr).map_err(|e| DeltaTableError::InvalidJsonLog {
-            json_err: e,
-            line,
-            version,
-        })?;
-        actions.push(action);
-    }
-    Ok(actions)
+    Deserializer::from_slice(commit_log_bytes)
+        .into_iter::<Action>()
+        .map(|result| {
+            result.map_err(|e| {
+                let line = format!("Error at line {}, column {}", e.line(), e.column());
+                DeltaTableError::InvalidJsonLog {
+                    json_err: e,
+                    line,
+                    version,
+                }
+            })
+        })
+        .collect()
 }
 
 // TODO: maybe a bit of a hack, required to `#[derive(Debug)]` for the operation builders
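For readers comparing the two strategies outside the delta-rs codebase, a minimal self-contained sketch of the before-and-after parsing paths; the Action enum here is a hypothetical stand-in for deltalake_core::kernel::Action, and only serde/serde_json are assumed:

use serde::Deserialize;

// Hypothetical stand-in: Delta log lines are externally tagged,
// single-key JSON objects such as {"add":{...}} or {"commitInfo":{...}}.
#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
enum Action {
    Add(serde_json::Value),
    CommitInfo(serde_json::Value),
    Protocol(serde_json::Value),
}

// Old strategy: buffer every line into an owned String, then parse it.
fn parse_per_line(bytes: &[u8]) -> Result<Vec<Action>, Box<dyn std::error::Error>> {
    use std::io::BufRead;
    let mut actions = Vec::new();
    for line in bytes.lines() {
        actions.push(serde_json::from_str(&line?)?);
    }
    Ok(actions)
}

// New strategy: StreamDeserializer yields one value per whitespace- or
// newline-delimited JSON document, parsing the buffer in place with no
// per-line String allocation.
fn parse_streaming(bytes: &[u8]) -> serde_json::Result<Vec<Action>> {
    serde_json::Deserializer::from_slice(bytes)
        .into_iter::<Action>()
        .collect()
}

Both take the same newline-delimited commit bytes; the streaming variant is essentially what get_actions now does, minus the DeltaTableError mapping.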
@@ -981,4 +980,27 @@ mod datafusion_tests {
             );
         }
     }
+
+    #[tokio::test]
+    async fn test_get_actions_malformed_json() {
+        let malformed_json = bytes::Bytes::from(
+            r#"{"add": {"path": "test.parquet", "partitionValues": {}, "size": 100, "modificationTime": 1234567890, "dataChange": true}}
+{"invalid json without closing brace"#,
+        );
+
+        let result = get_actions(0, &malformed_json).await;
+
+        match result {
+            Err(DeltaTableError::InvalidJsonLog {
+                line,
+                version,
+                json_err,
+            }) => {
+                assert_eq!(version, 0);
+                assert!(line.contains("line 2"));
+                assert!(json_err.is_eof());
+            }
+            other => panic!("Expected InvalidJsonLog error, got {:?}", other),
+        }
+    }
 }
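The assertions lean on serde_json::Error introspection: line() and column() report where parsing failed, and is_eof() confirms the failure came from truncated input rather than a type mismatch, which pins the error to the unterminated second line.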
8 changes: 4 additions & 4 deletions crates/core/src/operations/load_cdf.rs
@@ -105,7 +105,7 @@ impl CdfLoadBuilder {
         let ts = self.starting_timestamp.unwrap_or(DateTime::UNIX_EPOCH);
         for v in 0..self.snapshot.version() {
             if let Ok(Some(bytes)) = self.log_store.read_commit_entry(v).await {
-                if let Ok(actions) = get_actions(v, bytes).await {
+                if let Ok(actions) = get_actions(v, &bytes).await {
                     if actions.iter().any(|action| {
                         matches!(action, Action::CommitInfo(CommitInfo {
                             timestamp: Some(t), ..
@@ -186,7 +186,7 @@ impl CdfLoadBuilder {
             .ok_or(DeltaTableError::InvalidVersion(latest_version))?;
 
         let latest_version_actions: Vec<Action> =
-            get_actions(latest_version, latest_snapshot_bytes).await?;
+            get_actions(latest_version, &latest_snapshot_bytes).await?;
         let latest_version_commit = latest_version_actions
             .iter()
             .find(|a| matches!(a, Action::CommitInfo(_)));
@@ -217,7 +217,7 @@ impl CdfLoadBuilder {
             .await?
             .ok_or(DeltaTableError::InvalidVersion(version));
 
-        let version_actions: Vec<Action> = get_actions(version, snapshot_bytes?).await?;
+        let version_actions: Vec<Action> = get_actions(version, &snapshot_bytes?).await?;
 
         let mut ts = 0;
         let mut cdc_actions = vec![];
@@ -930,7 +930,7 @@ pub(crate) mod tests {
             .read_commit_entry(2)
             .await?
             .expect("failed to get snapshot bytes");
-        let version_actions = get_actions(2, snapshot_bytes).await?;
+        let version_actions = get_actions(2, &snapshot_bytes).await?;
 
         let cdc_actions = version_actions
             .iter()
6 changes: 3 additions & 3 deletions crates/core/src/operations/merge/mod.rs
@@ -1999,7 +1999,7 @@ mod tests {
             .await
             .unwrap()
             .expect("failed to get snapshot bytes");
-        let actions = crate::logstore::get_actions(2, snapshot_bytes)
+        let actions = crate::logstore::get_actions(2, &snapshot_bytes)
             .await
             .unwrap();
 
@@ -2075,7 +2075,7 @@ mod tests {
             .await
             .unwrap()
             .expect("failed to get snapshot bytes");
-        let actions = crate::logstore::get_actions(2, snapshot_bytes)
+        let actions = crate::logstore::get_actions(2, &snapshot_bytes)
            .await
            .unwrap();
 
@@ -2186,7 +2186,7 @@ mod tests {
             .await
             .unwrap()
             .expect("failed to get snapshot bytes");
-        let actions = crate::logstore::get_actions(2, snapshot_bytes)
+        let actions = crate::logstore::get_actions(2, &snapshot_bytes)
             .await
             .unwrap();
 