Pull request overview
This WIP/demo PR experiments with running Sync Gateway’s cbgt-backed DCP in “one-shot” mode by providing end sequence numbers and wiring cbgt feed shutdown events back into the resync background process so it can terminate when all vbuckets are complete.
Changes:
- Switches `cbgt` to a fork via `go.mod` `replace` and updates `go.sum` accordingly.
- Refactors DCP destination construction to use a `DCPDestOptions` struct and threads one-shot end-seq handling into cbgt feed params and DCP dest/common logic.
- Adds an `OnUnregisterFeed` callback path in sharded DCP manager event handlers and tracks completion in resync; updates resync tests/logging.
Reviewed changes
Copilot reviewed 9 out of 10 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| go.sum | Removes upstream cbgt sums and adds sums for the forked cbgt module. |
| go.mod | Adds a replace directive to point github.com/couchbase/cbgt at github.com/torcolvin/cbgt. |
| db/import_listener.go | Updates DCP dest creation to use base.DCPDestOptions. |
| db/background_mgr_resync_dcp.go | Adds sharded one-shot end-seq support, feed-unregister callback completion, and new run-state fields. |
| db/background_mgr_resync_dcp_test.go | Adjusts resync test behavior and enables debug logging (with leftover commented code). |
| base/util.go | Introduces a generic MutexMap helper used for completion tracking. |
| base/dcp_sharded.go | Extends sharded DCP options and cbgt manager event handlers to support unregister-feed callbacks and end-seq feed params. |
| base/dcp_feed_type.go | Extends cbgt feed params to include stop-after settings and adds GetHighSeqNos. |
| base/dcp_dest.go | Introduces DCPDestOptions and updates dest creation/signatures accordingly. |
| base/dcp_common.go | Threads one-shot end-seq handling into DCP common processing. |

    // TODO distributed resync: this value has to be serialized and evaluated in a distributed context.
    completedVBs base.MutexMap[uint16, struct{}]
    useXattrs bool
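
The excerpt above only shows that `completedVBs` is a `base.MutexMap[uint16, struct{}]`; the helper's actual API is not visible in this review. As a rough sketch of how such a map could drive completion tracking, the following uses a hypothetical `MutexMap` with `Set` and `Len` methods and a hypothetical `completionTracker` that closes a channel once every vBucket has been recorded. All names here are illustrative assumptions, not the PR's code.

```go
package main

import (
	"fmt"
	"sync"
)

// MutexMap is a hypothetical stand-in for base.MutexMap: a map guarded by a mutex.
type MutexMap[K comparable, V any] struct {
	mu sync.Mutex
	m  map[K]V
}

// Set stores value under key, allocating the map on first use.
func (mm *MutexMap[K, V]) Set(key K, value V) {
	mm.mu.Lock()
	defer mm.mu.Unlock()
	if mm.m == nil {
		mm.m = make(map[K]V)
	}
	mm.m[key] = value
}

// Len returns the number of distinct keys stored so far.
func (mm *MutexMap[K, V]) Len() int {
	mm.mu.Lock()
	defer mm.mu.Unlock()
	return len(mm.m)
}

// completionTracker (hypothetical) closes done once every expected vBucket is recorded.
type completionTracker struct {
	completedVBs  MutexMap[uint16, struct{}]
	totalVBuckets int
	closeOnce     sync.Once
	done          chan struct{}
}

func newCompletionTracker(totalVBuckets int) *completionTracker {
	return &completionTracker{totalVBuckets: totalVBuckets, done: make(chan struct{})}
}

// markComplete records the given vBuckets and reports whether all vBuckets are now complete.
// Recording the same vBucket twice is harmless because the map deduplicates keys.
func (t *completionTracker) markComplete(vbNos ...uint16) bool {
	for _, vb := range vbNos {
		t.completedVBs.Set(vb, struct{}{})
	}
	if t.completedVBs.Len() < t.totalVBuckets {
		return false
	}
	t.closeOnce.Do(func() { close(t.done) })
	return true
}

func main() {
	tracker := newCompletionTracker(4)
	fmt.Println(tracker.markComplete(0, 1))    // first feed: two of four vBuckets
	fmt.Println(tracker.markComplete(1, 2, 3)) // second feed: remaining vBuckets
}
```

The TODO in the excerpt still applies to a sketch like this: the map lives in one process, so it would not by itself coordinate completion across nodes.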

    if distributed && base.UnitTestUrlIsWalrus() {
        t.Skip("Distribute resync not supported for rosmar")
    }
    //base.SetUpTestLogging(t, base.LevelDebug, base.KeyDCP, base.KeyCRUD)

adamcfraser left a comment
Generally looks fine, a few comments to look at.

    defer c.m.Unlock()

    // Check the expected maximum sequence number when running a one shot feed. Do not checkpoint if the incoming
    // sequence is greater than the expected maximum sequence number.

Can we do this check earlier (in dataUpdate) and also avoid callback processing for any sequences higher than endSeqNo?

    type SGDest interface {
        cbgt.Dest
        cbgt.DestEx
        ForceCheckpointWrite()

As discussed, PR doesn't currently have any callers of ForceCheckpointWrite. Review whether that's intentional.

    // doneChan when all vBuckets have completed, which will allow the resync process to finish.
    func (r *ResyncManagerDCP) getUnregisterFeedFunc(ctx context.Context, totalVBuckets uint16) base.CbgtUnregisterFeedCallback {
        return func(feed cbgt.Feed) {
            for _, d := range feed.Dests() {

As discussed, don't think we have a use case where our feed will have multiple Dests, but to be defensive can make this code just look for at least one SGDest to use for the ForceCheckpoint call (and exit the loop when it finds it)
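
One way to read the suggestion above is a loop that type-asserts each dest, calls `ForceCheckpointWrite` on the first `SGDest` it finds, and then stops. The sketch below uses local stand-in interfaces rather than the real `cbgt` types, and assumes the dests arrive as a map keyed by partition name; `forceCheckpointOnce`, `plainDest`, and `checkpointDest` are hypothetical names.

```go
package main

import "fmt"

// Dest is a local stand-in for cbgt.Dest, reduced to one method for illustration.
type Dest interface {
	Close() error
}

// SGDest mirrors the shape quoted in the review: a Dest that can force a checkpoint write.
type SGDest interface {
	Dest
	ForceCheckpointWrite()
}

// plainDest implements Dest only.
type plainDest struct{}

func (plainDest) Close() error { return nil }

// checkpointDest implements SGDest and counts forced checkpoint writes.
type checkpointDest struct{ writes int }

func (d *checkpointDest) Close() error          { return nil }
func (d *checkpointDest) ForceCheckpointWrite() { d.writes++ }

// forceCheckpointOnce calls ForceCheckpointWrite on the first SGDest it encounters,
// then exits the loop. It reports whether any SGDest was found.
func forceCheckpointOnce(dests map[string]Dest) bool {
	for _, d := range dests {
		sgDest, ok := d.(SGDest)
		if !ok {
			continue // not an SGDest, keep looking
		}
		sgDest.ForceCheckpointWrite()
		return true
	}
	return false
}

func main() {
	cp := &checkpointDest{}
	dests := map[string]Dest{"p0": plainDest{}, "p1": cp}
	fmt.Println(forceCheckpointOnce(dests), cp.writes)
}
```

Because Go map iteration order is unspecified, which `SGDest` is chosen is arbitrary when more than one is present; that matches the comment's premise that a feed is not expected to carry multiple dests.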
adamcfraser left a comment
Changes look good - one minor question on handling the error case in shouldProcessSequence

    // DCP will provide mutations that run to the end of the snapshot that contains the end sequence number.
    endSeq, ok := c.endSeqNos[vBucketID]
    if !ok {
        AssertfCtx(c.loggingCtx, "Received DCP event for vbno %d which is not tracked by the expected endSeqNos %#+v. This means that endSeqNos was specified with the incorrect number of vBuckets", vBucketID, c.endSeqNos)

In the case where the endSeq isn't found, it looks like the code will skip the mutation (endSeq will be zero for line 237). Would it be more defensive to return true in that scenario?
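
To make the question concrete, here is a minimal standalone sketch of the more defensive variant: when no end sequence is tracked for a vBucket, the mutation is processed rather than skipped. The signature, the `map[uint16]uint64` type, and the convention that a nil map means a continuous feed are assumptions for illustration; the PR's `shouldProcessSequence` is a method and additionally asserts in the untracked case.

```go
package main

import "fmt"

// shouldProcessSequence reports whether a mutation at seq on vbNo should be processed.
// A nil endSeqNos is treated here as "not a one-shot feed", so everything is processed.
func shouldProcessSequence(endSeqNos map[uint16]uint64, vbNo uint16, seq uint64) bool {
	if endSeqNos == nil {
		return true
	}
	endSeq, ok := endSeqNos[vbNo]
	if !ok {
		// Defensive choice: an untracked vBucket indicates a setup error, but
		// processing the mutation avoids silently dropping data.
		return true
	}
	// DCP can deliver mutations up to the end of the snapshot containing endSeq,
	// so anything past endSeq is skipped.
	return seq <= endSeq
}

func main() {
	endSeqNos := map[uint16]uint64{0: 100, 1: 250}
	fmt.Println(shouldProcessSequence(endSeqNos, 0, 100)) // at the end sequence
	fmt.Println(shouldProcessSequence(endSeqNos, 0, 101)) // past the end sequence
	fmt.Println(shouldProcessSequence(endSeqNos, 7, 5))   // untracked vBucket
}
```

The trade-off is that returning true for an untracked vBucket may process mutations beyond the intended end point, whereas returning false skips them; which is safer depends on whether reprocessing or missing a document is the worse outcome for resync.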
CBG-5184 use cbgt one shot mode
This does not yet work in multi-node mode (expected), and not all tests are enabled. I'm working on why some of the tests fail or flake. I do not expect major changes to come out of the failing tests, so I am putting this up for review as is. Most tests do pass in distributed resync mode (where distributed resync is only run on a single node).
- `endSeqNos` with cbgt via `sourceParams`. This marks when cbgt will stop processing documents.
- `DCPCommon.updateSeq` to avoid an ERANGE error when re-opening the feed. https://github.com/couchbase/cbgt/blob/6e34ff79a0e6e97d5fb5b548c217a8a9d279a94b/feed_dcp_gocbcore.go#L1034
- `ForceCheckpointWrite` when the feed shuts down to force the checkpoints to be written. This is really only helpful for tests, but `DCPCommon.setMetaData` will only be called on a 1 minute interval, so the last checkpoints are not set. A test that makes sure some documents are processed and then resumes from that point fails without this call.
- `cbgt.EventHandler.OnUnregisterFeed` is now used to discover when vBuckets are completed. This is tracked locally (for now, see TODO) and used to close `dcpDoneChan` to indicate when the DCP operations should finish. There are N feeds for N partitions used by cbgt. This function is called whenever a feed (set of vBuckets for a partition) completes. This happens on reaching the `endSeqNos`, or when unregistering a feed (normal shutdown, such as `/db/resync?action=stop`).

Small fixes:
- `DCPDestOptions` to avoid having another nil-able option.
- `ConsistencyLevel`, this is a type alias.
- `dbExpVarsStat` to be nil, especially in tests. I do not have a place for this stat yet in distributed resync.

Pre-review checklist
- `fmt.Print`, `log.Print`, ...)
- `base.UD(docID)`, `base.MD(dbName)`)
- `docs/api`

Integration Tests