Skip to content

Commit 662f58c

Browse files
committed
CBG-5184 add support for cbgt stopping after a sequence
1 parent a7706d4 commit 662f58c

10 files changed

Lines changed: 278 additions & 113 deletions

base/dcp_common.go

Lines changed: 31 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -60,30 +60,26 @@ type DCPCommon struct {
6060
callback sgbucket.FeedEventCallbackFunc // Function to callback for mutation processing
6161
loggingCtx context.Context // Logging context, prefixes feedID
6262
checkpointPrefix string // DCP checkpoint key prefix
63+
endSeqNos map[uint16]uint64 // endSeqNos holds the end sequence number for a stream, keyed by vBucket ID
6364
}
6465

65-
// NewDCPCommon creates a new DCPCommon which manages updates coming from a cbgt-based DCP feed. The callback function will receive events from a DCP feed. It stores checkpoints in the metaStore starting with checkpointPrefix if persistCheckpoints is true.
66-
// Specific stats for DCP are stored in expvars rather than SgwStats.
66+
// NewDCPCommon creates a new DCPCommon instance which manages updates coming from a cbgt-based DCP feed.
6767
func NewDCPCommon(
6868
ctx context.Context,
69-
callback sgbucket.FeedEventCallbackFunc,
70-
metaStore DataStore,
71-
maxVbNo uint16,
72-
persistCheckpoints bool,
73-
dbStats *expvar.Map,
74-
checkpointPrefix string) (*DCPCommon, error) {
69+
opts DCPDestOptions) (*DCPCommon, error) {
7570

7671
c := &DCPCommon{
77-
dbStatsExpvars: dbStats,
78-
metaStore: metaStore,
79-
persistCheckpoints: persistCheckpoints,
80-
seqs: make([]uint64, maxVbNo),
81-
meta: make([][]byte, maxVbNo),
82-
updatesSinceCheckpoint: make([]uint64, maxVbNo),
83-
callback: callback,
84-
lastCheckpointTime: make([]time.Time, maxVbNo),
85-
checkpointPrefix: checkpointPrefix,
72+
dbStatsExpvars: opts.DcpStats,
73+
metaStore: opts.MetadataStore,
74+
persistCheckpoints: opts.PersistCheckpoints,
75+
seqs: make([]uint64, opts.MaxVbNo),
76+
meta: make([][]byte, opts.MaxVbNo),
77+
updatesSinceCheckpoint: make([]uint64, opts.MaxVbNo),
78+
callback: opts.Callback,
79+
lastCheckpointTime: make([]time.Time, opts.MaxVbNo),
80+
checkpointPrefix: opts.CheckpointPrefix,
8681
loggingCtx: ctx,
82+
endSeqNos: opts.EndSeqNos,
8783
}
8884

8985
return c, nil
@@ -151,7 +147,9 @@ func (c *DCPCommon) getMetaData(vbucketId uint16) (
151147
// rollbackEx is called when a DCP open stream issues a rollback. The metadata is persisted for the given uuid and sequence number, and stream reopening is deferred to cbgt via the AutoReconnectAfterRollback feed parameter.
152148
func (c *DCPCommon) rollbackEx(vbucketId uint16, vbucketUUID uint64, rollbackSeq uint64, rollbackMetaData []byte) error {
153149
InfofCtx(c.loggingCtx, KeyDCP, "DCP RollbackEx request - rolling back DCP feed for: vbucketId: %d, rollbackSeq: %x.", vbucketId, rollbackSeq)
154-
c.dbStatsExpvars.Add("dcp_rollback_count", 1)
150+
if c.dbStatsExpvars != nil {
151+
c.dbStatsExpvars.Add("dcp_rollback_count", 1)
152+
}
155153
c.updateSeq(vbucketId, rollbackSeq, false)
156154
err := c.setMetaData(vbucketId, rollbackMetaData, true)
157155
if err != nil {
@@ -226,6 +224,21 @@ func (c *DCPCommon) updateSeq(vbucketId uint16, seq uint64, warnOnLowerSeqNo boo
226224
c.m.Lock()
227225
defer c.m.Unlock()
228226

227+
// Check the expected maximum sequence number when running a one shot feed. Do not checkpoint if the incoming
228+
// sequence is greater than the expected maximum sequence number.
229+
//
230+
// DCP will provide mutations that run to the end of the snapshot that contains the end sequence number.
231+
if c.endSeqNos != nil {
232+
endSeq, ok := c.endSeqNos[vbucketId]
233+
if !ok {
234+
AssertfCtx(c.loggingCtx, "Received DCP event for vbno %d which is greater than endSeqNos we have for that vbno", vbucketId)
235+
}
236+
// Ignore sequences beyond the end sequence number for this vBucket.
237+
if seq > endSeq {
238+
return
239+
}
240+
}
241+
229242
previousSequence := c.seqs[vbucketId]
230243

231244
if seq < previousSequence && warnOnLowerSeqNo == true {

base/dcp_dest.go

Lines changed: 43 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ func init() {
3030
type SGDest interface {
3131
cbgt.Dest
3232
cbgt.DestEx
33+
ForceCheckpointWrite()
3334
}
3435

3536
// DCPDest implements SGDest (superset of cbgt.Dest) interface to manage updates coming from a
@@ -41,28 +42,32 @@ type DCPDest struct {
4142
metaInitComplete []bool // Whether metadata initialization has been completed, per vbNo
4243
}
4344

44-
// NewDCPDest creates a new DCPDest which manages updates coming from a cbgt-based DCP feed. The callback function will receive events from a DCP feed. If persistCheckpoints is true, stores checkpoints as documents in metadataStore starting with checkpointPrefix.
45-
// Specific stats for DCP are stored in expvars rather than SgwStats, except for partition stat, which indicates the number of cbgt partitions assigned to this node.
45+
type DCPDestOptions struct {
46+
Callback sgbucket.FeedEventCallbackFunc // Callback function receives DCP events.
47+
MetadataStore sgbucket.DataStore // location to store checkpoints in
48+
MaxVbNo uint16 // number of vBuckets for the backing bucket (does not have to match metadata store)
49+
PersistCheckpoints bool // if true, write checkpoints to MetadataStore with keys prefixed by CheckpointPrefix
50+
DcpStats *expvar.Map // Optional stats for dcp_rollback_count. This is not exposed by prometheus.
51+
PartitionStat *SgwIntStat // Optional stat for active partition count, to track cbgt partitions.
52+
CheckpointPrefix string // document prefix for checkpoint documents.
53+
EndSeqNos map[uint16]uint64 // If running a one shot DCP feed, these should match the end sequence numbers for each vbNo.
54+
}
55+
56+
// NewDCPDest creates a new DCPDest which manages updates coming from a cbgt-based DCP feed.
4657
// Each partition will have its own DCPDest object.
4758
func NewDCPDest(
4859
ctx context.Context,
49-
callback sgbucket.FeedEventCallbackFunc,
50-
metadataStore sgbucket.DataStore,
51-
maxVbNo uint16,
52-
persistCheckpoints bool,
53-
dcpStats *expvar.Map,
54-
partitionStat *SgwIntStat,
55-
checkpointPrefix string,
60+
opts DCPDestOptions,
5661
) (SGDest, error) {
57-
dcpCommon, err := NewDCPCommon(ctx, callback, metadataStore, maxVbNo, persistCheckpoints, dcpStats, checkpointPrefix)
62+
dcpCommon, err := NewDCPCommon(ctx, opts)
5863
if err != nil {
5964
return nil, err
6065
}
6166

6267
d := &DCPDest{
6368
DCPCommon: dcpCommon,
64-
partitionCountStat: partitionStat,
65-
metaInitComplete: make([]bool, maxVbNo),
69+
partitionCountStat: opts.PartitionStat,
70+
metaInitComplete: make([]bool, opts.MaxVbNo),
6671
}
6772

6873
if d.partitionCountStat != nil {
@@ -213,7 +218,7 @@ func (d *DCPDest) RollbackEx(partition string, vbucketUUID uint64, rollbackSeq u
213218

214219
// TODO: Not implemented, review potential usage
215220
func (d *DCPDest) ConsistencyWait(partition, partitionUUID string,
216-
consistencyLevel string, consistencySeq uint64, cancelCh <-chan bool) error {
221+
consistencyLevel cbgt.ConsistencyLevel, consistencySeq uint64, cancelCh <-chan bool) error {
217222
WarnfCtx(d.loggingCtx, "Dest.ConsistencyWait being invoked by cbgt - not supported by Sync Gateway")
218223
return nil
219224
}
@@ -234,6 +239,23 @@ func (d *DCPDest) Stats(io.Writer) error {
234239
return nil
235240
}
236241

242+
// ForceCheckpointWrite forces a checkpoint write for every vBucket whose metadata initialization has completed.
243+
func (d *DCPDest) ForceCheckpointWrite() {
244+
for vbNo, init := range d.metaInitComplete {
245+
if init {
246+
value, _, err := d.getMetaData(uint16(vbNo))
247+
if err != nil {
248+
WarnfCtx(d.loggingCtx, "Could not retrieve metadata for vbNo %d during ForceCheckpointWrite: %v. Skipping checkpoint write", vbNo, err)
249+
continue
250+
}
251+
err = d.setMetaData(uint16(vbNo), value, true)
252+
if err != nil {
253+
WarnfCtx(d.loggingCtx, "Could not persist metadata for vbNo %d during ForceCheckpointWrite: %v. Skipping checkpoint write", vbNo, err)
254+
}
255+
}
256+
}
257+
}
258+
237259
func partitionToVbNo(ctx context.Context, partition string) uint16 {
238260
vbNo, err := strconv.Atoi(partition)
239261
if err != nil {
@@ -316,7 +338,7 @@ func (d *DCPLoggingDest) RollbackEx(partition string, vbucketUUID uint64, rollba
316338
}
317339

318340
func (d *DCPLoggingDest) ConsistencyWait(partition, partitionUUID string,
319-
consistencyLevel string, consistencySeq uint64, cancelCh <-chan bool) error {
341+
consistencyLevel cbgt.ConsistencyLevel, consistencySeq uint64, cancelCh <-chan bool) error {
320342
return d.dest.ConsistencyWait(partition, partitionUUID, consistencyLevel, consistencySeq, cancelCh)
321343
}
322344

@@ -332,3 +354,10 @@ func (d *DCPLoggingDest) Query(pindex *cbgt.PIndex, req []byte, w io.Writer,
332354
func (d *DCPLoggingDest) Stats(w io.Writer) error {
333355
return d.dest.Stats(w)
334356
}
357+
358+
func (d *DCPLoggingDest) ForceCheckpointWrite() {
359+
d.dest.ForceCheckpointWrite()
360+
}
361+
362+
var _ SGDest = &DCPDest{}
363+
var _ SGDest = &DCPLoggingDest{}

base/dcp_feed_type.go

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ func init() {
155155
// SGFeedSourceParams is a wrapper for cbgt's parameters.
156156
type SGFeedSourceParams struct {
157157
cbgt.DCPFeedParams
158+
cbgt.StopAfterSourceParams
158159

159160
// Used to pass the SG database name to SGFeed* shims
160161
DbName string `json:"sg_dbname,omitempty"`
@@ -165,20 +166,27 @@ type SGFeedIndexParams struct {
165166
DestKey string `json:"destKey,omitempty"`
166167
}
167168

168-
// cbgtFeedParams returns marshalled cbgt.DCPFeedParams as string, to be passed as feedparams during cbgt.Manager init.
169-
// Used to pass basic auth credentials and xattr flag to cbgt.
170-
func cbgtFeedParams(ctx context.Context, collections CollectionNames, dbName string) (string, error) {
169+
// cbgtFeedParams returns the marshalled feed source parameters (cbgt.DCPFeedParams plus optional stop-after settings) as a string, built from the given options, to be passed as feedparams during cbgt.Manager init.
170+
func cbgtFeedParams(ctx context.Context, opts ShardedDCPOptions) (string, error) {
171171
feedParams := &SGFeedSourceParams{
172-
DbName: dbName,
172+
DbName: opts.DBName,
173173
DCPFeedParams: cbgt.DCPFeedParams{
174174
AutoReconnectAfterRollback: true,
175175
IncludeXAttrs: true,
176176
},
177177
}
178-
if len(collections) > 1 {
179-
return "", RedactErrorf("cbgtFeedParams: multiple scopes not supported, got %v", MD(collections))
180-
} else if len(collections) > 0 {
181-
for s, c := range collections {
178+
if opts.EndSeqNos != nil {
179+
feedParams.StopAfterSourceParams.StopAfter = "markReached"
180+
seqMap := make(map[string]cbgt.UUIDSeq, len(opts.EndSeqNos))
181+
for vbNo, seqNo := range opts.EndSeqNos {
182+
seqMap[cbgtVbNoToPartition(vbNo)] = cbgt.UUIDSeq{Seq: seqNo}
183+
}
184+
feedParams.StopAfterSourceParams.MarkPartitionSeqs = seqMap
185+
}
186+
if len(opts.Collections) > 1 {
187+
return "", RedactErrorf("cbgtFeedParams: multiple scopes not supported, got %v", MD(opts.Collections))
188+
} else if len(opts.Collections) > 0 {
189+
for s, c := range opts.Collections {
182190
feedParams.Scope = s
183191
feedParams.Collections = c
184192
}
@@ -301,3 +309,23 @@ func getCbgtCredentials(dbName string) (cbgtCreds, bool) {
301309
cbgtGlobalsLock.Unlock() // cbgtCreds is not a pointer type, safe to unlock
302310
return creds, found
303311
}
312+
313+
// GetHighSeqNos retrieves the maximum sequence numbers for each vbucket.
314+
func GetHighSeqNos(ctx context.Context, bucket Bucket) (map[uint16]uint64, error) {
315+
b, err := AsGocbV2Bucket(bucket)
316+
if err != nil {
317+
return nil, fmt.Errorf("GetHighSeqNos: bucket is not a GocbV2Bucket: %w", err)
318+
}
319+
numVbuckets, err := bucket.GetMaxVbno(ctx)
320+
if err != nil {
321+
return nil, err
322+
}
323+
324+
// Note: extending to be per collection requires an enhancement in gocbcore to support 0x48 memcached command
325+
// for a non dcp agent, see gocbcore.DCPAgent.GetVBucketSeqNos
326+
_, highSeqNos, err := b.GetStatsVbSeqno(numVbuckets, true)
327+
if err != nil {
328+
return nil, fmt.Errorf("unable to obtain high seqnos: %w", err)
329+
}
330+
return highSeqNos, nil
331+
}

base/dcp_sharded.go

Lines changed: 43 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ const (
3838
CBGTIndexTypeSyncGatewayResync = "syncGateway-resync"
3939
)
4040

41+
// CbgtUnregisterFeedCallback is the function invoked by cbgt when a DCP feed shuts down.
42+
type CbgtUnregisterFeedCallback func(cbgt.Feed)
43+
4144
type ShardedDCPFeedType string
4245

4346
const (
@@ -73,19 +76,21 @@ type CbgtContext struct {
7376

7477
// ShardedDCPOptions contains options for starting a DCP feed for cbgt.
7578
type ShardedDCPOptions struct {
76-
Bucket Bucket // bucket to target
77-
Cfg cbgt.Cfg // cbgt cfg used to coordinate cbgt documents
78-
Collections CollectionNames // collection names to target
79-
DBName string // database name is used to look up credentials
80-
DestKey string // key used in indexParams for this feed, used to look up the feed destination in cbgtDestFactories
81-
Heartbeater Heartbeater // heartbeater to use to find nodes
82-
IndexName string // cbgt.Manger.IndexName, used to uniquely identify the index
83-
IndexType string // cbgt.Manager.IndexType, matches name used by cbgt.RegisterPIndexImplType
84-
NumPartitions uint16 // number of cbgt partitions to use, if 0 will default to DefaultImportPartitions
85-
PreviousIndexName string // previous index name. If specified, marks IndexName as as a replacement for this name.
86-
UUID string // database uuid, used to uniquely identify nodes
87-
Datastore DataStore // datastore to perform KV ops
88-
FeedType ShardedDCPFeedType // type of sharded DCP feed to start
79+
Bucket Bucket // bucket to target
80+
Cfg cbgt.Cfg // cbgt cfg used to coordinate cbgt documents
81+
Collections CollectionNames // collection names to target
82+
DBName string // database name is used to look up credentials
83+
DestKey string // key used in indexParams for this feed, used to look up the feed destination in cbgtDestFactories
84+
Heartbeater Heartbeater // heartbeater to use to find nodes
85+
IndexName string // cbgt.Manager.IndexName, used to uniquely identify the index
86+
IndexType string // cbgt.Manager.IndexType, matches name used by cbgt.RegisterPIndexImplType
87+
NumPartitions uint16 // number of cbgt partitions to use, if 0 will default to DefaultImportPartitions
88+
PreviousIndexName string // previous index name. If specified, marks IndexName as a replacement for this name.
89+
UUID string // database uuid, used to uniquely identify nodes
90+
Datastore DataStore // datastore to perform KV ops
91+
FeedType ShardedDCPFeedType // type of sharded DCP feed to start
92+
EndSeqNos map[uint16]uint64 // optional parameter indicating the end sequences to be used for running a one shot feed.
93+
UnregisterFeedCallback CbgtUnregisterFeedCallback // optional callback for cbgt.ManagerEventHandlers.OnUnregisterFeed
8994
}
9095

9196
// Validate makes sure that all options are specified.
@@ -166,7 +171,7 @@ func StartShardedDCPFeed(ctx context.Context, opts ShardedDCPOptions) (*CbgtCont
166171
return nil, fmt.Errorf("error asserting bucket as gocb v2 bucket: %w", err)
167172
}
168173

169-
cbgtContext, err := initCBGTManager(ctx, opts.Bucket, b.GetSpec(), opts.Cfg, opts.UUID, opts.DBName)
174+
cbgtContext, err := initCBGTManager(ctx, opts.Bucket, b.GetSpec(), opts.Cfg, opts.UUID, opts.DBName, opts.UnregisterFeedCallback)
170175
if err != nil {
171176
return nil, err
172177
}
@@ -221,7 +226,7 @@ func GenerateLegacyImportIndexName(dbName string) string {
221226
func createCBGTIndex(ctx context.Context, c *CbgtContext, opts ShardedDCPOptions) error {
222227
sourceType := SOURCE_DCP_SG
223228

224-
sourceParams, err := cbgtFeedParams(ctx, opts.Collections, opts.DBName)
229+
sourceParams, err := cbgtFeedParams(ctx, opts)
225230
if err != nil {
226231
return err
227232
}
@@ -322,7 +327,7 @@ func getCBGTIndexUUID(manager *cbgt.Manager, indexName string) (previousUUID str
322327
// initCBGTManager creates a new manager for a given bucket and bucketSpec
323328
// Inline comments below provide additional detail on how cbgt uses each manager
324329
// parameter, and the implications for SG
325-
func initCBGTManager(ctx context.Context, bucket Bucket, spec BucketSpec, cfgSG cbgt.Cfg, dbUUID string, dbName string) (*CbgtContext, error) {
330+
func initCBGTManager(ctx context.Context, bucket Bucket, spec BucketSpec, cfgSG cbgt.Cfg, dbUUID string, dbName string, unregisterCallback CbgtUnregisterFeedCallback) (*CbgtContext, error) {
326331
// uuid: Unique identifier for the node. Used to identify the node in the config.
327332
// Without UUID persistence across SG restarts, a restarted SG node relies on heartbeater to remove
328333
// the previous version of that node from the cfg, and assign pindexes to the new one.
@@ -376,7 +381,11 @@ func initCBGTManager(ctx context.Context, bucket Bucket, spec BucketSpec, cfgSG
376381
dataDir := ""
377382

378383
eventHandlersCtx, eventHandlersCancel := context.WithCancelCause(ctx)
379-
eventHandlers := &sgMgrEventHandlers{ctx: eventHandlersCtx, ctxCancel: eventHandlersCancel}
384+
eventHandlers := &sgMgrEventHandlers{
385+
ctx: eventHandlersCtx,
386+
ctxCancel: eventHandlersCancel,
387+
unregisterFeedCallback: unregisterCallback,
388+
}
380389

381390
// Specify one feed per pindex
382391
options := make(map[string]string)
@@ -917,9 +926,10 @@ func GetDefaultImportPartitions(serverless bool) uint16 {
917926
}
918927

919928
type sgMgrEventHandlers struct {
920-
ctx context.Context
921-
ctxCancel context.CancelCauseFunc
922-
manager *cbgt.Manager
929+
ctx context.Context
930+
ctxCancel context.CancelCauseFunc
931+
manager *cbgt.Manager
932+
unregisterFeedCallback func(cbgt.Feed) // callback function for when a feed is unregistered. This occurs when a feed completes for any reason: Stop is requested, partitions are rebalanced, or the feed naturally ends after reaching the expected sequence numbers.
923933
}
924934

925935
func (meh *sgMgrEventHandlers) OnRefreshManagerOptions(options map[string]string) {
@@ -934,6 +944,14 @@ func (meh *sgMgrEventHandlers) OnUnregisterPIndex(pindex *cbgt.PIndex) {
934944
// No-op for SG
935945
}
936946

947+
// OnUnregisterFeed is called to indicate that a DCP feed has stopped. This will be called for normal and abnormal
948+
// terminations.
949+
func (meh *sgMgrEventHandlers) OnUnregisterFeed(feed cbgt.Feed) {
950+
if meh.unregisterFeedCallback != nil {
951+
meh.unregisterFeedCallback(feed)
952+
}
953+
}
954+
937955
// OnFeedError is required to trigger reconnection to a feed on a closed connection (EOF).
938956
// NotifyMgrOnClose will trigger cbgt closing and then attempt to reconnect to the feed, if the manager hasn't
939957
// been stopped.
@@ -969,3 +987,8 @@ func (meh *sgMgrEventHandlers) OnFeedError(_ string, r cbgt.Feed, feedErr error)
969987
dcpFeed.NotifyMgrOnClose()
970988
}
971989
}
990+
991+
// cbgtVbNoToPartition converts a vbucket number to a string partition name for cbgt.
992+
func cbgtVbNoToPartition(vbNo uint16) string {
993+
return fmt.Sprintf("%d", vbNo)
994+
}

base/dcp_sharded_test.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1575,13 +1575,12 @@ func TestShardedDCPCheckpointCleanup(t *testing.T) {
15751575
checkpointPrefix := "test_shared_dcp_checkpoint"
15761576
dest, err := NewDCPDest(
15771577
ctx,
1578-
nil,
1579-
metadataStore,
1580-
vBuckets,
1581-
true,
1582-
nil,
1583-
nil,
1584-
checkpointPrefix,
1578+
DCPDestOptions{
1579+
MetadataStore: metadataStore,
1580+
MaxVbNo: vBuckets,
1581+
PersistCheckpoints: true,
1582+
CheckpointPrefix: checkpointPrefix,
1583+
},
15851584
)
15861585
require.NoError(t, err)
15871586
dcpDest, ok := dest.(*DCPDest)

0 commit comments

Comments
 (0)