@Alibaba-HZY
Contributor

@Alibaba-HZY Alibaba-HZY commented Feb 18, 2025

[flink]

Purpose

Support partition pushdown in Flink connector
Linked issue: close #1711

Tests

com.alibaba.fluss.connector.flink.source.FlinkTableSourceITCase#testStreamingReadPartitionPushDown
com.alibaba.fluss.predicate.PredicateBuilderTest
com.alibaba.fluss.predicate.PredicateTest

API and Format

no

Documentation

no

@Alibaba-HZY Alibaba-HZY force-pushed the flink-support-patition-pushdown branch 2 times, most recently from 9a09bfe to 8d7cc2e Compare February 18, 2025 03:49
@wuchong wuchong added this to the v0.7 milestone Feb 26, 2025
@wuchong
Member

wuchong commented Feb 26, 2025

@Alibaba-HZY Thanks for the contribution. We are approaching the code freeze deadline for v0.6, so I have scheduled this issue for the next version. Also, this pull request is quite large. Splitting it into multiple pull requests would speed up the review process.

@Alibaba-HZY
Contributor Author

Alibaba-HZY commented Mar 3, 2025

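@Alibaba-HZY Thanks for your contribution. We are approaching the code freeze deadline for v0.6, so I have scheduled this issue for the next version. Also, this pull request is quite large. Splitting it into multiple pull requests would speed up the review.

OK, I will split this into two PRs.
First: Introduce Predicate #515
Second: Support partition pushdown in Flink connector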

@wuchong wuchong removed this from the v0.7 milestone Jun 18, 2025
remainingFilters.add(filter);
} else {
Predicate p = predicateOptional.get();
if (!p.visit(partitionPredicateVisitor)) {
Contributor

The name PartitionPredicateVisitor is quite strange. Based on its implementation semantics and usage, perhaps it should be called PartitionPredicateMatcher? (Although I checked and found that Apache Paimon also uses this naming.)

Contributor Author

If the class is renamed to PartitionPredicateMatcher, should its 'visit' methods also be renamed to 'match'?

Contributor

I propose a public method match() to wrap the internal visit() method as a better approach. match() serves as the semantic interface, and visit() is its implementation.
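
To illustrate the suggestion, here is a minimal sketch of a matcher whose public match() method delegates to the visitor methods. The Predicate, PredicateVisitor, LeafPredicate, and CompoundPredicate types below are simplified, hypothetical stand-ins, not the actual Fluss classes, and the rule "a predicate matches if it references only partition keys" is an assumption made for the example.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified predicate model (not the real Fluss API).
interface PredicateVisitor<T> {
    T visitLeaf(String fieldName);

    T visitCompound(List<Predicate> children);
}

interface Predicate {
    <T> T visit(PredicateVisitor<T> visitor);
}

final class LeafPredicate implements Predicate {
    private final String fieldName;

    LeafPredicate(String fieldName) {
        this.fieldName = fieldName;
    }

    @Override
    public <T> T visit(PredicateVisitor<T> visitor) {
        return visitor.visitLeaf(fieldName);
    }
}

final class CompoundPredicate implements Predicate {
    private final List<Predicate> children;

    CompoundPredicate(List<Predicate> children) {
        this.children = children;
    }

    @Override
    public <T> T visit(PredicateVisitor<T> visitor) {
        return visitor.visitCompound(children);
    }
}

final class PartitionPredicateMatcher implements PredicateVisitor<Boolean> {
    private final Set<String> partitionKeys;

    PartitionPredicateMatcher(Collection<String> partitionKeys) {
        this.partitionKeys = new HashSet<>(partitionKeys);
    }

    /** Semantic entry point: true if the predicate references only partition keys. */
    boolean match(Predicate predicate) {
        return predicate.visit(this);
    }

    // The visit methods remain the implementation detail behind match().
    @Override
    public Boolean visitLeaf(String fieldName) {
        return partitionKeys.contains(fieldName);
    }

    @Override
    public Boolean visitCompound(List<Predicate> children) {
        for (Predicate child : children) {
            if (!child.visit(this)) {
                return false;
            }
        }
        return true;
    }
}
```

With this shape, a call site such as `p.visit(partitionPredicateVisitor)` could read `matcher.match(p)`, which states the intent without exposing the visitor mechanics.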

@platinumhamburg
Contributor

@Alibaba-HZY It seems the pull request branch is currently far behind the main branch, and several conflicts exist. Could you kindly rebase it from the main branch when possible? This will help us proceed with more thorough testing for this PR.

@platinumhamburg
Contributor

@Alibaba-HZY I have tested this PR in several complex scenarios. Overall, the changes look good, but there are still a few minor issues that need attention:

  • When using partition filters with LIKE clauses such as "like 'xxx%'" or "like '%xxx%'", Flink throws an error: Caused by: java.io.NotSerializableException: com.alibaba.fluss.memory.MemorySegment.... This issue was straightforward to fix.
  • Currently, the test cases only cover a limited set of filter patterns. It would be better to add more comprehensive unit tests to cover the supported filtering scenarios.
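
The thread does not show the root cause or the actual fix for the NotSerializableException. As a hedged illustration of the general failure mode, the sketch below shows a serializable predicate that keeps its literal in a non-serializable, memory-backed wrapper (which fails under Java serialization) next to a variant that keeps the literal as a plain String. SegmentBackedString, BrokenStartsWith, SafeStartsWith, and SerializationCheck are hypothetical names invented for this example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a string backed by a non-serializable memory segment.
final class SegmentBackedString {
    private final byte[] bytes;

    SegmentBackedString(String value) {
        this.bytes = value.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String toString() {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}

// Declared Serializable, but its field type is not, so writeObject fails at runtime.
final class BrokenStartsWith implements Serializable {
    private static final long serialVersionUID = 1L;
    private final SegmentBackedString prefix;

    BrokenStartsWith(String prefix) {
        this.prefix = new SegmentBackedString(prefix);
    }

    boolean test(String value) {
        return value.startsWith(prefix.toString());
    }
}

// Keeps the literal in a serializable form (a plain String).
final class SafeStartsWith implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String prefix;

    SafeStartsWith(String prefix) {
        this.prefix = prefix;
    }

    boolean test(String value) {
        return value.startsWith(prefix);
    }
}

final class SerializationCheck {
    /** Returns false if Java serialization rejects the object graph. */
    static boolean canSerialize(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Serializes and deserializes the object, as happens when a plan is shipped to workers. */
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

A round-trip check like `SerializationCheck.roundTrip(predicate)` is also a cheap way to cover each supported filter pattern in unit tests, since it exercises the same path that fails at job submission.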

@Alibaba-HZY
Contributor Author

@Alibaba-HZY I have tested this PR in several complex scenarios. Overall, the changes look good, but there are still a few minor issues that need attention:

  • When using partition filters with LIKE clauses such as "like 'xxx%'" or "like '%xxx%'", Flink throws an error: Caused by: java.io.NotSerializableException: com.alibaba.fluss.memory.MemorySegment.... This issue was straightforward to fix.
  • Currently, the test cases only cover a limited set of filter patterns. It would be better to add more comprehensive unit tests to cover the supported filtering scenarios.

Thank you for testing. I will fix those problems.

}

@Test
void testStreamingReadWithCombinedFilters2() throws Exception {
Contributor

What is the difference between the test cases testStreamingReadWithCombinedFilters1 and testStreamingReadWithCombinedFilters2?

Contributor Author

I think I can combine those.

@platinumhamburg
Contributor

platinumhamburg commented Jul 8, 2025

This PR LGTM. I've tested it over several rounds in different production scenarios, and it works well in almost all of them. Once https://github.com/apache/fluss/pull/515/files is merged, this PR should be ready for final review by @wuchong after the commits are cleaned up.

convertPartitionInfoToInternalRow(
partitionInfo)))
.collect(Collectors.toList());
LOG.info(
Contributor

listPartitions() is called every 10 seconds (the default setting) in a streaming task, so this log is also printed every 10 seconds, even when no new partitions are discovered. It will drown out other useful logs.
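
One possible way to address this, sketched under assumptions: remember the partitions seen in the previous discovery round and log at INFO only when the set grows. PartitionChangeTracker is a hypothetical helper for illustration, not code from this PR, and partitions are modeled as plain strings.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper: tracks which partitions are new since the last discovery round.
final class PartitionChangeTracker {
    private Set<String> lastSeen = Collections.emptySet();

    /**
     * Records the current partitions and returns only those not seen in the
     * previous round. An empty result means there is nothing worth logging at INFO.
     */
    Set<String> update(Collection<String> currentPartitions) {
        Set<String> added = new TreeSet<>(currentPartitions);
        added.removeAll(lastSeen);
        lastSeen = new HashSet<>(currentPartitions);
        return added;
    }
}

// Intended use inside the periodic discovery callback (logger calls shown as comments,
// since the logging setup is outside this sketch):
//
//   Set<String> added = tracker.update(partitionNames);
//   if (!added.isEmpty()) {
//       // LOG.info("Discovered new partitions: {}", added);
//   } else {
//       // LOG.debug("No new partitions discovered");
//   }
```

This keeps the INFO log informative when partitions actually appear while leaving the unchanged rounds at DEBUG level.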

Member

@wuchong wuchong left a comment

Thanks @Alibaba-HZY, I added a commit to improve the code a bit.

Besides, please add unit tests for the PredicateConverter, and try to support more data types if that is not too complex, now that #1264 is merged.

@wuchong wuchong linked an issue Sep 16, 2025 that may be closed by this pull request
2 tasks
@wuchong wuchong changed the title [flink]Support partition pushdown in Flink connector [flink]Support partition pushdown for more filters in Flink connector Sep 16, 2025
@wuchong wuchong force-pushed the flink-support-patition-pushdown branch from 463e7d0 to f0e7af6 Compare September 16, 2025 16:31
@wuchong
Member

wuchong commented Sep 21, 2025

@Alibaba-HZY, just checking in: how's the progress on this PR? Let me know if you need any help or if there's anything I can do to move it forward!

@Alibaba-HZY
Contributor Author

Alibaba-HZY commented Sep 21, 2025 via email

@Alibaba-HZY
Contributor Author

just checking in how’s the progress on this PR? Let me know if you need any help or if there’s anything I can do to move it forward!

I left comments for you on DingDing.

@Alibaba-HZY Alibaba-HZY force-pushed the flink-support-patition-pushdown branch from f0e7af6 to 9489f6a Compare September 21, 2025 14:37
@wuchong wuchong force-pushed the flink-support-patition-pushdown branch from 2b155cc to 774051b Compare September 26, 2025 15:44
@wuchong
Member

wuchong commented Sep 26, 2025

@Alibaba-HZY I updated the implementation to fix bugs and add documentation.

@wuchong wuchong force-pushed the flink-support-patition-pushdown branch from 774051b to c68b73d Compare September 26, 2025 15:57
@wuchong wuchong force-pushed the flink-support-patition-pushdown branch from c68b73d to 9fbfce4 Compare September 26, 2025 16:59
@wuchong
Member

wuchong commented Sep 26, 2025

Merging...

@wuchong wuchong merged commit 680ca7e into apache:main Sep 26, 2025
6 checks passed
leosanqing pushed a commit to leosanqing/fluss that referenced this pull request Sep 29, 2025

Development

Successfully merging this pull request may close these issues.

Support partition pushdown for more filters

3 participants