-
Notifications
You must be signed in to change notification settings - Fork 458
[Client] Add Pojo Support #1992
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
FYI @wuchong .. This PR adds support for writing/scanning Pojos directly with the client, while keeping the API as is. When I find some time, I want to test the effect in terms of performance, between writing/scanning with Pojos vs InternalRows. |
fluss-client/src/main/java/org/apache/fluss/client/converter/PojoType.java
Outdated
Show resolved
Hide resolved
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/TypedLogScanner.java
Outdated
Show resolved
Hide resolved
|
@leekeiabstraction . Indeed, we are looking at roughly 2x performance penalty. This is basically a trade-off for the users. Using InternalRow/GenericRow directly is way more efficient; however, this might come with some extra complexity and boilerplatete code. For this reason, I want to give flexibility, probably leave the docs as is, with GenericRow being the go-to approach, but also add a section that Pojos can be used directly and maybe highlight this trade-off. Moving forward I'm thinking that maybe it makes sense to add some helper classes that also derive the schema for the table from a Pojo. |
d6722e7 to
24b65c2
Compare
|
Thank you @polyzos for addressing the commends and also providing the data from your tests! That gives a clear picture on the performance. Further response questions below:
IMO, this does not need to be a trade-off. I am curious if you have explored implementation complexity or cons around pushing the Pojo conversion down further so that conversion to/from
Trying to understand, does this relate to performance or a separate thread on further changes that you're planning? |
|
@leekeiabstraction yes, it's a separate thread. |
My aim is so that we can eliminate performance penalty by avoiding performing conversion twice. By "pushing conversion down", I mean you can do something like
Currently not, I'm very new to this code base but it's certainly worth exploring especially if we're also converting twice on the write side. Happy to have a look at the write side as well. |
fluss-client/src/main/java/org/apache/fluss/client/table/writer/TypedAppendWriter.java
Outdated
Show resolved
Hide resolved
fluss-client/src/main/java/org/apache/fluss/client/table/writer/TypedUpsertWriter.java
Outdated
Show resolved
Hide resolved
b955ff5 to
a274f5a
Compare
wuchong
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, @polyzos!
Overall, the pull request looks good. My only concern is the interface change. Could you please take a look at the new proposal?
Also, when you update the PR, please rebase your branch to keep the history clean.
fluss-client/src/main/java/org/apache/fluss/client/lookup/TypedLookuper.java
Outdated
Show resolved
Hide resolved
fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookup.java
Outdated
Show resolved
Hide resolved
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/Scan.java
Outdated
Show resolved
Hide resolved
fluss-client/src/main/java/org/apache/fluss/client/table/writer/Append.java
Outdated
Show resolved
Hide resolved
fluss-client/src/main/java/org/apache/fluss/client/table/writer/Upsert.java
Outdated
Show resolved
Hide resolved
|
@wuchong thank you for all your comments. If you check the test i have also ensured backwards compatibility ... do you mean something different, or am I missing something? |
|
@polyzos Yeah, I noticed those typed classes. However, typed classes are internal, only interfaces are visible to users. My suggestion is to introduce dedicated typed interfaces (e.g., While turning By keeping
This approach preserves backward compatibility, avoids IDE warnings, and aligns with how users actually interact with the API. |
# Conflicts: # fluss-client/src/main/java/org/apache/fluss/client/lookup/Lookuper.java # fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java # fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java
# Conflicts: # fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlussRowToFlinkRowConverterTest.java
f79319a to
3ea717c
Compare
|
@wuchong I made the required changes. |
|
Thank you @polyzos , I will take another look. |
wuchong
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @polyzos , I think this PR is already in a good shape. I left some minor comments.
fluss-client/src/main/java/org/apache/fluss/client/table/writer/TableAppend.java
Outdated
Show resolved
Hide resolved
| @Override | ||
| public <T> TypedUpsertWriter<T> createTypedWriter(Class<T> pojoClass) { | ||
| UpsertWriterImpl delegate = | ||
| new UpsertWriterImpl(tablePath, tableInfo, targetColumns, writerClient); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can simplify to just call createWriter().
| private final Class<T> pojoClass; | ||
| private final TableInfo tableInfo; | ||
| private final RowType tableSchema; | ||
| private final int[] targetColumns; // may be null |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can add @Nullable annotation to indicate it is nullable
| delegate.close(); | ||
| } | ||
|
|
||
| private final Class<T> pojoClass; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not used, can be removed.
fluss-client/src/test/java/org/apache/fluss/client/table/FlussTypedClientITCase.java
Outdated
Show resolved
Hide resolved
|
|
||
| LookupResult lr = lookuper.lookup(new PLookupKey(1)).get(); | ||
| AllTypesPojo one = rowConv.fromRow(lr.getSingletonRow()); | ||
| assertThat(one.str).isEqualTo("s1"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
assertThat(one).isEqualTo(newAllTypesPojo(1));
After adding equals and hashcode method to the AllTypesPojo class, we can simply assert the full record, this can check the full POJO record deserialization.
| AllTypesPojo lookedUp = | ||
| rowConv.fromRow(lookuper.lookup(new PLookupKey(1)).get().getSingletonRow()); | ||
| assertThat(lookedUp.str).isEqualTo("second"); | ||
| assertThat(lookedUp.dec).isEqualByComparingTo("99.99"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In order to test the partial update feature, we should assert the other fields are keep unchanged here.
| TypedScanRecords<AllTypesPojo> recs = scanner.poll(Duration.ofSeconds(2)); | ||
| for (TypedScanRecord<AllTypesPojo> r : recs) { | ||
| if (r.getChangeType() == ChangeType.UPDATE_AFTER) { | ||
| assertThat(r.getValue().str).isEqualTo("second"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
| public static final ScanRecords empty() { | ||
| return new ScanRecords(Collections.emptyMap()); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not necessary? I still prefer the preivous implementation because it can avoid some small object overhead (GC).
| default void close() throws Exception { | ||
| // by default do nothing | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems the newly introduced close() method doesn’t currently have any meaningful work to perform, as the writers don’t hold any resources at this stage. I suggest holding off on introducing it for now.
Typically, a close() method should either flush pending records or ensure all previously submitted requests have completed—otherwise, it may mislead users into expecting cleanup or finalization behavior that isn’t actually implemented. This introduces some complex to this PR.


This addresses #1731