-
Notifications
You must be signed in to change notification settings - Fork 4.5k
Add checkpoint during progress reporting. #34828
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -91,10 +91,12 @@ type stage struct { | |
| baseProgTick atomic.Value // time.Duration | ||
| } | ||
|
|
||
| // The minimum and maximum durations between each ProgressBundleRequest and split evaluation. | ||
| const ( | ||
| // The minimum and maximum durations between each ProgressBundleRequest and split evaluation. | ||
| minimumProgTick = 100 * time.Millisecond | ||
| maximumProgTick = 30 * time.Second | ||
| // The number of ticks before triggering a checkpoint | ||
| checkpointTickCutoff = 10 | ||
| ) | ||
|
|
||
| func clampTick(dur time.Duration) time.Duration { | ||
|
|
@@ -177,6 +179,7 @@ func (s *stage) Execute(ctx context.Context, j *jobservices.Job, wk *worker.W, c | |
| unsplit := true | ||
| baseTick := s.baseProgTick.Load().(time.Duration) | ||
| ticked := false | ||
| checkpointTickCount := 0 | ||
| progTick := time.NewTicker(baseTick) | ||
| defer progTick.Stop() | ||
| var dataFinished, bundleFinished bool | ||
|
|
@@ -186,6 +189,7 @@ func (s *stage) Execute(ctx context.Context, j *jobservices.Job, wk *worker.W, c | |
| dataFinished = true | ||
| } | ||
| var resp *fnpb.ProcessBundleResponse | ||
| var residualRoots []*fnpb.DelayedBundleApplication | ||
| progress: | ||
| for { | ||
| select { | ||
|
|
@@ -206,6 +210,7 @@ progress: | |
| } | ||
| case <-progTick.C: | ||
| ticked = true | ||
| checkpointTickCount += 1 | ||
| resp, err := b.Progress(ctx, wk) | ||
| if err != nil { | ||
| slog.Debug("SDK Error from progress, aborting progress", "bundle", rb, "error", err.Error()) | ||
|
|
@@ -218,58 +223,75 @@ progress: | |
| } | ||
| slog.Debug("progress report", "bundle", rb, "index", index, "prevIndex", previousIndex) | ||
|
|
||
| var fraction float64 | ||
|
|
||
| // Check if there has been any measurable progress by the input, or all output pcollections since last report. | ||
| slow := previousIndex == index["index"] && previousTotalCount == index["totalCount"] | ||
| checkpointReady := checkpointTickCount >= checkpointTickCutoff | ||
| if slow && unsplit { | ||
| slog.Debug("splitting report", "bundle", rb, "index", index) | ||
| sr, err := b.Split(ctx, wk, 0.5 /* fraction of remainder */, nil /* allowed splits */) | ||
| if err != nil { | ||
| slog.Warn("SDK Error from split, aborting splits", "bundle", rb, "error", err.Error()) | ||
| break progress | ||
| } | ||
| if sr.GetChannelSplits() == nil { | ||
| slog.Debug("SDK returned no splits", "bundle", rb) | ||
| unsplit = false | ||
| continue progress | ||
| } | ||
| fraction = 0.5 | ||
| } else if checkpointReady && unsplit { | ||
| // splitting on 0.0 fraction to make a checkpoint | ||
| fraction = 0.0 | ||
| // reset tickCount after scheduling a checkpoint | ||
| checkpointTickCount = 0 | ||
| } else { | ||
| previousIndex = index["index"] | ||
| previousTotalCount = index["totalCount"] | ||
| continue progress | ||
| } | ||
|
|
||
| // TODO sort out rescheduling primary Roots on bundle failure. | ||
| var residuals []engine.Residual | ||
| for _, rr := range sr.GetResidualRoots() { | ||
| ba := rr.GetApplication() | ||
| residuals = append(residuals, engine.Residual{Element: ba.GetElement()}) | ||
| if len(ba.GetElement()) == 0 { | ||
| slog.LogAttrs(context.TODO(), slog.LevelError, "returned empty residual application", slog.Any("bundle", rb)) | ||
| panic("sdk returned empty residual application") | ||
| } | ||
| // TODO what happens to output watermarks on splits? | ||
| } | ||
| if len(sr.GetChannelSplits()) != 1 { | ||
| slog.Warn("received non-single channel split", "bundle", rb) | ||
| } | ||
| cs := sr.GetChannelSplits()[0] | ||
| fr := cs.GetFirstResidualElement() | ||
| // The first residual can be after the end of data, so filter out those cases. | ||
| if b.EstimatedInputElements >= int(fr) { | ||
| b.EstimatedInputElements = int(fr) // Update the estimate for the next split. | ||
| // Split Residuals are returned right away for rescheduling. | ||
| em.ReturnResiduals(rb, int(fr), s.inputInfo, engine.Residuals{ | ||
| Data: residuals, | ||
| }) | ||
| // Do the split (fraction > 0) or checkpoint (fraction == 0) | ||
| slog.Debug("splitting report", "bundle", rb, "index", index) | ||
| sr, err := b.Split(ctx, wk, fraction /* fraction of remainder */, nil /* allowed splits */) | ||
| if err != nil { | ||
| slog.Warn("SDK Error from split, aborting splits", "bundle", rb, "error", err.Error()) | ||
| break progress | ||
| } | ||
| if sr.GetChannelSplits() == nil { | ||
| slog.Debug("SDK returned no splits", "bundle", rb) | ||
| unsplit = false | ||
| continue progress | ||
| } | ||
| // Save residual roots for checkpoint. After checkpointing is successful, | ||
| // the bundle will be marked as finished and no residual roots will be | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is kind of a surprise. When a bundle finishes due to splitting with 0.0 fraction, no residual roots in the response. Is this by design?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is expected and by design. Were you seeing errors due to returning the residuals early in the split response for checkpointing case? There are a few different cases to think about, but they're aligned with the two FnAPI calls in question.
You're seeing 2 in this case. We shouldn't need to do any additional residual handling and processing after the bundle is finished here. I'd be a bit concerned that there is a data duplication risk when doing it this way (the same residuals getting "returned" twice.)
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see. So my current code is correct then, since I am using the Residual from SplitResponse (rather than Residual from ProcessBundleResponse in the original code) to compute watermark and residual data outside of the progress for loop. QQ: On case 2, what if we have a split at fraction 0.5? Prior to my change, I think the code is relying on Residual from ProcessBundleResponse to update the watermark. However isn't the residual empty there after we have a split response?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So residualRoots is set on line 260, but the ResigdualRoots is called again and processed and sent back to the EM on line 282. That variable is never un-set after processing for bundles. Then after the bundle finishes, the residual roots are only overridden If and only if the final bundle has residual roots already. Therefore the cached roots from the split response might be processed a second time, being sent to the EM as part of PersistBundle, which then also reschedules them. So it may duplicate the residual data. IIUC the better fix is to handle the output watermark estimate in the em.ReturnResidual call. Right now it's only happening in PersistBundle. PersistBundle call: https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go#L905 ReturnResiduals call: Alternatively the MinOutputWatermarks for residuals is independent of the specific data, so it should also be valid to collect/update them with splits, but not persist them until the PersistBundle call. That matches closely what you have here (and what works), but without the duplicated data. MinOutputWatermarks are collected here: https://github.com/apache/beam/pull/34828/files#diff-c799dce79559a70660d7abb42fcbff8455ba41452bd9483fc5c58dfcf156ee8cR343 Map is created just above currently: https://github.com/apache/beam/pull/34828/files#diff-c799dce79559a70660d7abb42fcbff8455ba41452bd9483fc5c58dfcf156ee8cR322 So we'd just need to ensure we don't have "stale" watermarks being persisted for this holding things back by accident. |
||
| // returned in ProcessBundleResponse. | ||
| if fraction == 0 { | ||
| residualRoots = sr.GetResidualRoots() | ||
| } | ||
| // TODO sort out rescheduling primary Roots on bundle failure. | ||
| var residuals []engine.Residual | ||
| for _, rr := range sr.GetResidualRoots() { | ||
| ba := rr.GetApplication() | ||
| residuals = append(residuals, engine.Residual{Element: ba.GetElement()}) | ||
| if len(ba.GetElement()) == 0 { | ||
| slog.LogAttrs(context.TODO(), slog.LevelError, "returned empty residual application", slog.Any("bundle", rb)) | ||
| panic("sdk returned empty residual application") | ||
| } | ||
| // TODO what happens to output watermarks on splits? | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well at least i left a note about the output watermarks when I last touched this. I think the "correct" thing to do here is to collect them and apply them accordingly with PersistBundle, instead of reprocessing the whole set of residuals later (which is how the phone is rendering it. I'll need to reread it).
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right. I didn't quite follow the part where residual from split response is handled in the original code. The latter part outside of the progress for loop makes more sense to me.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The Estimated input elements bit is tricky since it's about how to estimate where to split for Unbounded SDFs and how big a bundle "is". I can't recall exactly why it ended up with the "filter out residuals that are before the end of data", cases... but apparently it had to do with timers? https://github.com/apache/beam/blame/master/sdks/go/pkg/beam/runners/prism/internal/stage.go#L253 |
||
| } | ||
| if len(sr.GetChannelSplits()) != 1 { | ||
| slog.Warn("received non-single channel split", "bundle", rb) | ||
| } | ||
| cs := sr.GetChannelSplits()[0] | ||
| fr := cs.GetFirstResidualElement() | ||
| // The first residual can be after the end of data, so filter out those cases. | ||
| if b.EstimatedInputElements >= int(fr) { | ||
| b.EstimatedInputElements = int(fr) // Update the estimate for the next split. | ||
| // Split Residuals are returned right away for rescheduling. | ||
| em.ReturnResiduals(rb, int(fr), s.inputInfo, engine.Residuals{ | ||
| Data: residuals, | ||
| }) | ||
| } | ||
|
|
||
| // Any split means we're processing slower than desired, but splitting should increase | ||
| // throughput. Back off for this and other bundles for this stage | ||
| baseTime := s.baseProgTick.Load().(time.Duration) | ||
| newTime := clampTick(baseTime * 4) | ||
| if s.baseProgTick.CompareAndSwap(baseTime, newTime) { | ||
| progTick.Reset(newTime) | ||
| } else { | ||
| progTick.Reset(s.baseProgTick.Load().(time.Duration)) | ||
| } | ||
| // Any split means we're processing slower than desired, but splitting should increase | ||
| // throughput. Back off for this and other bundles for this stage | ||
| baseTime := s.baseProgTick.Load().(time.Duration) | ||
| newTime := clampTick(baseTime * 4) | ||
| if s.baseProgTick.CompareAndSwap(baseTime, newTime) { | ||
| progTick.Reset(newTime) | ||
| } else { | ||
| previousIndex = index["index"] | ||
| previousTotalCount = index["totalCount"] | ||
| progTick.Reset(s.baseProgTick.Load().(time.Duration)) | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -290,11 +312,16 @@ progress: | |
| j.AddMetricShortIDs(md) | ||
| } | ||
|
|
||
| // Use residual roots from ProcessBundleResponse if any. | ||
| // Otherwise, use residual roots from ProcessBundleSplitResponse if a checkpoint occurs. | ||
| if len(resp.GetResidualRoots()) > 0 { | ||
| residualRoots = resp.GetResidualRoots() | ||
| } | ||
| // ProcessContinuation residuals are rescheduled after the specified delay. | ||
| residuals := engine.Residuals{ | ||
| MinOutputWatermarks: map[string]mtime.Time{}, | ||
| } | ||
| for _, rr := range resp.GetResidualRoots() { | ||
| for _, rr := range residualRoots { | ||
| ba := rr.GetApplication() | ||
| if len(ba.GetElement()) == 0 { | ||
| slog.LogAttrs(context.TODO(), slog.LevelError, "returned empty residual application", slog.Any("bundle", rb)) | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am wondering if we should do this checkpointing for both bounded and unbounded cases.
@lostluck: WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My intuition is telling me that we should not to add it for the bounded case. But the essence of Beam is to unify batch and streaming so it's probably fine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess I'm worried about the situation where we're just oversplitting and forcing a checkpoint on a perfectly fine running Batch DoFn (processing efficiently and per element fast).
As it stands, this code looks like it will checkpoint a fast moving bundle after 1 second. (10 ticks, at 100ms per tick, since a fast moving bundle won't get a slower progress request rate).
But that's sort of wasteful. A fast moving bundle shouldn't be stopped. We might just want to only do this if the bundle is moving fast, but the input index isn't moving? Perhaps this is a reason why Dataflow has explicit Batch and Streaming modes of execution.
One almost wants to do it based on the number or amount of output data instead, in order to allow the watermark to progress.... But that would be much harder, and is overthinking it for now.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Checkpointing is always a trade-off. In theory, we don't want to checkpoint too often to hurt performance, while we also want to checkpoint sufficiently enough so the hard work can be materialized and saved.
I think we can consider using the checkpoint ticks AND the amount or rate of output data ("totalCount") as the criteria to identify a fast-moving bundle (thousands of events per tick) that lasts reasonably long. Instead of 1 second, we can change it to 10 (or even longer) seconds for example.
Even if it is fast moving, we may still want to checkpoint to make sure we don't need to repeat the previous 10-second work if something goes bad.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After we have bundle retrying implemented, we can adjust the threshold of checkpoint ticks to longer or shorter based on how often we see an error in the bundle, how long does it take to check point, etc.