Skip to content

Conversation

@shunping
Copy link
Collaborator

@shunping shunping commented May 3, 2025

addresses #33815

I verified that the previous stuck sample code in #33815 (comment) and #33815 (comment) is working with this PR's changes.

@shunping shunping self-assigned this May 3, 2025
@shunping shunping changed the title Add periodic checkpointing during progress reporting. Add periodic checkpoint during progress reporting. May 3, 2025
@shunping shunping changed the title Add periodic checkpoint during progress reporting. Add checkpoint during progress reporting. May 3, 2025
@shunping shunping marked this pull request as ready for review May 3, 2025 04:33
@shunping shunping requested a review from lostluck May 3, 2025 04:33
continue progress
}
fraction = 0.5
} else if checkpointReady && unsplit {
Copy link
Collaborator Author

@shunping shunping May 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering if we should do this checkpointing for both bounded and unbounded cases.

@lostluck: WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My intuition is telling me that we should not to add it for the bounded case. But the essence of Beam is to unify batch and streaming so it's probably fine.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess I'm worried about the situation where we're just oversplitting and forcing a checkpoint on a perfectly fine running Batch DoFn (processing efficiently and per element fast).

As it stands, this code looks like it will checkpoint a fast moving bundle after 1 second. (10 ticks, at 100ms per tick, since a fast moving bundle won't get a slower progress request rate).

But that's sort of wasteful. A fast moving bundle shouldn't be stopped. We might just want to only do this if the bundle is moving fast, but the input index isn't moving? Perhaps this is a reason why Dataflow has explicit Batch and Streaming modes of execution.

One almost wants to do it based on the number or amount of output data instead, in order to allow the watermark to progress.... But that would be much harder, and is overthinking it for now.

Copy link
Collaborator Author

@shunping shunping May 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checkpointing is always a trade-off. In theory, we don't want to checkpoint too often to hurt performance, while we also want to checkpoint sufficiently enough so the hard work can be materialized and saved.

A fast moving bundle shouldn't be stopped.

I think we can consider using the checkpoint ticks AND the amount or rate of output data ("totalCount") as the criteria to identify a fast-moving bundle (thousands of events per tick) that lasts reasonably long. Instead of 1 second, we can change it to 10 (or even longer) seconds for example.

Even if it is fast moving, we may still want to checkpoint to make sure we don't need to repeat the previous 10-second work if something goes bad.

Copy link
Collaborator Author

@shunping shunping May 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After we have bundle retrying implemented, we can adjust the threshold of checkpoint ticks to longer or shorter based on how often we see an error in the bundle, how long does it take to check point, etc.

continue progress
}
// Save residual roots for checkpoint. After checkpointing is successful,
// the bundle will be marked as finished and no residual roots will be
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is kind of a surprise. When a bundle finishes due to splitting with 0.0 fraction, no residual roots in the response. Is this by design?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is expected and by design.

Were you seeing errors due to returning the residuals early in the split response for checkpointing case?

There are a few different cases to think about, but they're aligned with the two FnAPI calls in question.

  1. Normal ProcessBundleResponse: Returns when the primary is completed, there are no residuals to worry about.

  2. Split Response + ProcessBundleResponse:
    The Split Response contains the confirmation of the primary (what the bundle will finish processing), and the residual that needs to be processed later. ProcessBundleResponse will not contain any residuals at this time, since they were already persisted by the split response (per the above).

  3. Self Checkpointed ProcessBundleResponse: This is when the DoFn itself returns a process continuation for a specific element (eg. Resume in 10s or similar). The Primary is by definition completed, but there may be residuals to process later. That's what's returned and scheduled.

You're seeing 2 in this case. We shouldn't need to do any additional residual handling and processing after the bundle is finished here. I'd be a bit concerned that there is a data duplication risk when doing it this way (the same residuals getting "returned" twice.)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. So my current code is correct then, since I am using the Residual from SplitResponse (rather than Residual from ProcessBundleResponse in the original code) to compute watermark and residual data outside of the progress for loop.

QQ: On case 2, what if we have a split at fraction 0.5? Prior to my change, I think the code is relying on Residual from ProcessBundleResponse to update the watermark. However isn't the residual empty there after we have a split response?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So residualRoots is set on line 260, but the ResigdualRoots is called again and processed and sent back to the EM on line 282.

That variable is never un-set after processing for bundles.

Then after the bundle finishes, the residual roots are only overridden If and only if the final bundle has residual roots already. Therefore the cached roots from the split response might be processed a second time, being sent to the EM as part of PersistBundle, which then also reschedules them. So it may duplicate the residual data.

IIUC the better fix is to handle the output watermark estimate in the em.ReturnResidual call. Right now it's only happening in PersistBundle.

PersistBundle call: https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go#L905

ReturnResiduals call:
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go#L1023


Alternatively the MinOutputWatermarks for residuals is independent of the specific data, so it should also be valid to collect/update them with splits, but not persist them until the PersistBundle call. That matches closely what you have here (and what works), but without the duplicated data.

MinOutputWatermarks are collected here: https://github.com/apache/beam/pull/34828/files#diff-c799dce79559a70660d7abb42fcbff8455ba41452bd9483fc5c58dfcf156ee8cR343

Map is created just above currently: https://github.com/apache/beam/pull/34828/files#diff-c799dce79559a70660d7abb42fcbff8455ba41452bd9483fc5c58dfcf156ee8cR322

So we'd just need to ensure we don't have "stale" watermarks being persisted for this holding things back by accident.

@github-actions
Copy link
Contributor

github-actions bot commented May 3, 2025

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @jrmccluskey for label go.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

slog.LogAttrs(context.TODO(), slog.LevelError, "returned empty residual application", slog.Any("bundle", rb))
panic("sdk returned empty residual application")
}
// TODO what happens to output watermarks on splits?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well at least i left a note about the output watermarks when I last touched this.

I think the "correct" thing to do here is to collect them and apply them accordingly with PersistBundle, instead of reprocessing the whole set of residuals later (which is how the phone is rendering it. I'll need to reread it).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. I didn't quite follow the part where residual from split response is handled in the original code.

The latter part outside of the progress for loop makes more sense to me.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Estimated input elements bit is tricky since it's about how to estimate where to split for Unbounded SDFs and how big a bundle "is". I can't recall exactly why it ended up with the "filter out residuals that are before the end of data", cases... but apparently it had to do with timers?

https://github.com/apache/beam/blame/master/sdks/go/pkg/beam/runners/prism/internal/stage.go#L253

@github-actions
Copy link
Contributor

Reminder, please take a look at this pr: @jrmccluskey

@github-actions
Copy link
Contributor

Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment assign to next reviewer:

R: @lostluck for label go.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

@shunping
Copy link
Collaborator Author

waiting on author

@shunping
Copy link
Collaborator Author

Assign that back to me as I need to make some more changes.

@github-actions
Copy link
Contributor

Reminder, please take a look at this pr: @lostluck

@github-actions
Copy link
Contributor

Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment assign to next reviewer:

R: @jrmccluskey for label go.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

@shunping shunping marked this pull request as draft June 2, 2025 12:48
@shunping
Copy link
Collaborator Author

shunping commented Jun 2, 2025

Converting this back to draft for now as it needs some more thoughts.

@github-actions
Copy link
Contributor

Reminder, please take a look at this pr: @jrmccluskey

@github-actions
Copy link
Contributor

Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment assign to next reviewer:

R: @lostluck for label go.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

@github-actions
Copy link
Contributor

Reminder, please take a look at this pr: @lostluck

@lostluck
Copy link
Contributor

waiting on author

(since the bot doesn't know how drafts work)

@derrickaw
Copy link
Collaborator

waiting on author

@shunping
Copy link
Collaborator Author

shunping commented Aug 7, 2025

It is a draft PR, and I don't have bandwidth to move forward with that recently. Feel free to close it if the bot is making too much noise.

@github-actions
Copy link
Contributor

Reminder, please take a look at this pr: @lostluck

@github-actions
Copy link
Contributor

Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment assign to next reviewer:

R: @jrmccluskey for label go.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

@danowar2
Copy link

The place in the center

@shunping
Copy link
Collaborator Author

Closing this for now.

@shunping shunping closed this Aug 19, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants