Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-51586][SS] initialize input partitions independent of columnar support in continuous mode #50348

Closed
wants to merge 1 commit into from

Conversation

vrozov
Copy link
Member

@vrozov vrozov commented Mar 21, 2025

What changes were proposed in this pull request?

Initialize input partitions independent of columnar support in ContinuousScanExec

Why are the changes needed?

After SPARK-45080 and PR 42823 Kafka continuous stream may go into infinite loop of reconfiguring. As KafkaScan columnar support mode is hardcoded to UNSUPPORTED ContinuousScanExec does not initialize inputPartitions in supportsColumnar as it did prior to the above PR. So, there is no call to KafkaContinuousStream.planInputPartitions during query planning. It leaves KafkaContinuousStream.knownPartitions uninitialized, so call to needsReconfiguration returns true. Now epochUpdateThread in ContinuousExecution requests interruption of queryExecutionThread, so that thread also can't initialize knownPartitions as it checks for interrupts in KafkaOffsetReaderAdmin.withRetries and exits before knownPartitions is assigned.

Does this PR introduce any user-facing change?

Yes, it fixes bug that may prevent kafka continuous stream from working properly

How was this patch tested?

Using existing Kafka tests. Those tests should complete faster.

Was this patch authored or co-authored using generative AI tooling?

No

@vrozov
Copy link
Member Author

vrozov commented Mar 21, 2025

@HeartSaVioR please review

Copy link
Contributor

@HeartSaVioR HeartSaVioR left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only a nit. Thanks for figuring this out!

@@ -535,7 +535,7 @@ private[kafka010] class KafkaOffsetReaderAdmin(
var attempt = 1
var lastException: Throwable = null
while (result.isEmpty && attempt <= maxOffsetFetchAttempts
&& !Thread.currentThread().isInterrupted) {
&& !Thread.currentThread.isInterrupted) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: shall we not change the code which is not relevant? You can have a different PR to do that, but please find more cases and change altogether.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, it is too small to have a different PR and it is somewhat related to the Kafka issue. Let me know if you strongly prefer to revert this change.

Copy link
Contributor

@HeartSaVioR HeartSaVioR Mar 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer to revert the change. I don't want to mess up the commit log. What you change is completely just a style issue and I don't want to give this line to be shown as "fixing bug".

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, reverted. IMO, it will be good to fix the style anyway and enforce it.

Copy link
Contributor

@HeartSaVioR HeartSaVioR Mar 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is arguably not super clear style rule. Only "pure" parameterless method can remove the paren from the best practice of Scala (it's not that you can remove the paren as long as the method does not take any param), but there are arguments of what is "pure" method.

Here is the post which starts with very simple question got distracted with what is pure method.
https://users.scala-lang.org/t/paramterless-functions-with-and-without-parentheses-different/9939/3

In addition, Scala 3 will enforce the defined method with paren to "always" call with paren, regardless of parameterless, "pure" method, etcetc.

So I don't think this is an obvious style fix - someone could argue that we are going to break the style.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://docs.scala-lang.org/scala3/reference/dropped-features/auto-apply.html: Thread.currentThread still will be the recommended way for Scala 3 as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The latter is idiomatic Scala because it conforms to the uniform access principle. This principle states that one should be able to change an object member from a field to a non-side-effecting method and back without affecting clients that access the member. Consequently, Scala encourages to define such "property" methods without a () parameter list whereas side-effecting methods should be defined with it. Methods defined in Java cannot make this distinction; for them a () is always mandatory. So Scala fixes the problem on the client side, by allowing the parameterless references. But where Scala allows that freedom for all method references, Scala 3 restricts it to references of external methods that are not defined themselves in Scala 3.

If you see the origin thread, the definition of "pure" method is not super obvious; it's not something compiler would be able to catch and it's mostly on human's justification. This is just a homework Scala left to us and I don't really like the fact Scala makes such an ambiguity.

Continuing from there, I'd rather interpret this as Java is just excluded from the enforcement. It is still arguable what is "pure" method and everyone has every different opinions. Thread.currentThread is a native method, and it's not super obvious that there is no side-effect.

Copy link
Contributor

@HeartSaVioR HeartSaVioR left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

@HeartSaVioR HeartSaVioR changed the title [SPARK-51586][SS] initialize input partitions independent of columnar support [SPARK-51586][SS] initialize input partitions independent of columnar support in continuous mode Mar 25, 2025
@HeartSaVioR
Copy link
Contributor

HeartSaVioR commented Mar 25, 2025

Thanks! Merging to master/4.0 (I guess we'd like to not hold the bug to another 6 months.)

HeartSaVioR pushed a commit that referenced this pull request Mar 25, 2025
… support in continuous mode

### What changes were proposed in this pull request?
Initialize input partitions independent of columnar support in `ContinuousScanExec`

### Why are the changes needed?
After [SPARK-45080](https://issues.apache.org/jira/browse/SPARK-45080) and [PR 42823](#42823) Kafka continuous stream may go into infinite loop of reconfiguring. As `KafkaScan` columnar support mode is hardcoded to `UNSUPPORTED` `ContinuousScanExec` does not initialize `inputPartitions` in `supportsColumnar` as it did prior to the above PR. So, there is no call to `KafkaContinuousStream.planInputPartitions` during query planning. It leaves `KafkaContinuousStream.knownPartitions` uninitialized, so call to `needsReconfiguration` returns `true`. Now `epochUpdateThread` in `ContinuousExecution` requests interruption of `queryExecutionThread`, so that thread also can't initialize `knownPartitions` as it checks for interrupts in `KafkaOffsetReaderAdmin.withRetries` and exits before `knownPartitions` is assigned.

### Does this PR introduce _any_ user-facing change?
Yes, it fixes bug that may prevent kafka continuous stream from working properly

### How was this patch tested?
Using existing Kafka tests. Those tests should complete faster.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #50348 from vrozov/SPARK-51586.

Authored-by: Vlad Rozov <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
(cherry picked from commit 11d1409)
Signed-off-by: Jungtaek Lim <[email protected]>
@vrozov vrozov deleted the SPARK-51586 branch March 25, 2025 15:26
SauronShepherd pushed a commit to SauronShepherd/spark that referenced this pull request Mar 25, 2025
… support in continuous mode

### What changes were proposed in this pull request?
Initialize input partitions independent of columnar support in `ContinuousScanExec`

### Why are the changes needed?
After [SPARK-45080](https://issues.apache.org/jira/browse/SPARK-45080) and [PR 42823](apache#42823) Kafka continuous stream may go into infinite loop of reconfiguring. As `KafkaScan` columnar support mode is hardcoded to `UNSUPPORTED` `ContinuousScanExec` does not initialize `inputPartitions` in `supportsColumnar` as it did prior to the above PR. So, there is no call to `KafkaContinuousStream.planInputPartitions` during query planning. It leaves `KafkaContinuousStream.knownPartitions` uninitialized, so call to `needsReconfiguration` returns `true`. Now `epochUpdateThread` in `ContinuousExecution` requests interruption of `queryExecutionThread`, so that thread also can't initialize `knownPartitions` as it checks for interrupts in `KafkaOffsetReaderAdmin.withRetries` and exits before `knownPartitions` is assigned.

### Does this PR introduce _any_ user-facing change?
Yes, it fixes bug that may prevent kafka continuous stream from working properly

### How was this patch tested?
Using existing Kafka tests. Those tests should complete faster.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#50348 from vrozov/SPARK-51586.

Authored-by: Vlad Rozov <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants