Skip to content

[FLINK-34566] Pass a FixedThreadPool to set reconciliation parallelism correctly #790

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 8, 2024

Conversation

ysymi
Copy link

@ysymi ysymi commented Mar 5, 2024

What is the purpose of the change

This pull request pass a fixed size thread pool to JOSDK, so that we can set a large reconciliation parallelism as we except .

Brief change log

  • set a fixed size thread pool as reconciliation executor in JOSDK configuration

Verifying this change

This change added tests and can be verified as follows:

org.apache.flink.kubernetes.operator.FlinkOperatorTest#testConfigurationPassedToJOSDK:

  • *Extended check logic about reconciliation thread pool creation *

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): ( no)
  • The public API, i.e., is any changes to the CustomResourceDescriptors: ( no)
  • Core observer or reconciler logic that is regularly executed: ( yes)

Documentation

  • Does this pull request introduce a new feature? ( no)

Comment on lines +77 to +86
for (int i = 0; i < testParallelism * 2; i++) {
threadPoolExecutor.execute(
() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what do we expect to test here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to test whether the number of threads in the executor can reach our expected maximumPoolSize when the executor receives enough tasks.
see assertion in line 87

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually, I think code in lines 78 to 85 was ugly, any good suggestion?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could do a more functional test that will hold even if the service implementation changes.

  1. Set a smaller parallelism to not use soo many threads
  2. For 2x the parallelism like you did we execute:
  • Increment atomic counter
  • Sleep for long (lets say 10sec)
  1. Assert the counter == 2x parallelism

Wdyt? It would be good to test that the test actually fails before the change

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you mean "increase counter and sleep for long in every task" ? if yes, "Assert the counter == 2x parallelism" will always fail even if we set a fixed thread pool(what we want to change)
I think check pool size (which equals really thread number)is functional enough.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fine, by me for now

@gyfora gyfora merged commit 726e484 into apache:main Mar 8, 2024
131 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants