Search before asking
- I searched in the issues and found nothing similar.
Fluss version
main (development)
Please describe the bug 🐞
When I follow the delta join docs, the query fails with the following exception:
Sink(table=[fluss_catalog.my_db.snk], fields=[city_id, order_id, content, city_name], changelogMode=[NONE], duplicateChanges=[NONE])
+- Calc(select=[city_id, order_id, content, city_name], changelogMode=[I], duplicateChanges=[ALLOW])
   +- Join(joinType=[InnerJoin], where=[=(city_id, city_id0)], select=[city_id, order_id, content, city_id0, city_name], leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I], duplicateChanges=[ALLOW])
      :- Exchange(distribution=[hash[city_id]], changelogMode=[I], duplicateChanges=[DISALLOW])
      :  +- TableSourceScan(table=[[fluss_catalog, my_db, left_src]], fields=[city_id, order_id, content], changelogMode=[I], duplicateChanges=[DISALLOW])
      +- Exchange(distribution=[hash[city_id]], changelogMode=[I], duplicateChanges=[DISALLOW])
         +- TableSourceScan(table=[[fluss_catalog, my_db, right_src]], fields=[city_id, city_name], changelogMode=[I], duplicateChanges=[DISALLOW])
    at org.apache.flink.table.planner.plan.optimize.StreamPhysicalDeltaJoinForceValidator.validatePhysicalPlan(StreamPhysicalDeltaJoinForceValidator.java:66)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.postOptimize(StreamCommonSubGraphBasedOptimizer.scala:379)
    at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:96)
    at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:395)
    at org.apache.flink.table.planner.delegation.PlannerBase.getExplainGraphs(PlannerBase.scala:637)
    at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:100)
    at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:47)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:835)
    at org.apache.flink.table.api.internal.TableEnvironmentInternal.explainInternal(TableEnvironmentInternal.java:95)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1258)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:873)
Solution
No response
Are you willing to submit a PR?
- I'm willing to submit a PR!