Skip to content

Fallback Implementation #34148

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 11 commits into
base: master
Choose a base branch
from
Draft

Fallback Implementation #34148

wants to merge 11 commits into from

Conversation

parveensania
Copy link

Please add a meaningful description for your change here


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@parveensania parveensania changed the title test commit Fallback Implementation Apr 14, 2025
Copy link
Contributor

@m-trieu m-trieu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some minor updates needed

@parveensania parveensania requested review from scwhittle and m-trieu May 13, 2025 05:07
Copy link
Contributor

@m-trieu m-trieu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just the test then LGTM

Copy link
Contributor

@scwhittle scwhittle left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you merge in the removal of the throttling tracker and other changes?

synchronized (metadataLock) {
if (newWindmillEndpoints.version() < pendingMetadataVersion) {
if (newWindmillEndpoints.version() < pendingMetadataVersion
&& newWindmillEndpoints.endpointType() == activeEndpointType) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto, should be

(version < pending) || (version==pending && type=activeType)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't add the check version==pending here. As in the caller function pending version gets set to new endpoints version. So in this function for the most latest endpoints, pending version will always be equal to new endpoints version(irrespective of the enpoint type)

}
}

private WorkItemReceiver getWorkItemReceiver() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really like the duplication here between this an SingleSourceWorkerHarness in SE mode.
Could this just create a SingleSourceWorkerHarness and start it?

It seems like it is just missing setBudget support, but perhaps that could be added to SingleSourceWorkerHarness? Or alternatively could we have some static method to construct this receiver that we share between the two classes?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactored the class. PTAL

One thing I want clarification on, when close() is called, SSWH shuts down the executor service and workCommitter but does not call shutdown of getWorkStream explicitly. Should we be calling getWorkStream.shutdown() for graceful termination or it is irrelevant?

@scwhittle
Copy link
Contributor

@arunpandianp If you have cycles to help review that would be great

@@ -245,17 +247,34 @@ private StreamingDataflowWorker(
Consumer<PrintWriter> getDataStatusProvider;
Supplier<Long> currentActiveCommitBytesProvider;
ChannelCache channelCache = null;
WindmillStreamPool<GetDataStream> getDataStreamPool = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see where this is used, I think you could just move the definition into the different branches.

if (options.isEnableStreamingEngine() && options.getIsWindmillServiceDirectPathEnabled()) {
// Direct path pipelines.
WeightedSemaphore<Commit> maxCommitByteSemaphore = Commits.maxCommitByteSemaphore();
channelCache = createChannelCache(options, configFetcher);
getDataStreamPool =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all of these deps that are just for the fallback case I'm wondering if we can create lazily so that we don't startup threads etc unless we need to.

One maybe easy way to do that is replace all the ones that are only used in the fallback path with a Supplier and Suppliers.memoize

example
Supplier ... = Suppliers.memoize(...);
Supplier ... = Supppliers.memoize(()->new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool.get());

*/
@Internal
@ThreadSafe
@SuppressWarnings("unused")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this be removed and just fixed?

streamingEngineStreamFactory.createGetWorkStream(
connection.currentStub(), getWorkRequest, workItemReceiver);
workStream.start();
this.getWorkStream = workStream;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems racy, it's not synchronized and we're passing this method to some other class to call which could be simultaneous with it's usage in setBudget

@arunpandianp
Copy link
Contributor

@arunpandianp If you have cycles to help review that would be great

on it.

@arunpandianp
Copy link
Contributor

arunpandianp commented Jun 27, 2025

To fallback, will it be simpler to shutdown the current streamingWorkerHarness, reinitialize it with SingleSourceWorkerHarness and call start?

void switchStreamingWorkerHarness(boolean directPath) {
if (directPath) {
  if(!(streamingWorkerHarness instanceof FanOutStreamingEngineWorkerHarness)) {
     streamingWorkerHarness.shutdown();
     streamingWorkerHarness = createFanOutStreamingEngineWorkerHarness();
     streamingWorkerHarness.start();
     return;
  } 
} else {
  if (!(streamingWorkerHarness instanceof SingleSourceWorkerHarness)) {
     streamingWorkerHarness.shutdown();
     streamingWorkerHarness = createSingleSourceWorkerHarness();
     streamingWorkerHarness.start();
     return;
  } 
}
}

The directpath boolean can be read from StreamingGlobalConfigHandle::getConfig().

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants