[write] add Transaction with commit info and commit implementation #370
8c11dc0 to 740d112
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@            Coverage Diff             @@
##     main   apache/arrow-rs#370   +/-   ##
==========================================
+ Coverage   78.94%   79.71%   +0.76%
==========================================
  Files          52       53       +1
  Lines       10756    11316     +560
  Branches    10756    11316     +560
==========================================
+ Hits         8491     9020     +529
  Misses       1796     1796
- Partials      469      500      +31
f1dcda9 to 5779f06
Just some style nits
Flushing comments at EOD... will try to finish the review ASAP.
kernel/src/engine/default/json.rs
.put_opts(
    &Path::from(path.path()),
    buffer.into(),
    object_store::PutMode::Create.into(),
aside: Does object_store support the new S3 put-if-absent capability?
Good question! There's a recent issue, but I haven't followed up on it much yet: https://github.com/apache/arrow-rs/issues/6285
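As a point of comparison for the snippet above: `PutMode::Create` asks the store to write only if the object does not already exist, which is what stops two writers from both claiming the same commit file. The sketch below does not answer the S3 question; it swaps in the local-filesystem analogue from the standard library, `OpenOptions::create_new`, which fails with `ErrorKind::AlreadyExists` when the file is already there. The names `write_if_absent` and `is_conflict` are hypothetical.

```rust
use std::fs::OpenOptions;
use std::io::{self, Write};
use std::path::Path;

/// Hypothetical helper: write `bytes` to `path` only if no file exists there yet.
/// This is a local-filesystem analogue of a put-if-absent write, not object_store code.
fn write_if_absent(path: &Path, bytes: &[u8]) -> io::Result<()> {
    let mut file = OpenOptions::new()
        .write(true)
        .create_new(true) // atomic check-and-create: errors with AlreadyExists if present
        .open(path)?;
    file.write_all(bytes)?;
    file.sync_all() // make sure the data reached disk before reporting success
}

/// Hypothetical helper: true if the error means another writer created the file first.
fn is_conflict(err: &io::Error) -> bool {
    err.kind() == io::ErrorKind::AlreadyExists
}
```

A second call on the same path returns an `AlreadyExists` error and leaves the first writer's contents untouched, which is the behavior a commit needs from put-if-absent.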
kernel/src/engine/default/json.rs
fn write_json_file(
    &self,
    path: &url::Url,
    data: Box<dyn Iterator<Item = Box<dyn EngineData>> + Send + '_>,
Out of curiosity, why does this need a lifetime specification?
It doesn't outlive the method call and isn't ever passed to any async code?
Also: can we get away with this as a workaround?
Suggested change:
- data: Box<dyn Iterator<Item = Box<dyn EngineData>> + Send + '_>,
+ data: impl Iterator<Item = Box<dyn EngineData>> + Send,
The lifetime specification is there because a boxed trait object defaults to 'static, and we don't want to impose that requirement. It seemed odd to me that it would default to 'static, but I haven't taken much time to dive into it yet. Secondarily, I think using impl Iterator would make the trait no longer object-safe.
Common 'static misconception strikes again :) I don't think it's much of a burden; it just means the data has to be a "real owned" type. Given we're just going to iterate the data here, what's the use case for object safety?
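To make the `'static` point concrete: in a function signature, a `Box<dyn Trait>` written without a lifetime is treated as `Box<dyn Trait + 'static>`, so any owned iterator satisfies it, while `+ '_` additionally accepts iterators that borrow from the caller. A minimal sketch with hypothetical functions `total_len_static` and `total_len_borrowed`, using `String` items in place of `Box<dyn EngineData>`:

```rust
/// No explicit lifetime: this means `Box<dyn Iterator<Item = String> + Send + 'static>`.
/// Owned iterators (e.g. `Vec<String>::into_iter()`) satisfy it without any extra work.
fn total_len_static(data: Box<dyn Iterator<Item = String> + Send>) -> usize {
    data.map(|s| s.len()).sum()
}

/// `+ '_` introduces an anonymous lifetime, so the iterator may also
/// borrow from the caller for the duration of this call.
fn total_len_borrowed(data: Box<dyn Iterator<Item = String> + Send + '_>) -> usize {
    data.map(|s| s.len()).sum()
}
```

Passing a borrowing iterator such as `words.iter().map(|w| w.to_string())` to `total_len_static` is rejected at compile time because it borrows the local `words`; that is the only case where the `'_` matters.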
I'm seeing a compiler error whenever we use impl Iterator: it prevents JsonHandler from being object-safe (which it must be, since we use it as a trait object in Arc<dyn JsonHandler>).
And you're right, I didn't need the '_ at all :)
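The object-safety error comes from `impl Trait` in argument position being shorthand for a generic type parameter: a generic method cannot be dispatched through a vtable, so a trait containing one can no longer be used as `dyn Trait`. A sketch with a hypothetical `Handler` trait standing in for `JsonHandler` shows the boxed-iterator signature working behind an `Arc<dyn ...>`:

```rust
use std::sync::Arc;

/// Hypothetical stand-in for an engine handler trait that must be usable as `Arc<dyn Handler>`.
trait Handler {
    // Writing `data: impl Iterator<Item = u32>` here would make this method generic,
    // and `dyn Handler` would stop compiling. A boxed trait object keeps it non-generic.
    fn write(&self, data: Box<dyn Iterator<Item = u32> + Send + '_>) -> u32;
}

/// Hypothetical implementation that just consumes the iterator.
struct SumHandler;

impl Handler for SumHandler {
    fn write(&self, data: Box<dyn Iterator<Item = u32> + Send + '_>) -> u32 {
        data.sum()
    }
}

/// Callers only see the trait object, as with `Arc<dyn JsonHandler>`.
fn write_through(handler: &Arc<dyn Handler>, values: &[u32]) -> u32 {
    // The iterator borrows `values`, which the `'_` in the signature allows.
    handler.write(Box::new(values.iter().copied()))
}
```

Keeping `impl Iterator` and adding `where Self: Sized` to the method would also compile, but only by making that method uncallable through `dyn Handler`, which defeats the purpose here.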
flushing comments
static LOG_COMMIT_INFO_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
    StructType::new([Option::<CommitInfo>::get_struct_field(COMMIT_INFO_NAME)]).into()
});
Should this be a u64? There is no chance of this being negative?
We don't impl Schema for u64; the LONG type is i64.
Yeah, this is a long-standing issue, since Java doesn't have unsigned types. Somewhere back in our issue list is something about figuring this out :p
We may or may not need u64 support in general, but parquet timestamps are signed by definition:
"In data annotated with the TIMESTAMP logical type, each value is a single int64 number"
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
fn get_log_schema() -> &'static SchemaRef {
Is this actually optional? Would a commit ever not have an associated action?
It is indeed optional according to the protocol; that is, we should never require it to be present when reading commit info.
Technically, the protocol has nothing to say about any of this.
It just so happens that Delta-spark requires this field, along with our favorite operationParameters. It's NOT optional AFAICT?
(put another way -- it's optional from kernel-as-reader perspective, but if we want compat with Delta-spark then it's required from kernel-as-writer perspective)
Right, to be clear, I mean that it is optional because the protocol says commit info can be anything.
Since we don't really have a way of communicating reader/writer optionality, I can just add a comment saying that this field is optional from a read/enforcement perspective but that kernel always writes it.
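The reader/writer split described above can be sketched as a record whose fields are all `Option` (so reading never fails on a missing field) paired with a writer-side constructor that always fills them in. This is a hypothetical, simplified `CommitInfo` with only two fields; the `"UNKNOWN"` fallback mirrors the PR's `UNKNOWN_OPERATION` constant, and `for_write` and `operation_or_unknown` are illustrative names, not kernel APIs:

```rust
/// Hypothetical, simplified commit-info record. Every field is `Option` because a
/// reader must accept commitInfo actions from any writer, where any field may be absent.
#[derive(Debug, Clone, PartialEq)]
struct CommitInfo {
    /// Milliseconds since the Unix epoch.
    timestamp: Option<i64>,
    operation: Option<String>,
}

impl CommitInfo {
    /// Writer side: always populate both fields, falling back to "UNKNOWN"
    /// (mirroring UNKNOWN_OPERATION) when no operation was set.
    fn for_write(timestamp: i64, operation: Option<&str>) -> Self {
        CommitInfo {
            timestamp: Some(timestamp),
            operation: Some(operation.unwrap_or("UNKNOWN").to_string()),
        }
    }

    /// Reader side: a missing operation is tolerated rather than treated as an error.
    fn operation_or_unknown(&self) -> &str {
        self.operation.as_deref().unwrap_or("UNKNOWN")
    }
}
```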
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
// TODO need to have a way to always write some fields but not always read them
struct CommitInfo {
    /// The time this logical file was created, as milliseconds since the epoch.
So, to my questions above: are you using these Options as nullable fields rather than truly optional ones?
Option in schema-land is just a nullable field, so I think the answer is yes?
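One way to picture "Option is just a nullable field" (and why u64 has no schema mapping) is a type-to-field trait in which `Option<T>` reuses `T`'s data type and only flips the nullability flag. The `ToField` trait, `StructField` struct, and `PrimitiveType` enum below are a hypothetical, minimal model, not the kernel's schema types:

```rust
/// Hypothetical primitive types; LONG is a signed 64-bit integer.
#[derive(Debug, Clone, Copy, PartialEq)]
enum PrimitiveType {
    Long,
    String,
}

/// Hypothetical schema field: a name, a type, and a nullability flag.
#[derive(Debug, Clone, PartialEq)]
struct StructField {
    name: String,
    data_type: PrimitiveType,
    nullable: bool,
}

/// Maps a Rust type to a schema field. Only types with a matching primitive
/// implement it, so there is deliberately no impl for `u64`.
trait ToField {
    fn to_field(name: &str) -> StructField;
}

impl ToField for i64 {
    fn to_field(name: &str) -> StructField {
        StructField { name: name.to_string(), data_type: PrimitiveType::Long, nullable: false }
    }
}

impl ToField for String {
    fn to_field(name: &str) -> StructField {
        StructField { name: name.to_string(), data_type: PrimitiveType::String, nullable: false }
    }
}

/// `Option<T>` keeps T's data type and only marks the field as nullable.
impl<T: ToField> ToField for Option<T> {
    fn to_field(name: &str) -> StructField {
        StructField { nullable: true, ..T::to_field(name) }
    }
}
```

Because there is no `impl ToField for u64`, a `u64` field fails to compile instead of silently mapping to the signed LONG type.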
flushing some comments.
Co-authored-by: Nick Lanham
lgtm! just one thing to fix.
Overall approach looking good. Several questions, nits, and potential panic sites, tho.
/// serialize an arrow RecordBatch to a JSON string by appending to a buffer.
// TODO (zach): this should stream data to the JSON writer and output an iterator.
pub(crate) fn to_json_bytes(
I think there was an earlier comment thread recommending renaming this from write_json_bytes to to_json_bytes. But that seems to be based on current behavior rather than the final plan?
(tho I guess as pub(crate) we can change it easily enough when the time comes?)
Yup, I'd probably advocate just keeping this as to_json_bytes, since that's what it does, and we can rename/evolve it in the future?
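For context on what the buffer holds: a Delta commit file is newline-delimited JSON, one action per line. The sketch below appends a hypothetical `commitInfo` row to a `Vec<u8>` by hand, with minimal string escaping; the PR's `to_json_bytes` serializes an Arrow `RecordBatch` instead, and `CommitInfoRow`, `escape_json`, and `commit_info_to_json_bytes` are illustrative names only. Like the current function, it materializes the whole output before returning, which is what the streaming TODO would change.

```rust
/// Escape a string for inclusion inside a JSON string literal.
fn escape_json(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            // Remaining control characters use the \u00XX form.
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out
}

/// Hypothetical row type: one commitInfo action.
struct CommitInfoRow {
    timestamp: i64,
    operation: String,
}

/// Serialize rows as newline-delimited JSON by appending to a single buffer,
/// one `{"commitInfo":{...}}` action per line.
fn commit_info_to_json_bytes<'a>(rows: impl IntoIterator<Item = &'a CommitInfoRow>) -> Vec<u8> {
    let mut buf = Vec::new();
    for row in rows {
        let line = format!(
            "{{\"commitInfo\":{{\"timestamp\":{},\"operation\":\"{}\"}}}}\n",
            row.timestamp,
            escape_json(&row.operation)
        );
        buf.extend_from_slice(line.as_bytes());
    }
    buf
}
```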
const UNKNOWN_OPERATION: &str = "UNKNOWN";

/// A transaction represents an in-progress write to a table.
pub struct Transaction {
How is commit info a required field, if it's passed as Option?
let timestamp: i64 = SystemTime::now()
    .duration_since(UNIX_EPOCH)
Timestamp manipulation code is starting to proliferate... and it's very manual/duplicated right now. Maybe it's time to start thinking about an internal API we can use for working with timestamp values?
Yup, +1, I can create a follow-up. I also think it makes testing hard, since we have places that actually use local machine time and we need deterministic tests.
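A possible shape for that follow-up, assuming an injectable clock: production code reads local machine time, tests pin a fixed instant, and the conversion to the signed i64 milliseconds used by Delta and Parquet timestamps returns `None` instead of panicking for pre-epoch or out-of-range values. `Clock`, `SystemClock`, `FixedClock`, and `millis_since_epoch` are hypothetical names:

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Hypothetical clock abstraction so tests can control "now".
trait Clock {
    fn now(&self) -> SystemTime;
}

/// Production clock: local machine time.
struct SystemClock;

impl Clock for SystemClock {
    fn now(&self) -> SystemTime {
        SystemTime::now()
    }
}

/// Test clock: always returns the same instant, making tests deterministic.
struct FixedClock(SystemTime);

impl Clock for FixedClock {
    fn now(&self) -> SystemTime {
        self.0
    }
}

/// Milliseconds since the Unix epoch as a signed i64.
/// Returns None (rather than panicking) for pre-epoch times or values that overflow i64.
fn millis_since_epoch(clock: &dyn Clock) -> Option<i64> {
    let since_epoch = clock.now().duration_since(UNIX_EPOCH).ok()?;
    i64::try_from(since_epoch.as_millis()).ok()
}
```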
This PR does 4 main things:
1. Reorganizes transaction.rs so that the transaction action now lives in the actions module. EDIT: now in add more tests for window::shift and handle boundary cases apache/arrow-rs#386
2. Adds the Transaction API, which includes:
   a. Table.new_transaction() to create a new transaction from the latest snapshot of the table
   b. Transaction.with_commit_info(engine_commit_info: Box<dyn EngineData>) to add single-row commit info in the form of a map<string, string>; required to commit
   c. Transaction.with_operation(operation: String) to set the operation name of the transaction (persisted in commit info)
   d. Transaction.commit() (consumes the transaction) to commit the transaction to the log (currently only supports committing the commit info)
3. Adds write_json_file(impl Iterator<Item = Box<dyn EngineData>>) (and a default engine implementation for it)
4. Adds write.rs to house many of our write tests as writes are implemented

resolves apache/arrow-rs#378
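The builder flow in item 2 can be sketched end to end under heavy simplifying assumptions: an in-memory map keyed by version stands in for `_delta_log`, a `HashMap<String, String>` stands in for the single-row `Box<dyn EngineData>` commit info, and the commit body is a plain string rather than JSON. Everything here (`InMemoryLog`, `CommitError`, this `Transaction`) is hypothetical; it only illustrates a `commit()` that consumes the transaction, requires commit info, targets the version after the snapshot it started from, and relies on put-if-absent to reject a concurrent commit of the same version.

```rust
use std::collections::HashMap;

/// Hypothetical commit errors.
#[derive(Debug, PartialEq)]
enum CommitError {
    MissingCommitInfo,
    VersionExists(u64),
}

/// Hypothetical in-memory log: version -> commit contents.
#[derive(Default)]
struct InMemoryLog {
    commits: HashMap<u64, String>,
}

impl InMemoryLog {
    fn latest_version(&self) -> Option<u64> {
        self.commits.keys().max().copied()
    }

    /// Put-if-absent: refuse to overwrite an existing version.
    fn put_if_absent(&mut self, version: u64, contents: String) -> Result<(), CommitError> {
        if self.commits.contains_key(&version) {
            return Err(CommitError::VersionExists(version));
        }
        self.commits.insert(version, contents);
        Ok(())
    }
}

/// Hypothetical transaction capturing the version it was created from.
struct Transaction {
    read_version: Option<u64>,
    operation: Option<String>,
    commit_info: Option<HashMap<String, String>>,
}

impl Transaction {
    /// Analogue of Table.new_transaction(): start from the latest version.
    fn new(log: &InMemoryLog) -> Self {
        Transaction { read_version: log.latest_version(), operation: None, commit_info: None }
    }

    fn with_operation(mut self, operation: impl Into<String>) -> Self {
        self.operation = Some(operation.into());
        self
    }

    fn with_commit_info(mut self, commit_info: HashMap<String, String>) -> Self {
        self.commit_info = Some(commit_info);
        self
    }

    /// Consumes the transaction; commit info is required.
    fn commit(self, log: &mut InMemoryLog) -> Result<u64, CommitError> {
        let info = self.commit_info.ok_or(CommitError::MissingCommitInfo)?;
        let version = self.read_version.map_or(0, |v| v + 1);
        let operation = self.operation.unwrap_or_else(|| "UNKNOWN".to_string());
        let mut entries: Vec<(String, String)> = info.into_iter().collect();
        entries.sort(); // deterministic contents regardless of HashMap order
        let contents = format!("operation={operation}; commit_info={entries:?}");
        log.put_if_absent(version, contents)?;
        Ok(version)
    }
}
```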