Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 29 additions & 18 deletions base/dcp_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,30 +60,26 @@ type DCPCommon struct {
callback sgbucket.FeedEventCallbackFunc // Function to callback for mutation processing
loggingCtx context.Context // Logging context, prefixes feedID
checkpointPrefix string // DCP checkpoint key prefix
endSeqNos map[uint16]uint64 // endSeqNos mark the sequence numbers keyed by vBucket ID that are the end sequence numbers for a stream
}

// NewDCPCommon creates a new DCPCommon which manages updates coming from a cbgt-based DCP feed. The callback function will receive events from a DCP feed. It stores checkpoints in the metaStore starting with checkpointPrefix if persistCheckpoints is true.
// Specific stats for DCP are stored in expvars rather than SgwStats.
// NewDCPCommon creates a new DCPCommon instance which manages updates coming from a cbgt-based DCP feed.
func NewDCPCommon(
ctx context.Context,
callback sgbucket.FeedEventCallbackFunc,
metaStore DataStore,
maxVbNo uint16,
persistCheckpoints bool,
dbStats *expvar.Map,
checkpointPrefix string) (*DCPCommon, error) {
opts DCPDestOptions) (*DCPCommon, error) {

c := &DCPCommon{
dbStatsExpvars: dbStats,
metaStore: metaStore,
persistCheckpoints: persistCheckpoints,
seqs: make([]uint64, maxVbNo),
meta: make([][]byte, maxVbNo),
updatesSinceCheckpoint: make([]uint64, maxVbNo),
callback: callback,
lastCheckpointTime: make([]time.Time, maxVbNo),
checkpointPrefix: checkpointPrefix,
dbStatsExpvars: opts.DcpStats,
metaStore: opts.MetadataStore,
persistCheckpoints: opts.PersistCheckpoints,
seqs: make([]uint64, opts.MaxVbNo),
meta: make([][]byte, opts.MaxVbNo),
updatesSinceCheckpoint: make([]uint64, opts.MaxVbNo),
callback: opts.Callback,
lastCheckpointTime: make([]time.Time, opts.MaxVbNo),
checkpointPrefix: opts.CheckpointPrefix,
loggingCtx: ctx,
endSeqNos: opts.EndSeqNos,
}

return c, nil
Expand Down Expand Up @@ -151,7 +147,9 @@ func (c *DCPCommon) getMetaData(vbucketId uint16) (
// rollbackEx is called when a DCP open stream issues a rollback. The metadata persisted for a given uuid and sequence number and stream reopening is deferred to cbgt via AutoReconnectAfterRollback feed parameter.
func (c *DCPCommon) rollbackEx(vbucketId uint16, vbucketUUID uint64, rollbackSeq uint64, rollbackMetaData []byte) error {
InfofCtx(c.loggingCtx, KeyDCP, "DCP RollbackEx request - rolling back DCP feed for: vbucketId: %d, rollbackSeq: %x.", vbucketId, rollbackSeq)
c.dbStatsExpvars.Add("dcp_rollback_count", 1)
if c.dbStatsExpvars != nil {
c.dbStatsExpvars.Add("dcp_rollback_count", 1)
}
c.updateSeq(vbucketId, rollbackSeq, false)
err := c.setMetaData(vbucketId, rollbackMetaData, true)
if err != nil {
Expand Down Expand Up @@ -226,6 +224,19 @@ func (c *DCPCommon) updateSeq(vbucketId uint16, seq uint64, warnOnLowerSeqNo boo
c.m.Lock()
defer c.m.Unlock()

// Check the expected maximum sequence number when running a one shot feed. Do not checkpoint if the incoming
// sequence is greater than the expected maximum sequence number.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we do this check earlier (in dataUpdate) and also avoid callback processing for any sequences higher than endSeqNo?

//
// DCP will provide mutations that run to the end of the snapshot that contains the end sequence number.
if c.endSeqNos != nil {
endSeq, ok := c.endSeqNos[vbucketId]
if !ok {
AssertfCtx(c.loggingCtx, "Received DCP event for vbno %d which is not tracked by the expected endSeqNos %#+v. This means that endSeqNos was specified with the incorrect number of vBuckets", vbucketId, c.endSeqNos)
} else if seq > endSeq {
return
}
}

previousSequence := c.seqs[vbucketId]

if seq < previousSequence && warnOnLowerSeqNo == true {
Expand Down
57 changes: 43 additions & 14 deletions base/dcp_dest.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func init() {
type SGDest interface {
cbgt.Dest
cbgt.DestEx
ForceCheckpointWrite()
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed, PR doesn't currently have any callers of ForceCheckpointWrite. Review whether that's intentional.

}

// DCPDest implements SGDest (superset of cbgt.Dest) interface to manage updates coming from a
Expand All @@ -41,28 +42,32 @@ type DCPDest struct {
metaInitComplete []bool // Whether metadata initialization has been completed, per vbNo
}

// NewDCPDest creates a new DCPDest which manages updates coming from a cbgt-based DCP feed. The callback function will receive events from a DCP feed. If persistCheckpoints is true, stores checkpoints as documents in metadataStore starting with checkpointPrefix.
// Specific stats for DCP are stored in expvars rather than SgwStats, except for partition stat, which indicates the number of cbgt partitions assigned to this node.
type DCPDestOptions struct {
Callback sgbucket.FeedEventCallbackFunc // Callback function receives DCP events.
MetadataStore sgbucket.DataStore // location to store checkpoints in
MaxVbNo uint16 // number of vBuckets for the backing bucket (does not have to match metadata store)
PersistCheckpoints bool // if true, write checkpoints to MetadataStore with keys prefixed by CheckpointPrefix
DcpStats *expvar.Map // Optional stats for dcp_rollback_count. This is not exposed by prometheus.
PartitionStat *SgwIntStat // Optional stat for active partition count, to track cbgt partitions.
CheckpointPrefix string // document prefix for checkpoint documents.
EndSeqNos map[uint16]uint64 // If running a one shot DCP feed, these should match the end sequence numbers for each vbNo.
}

// NewDCPDest creates a new DCPDest which manages updates coming from a cbgt-based DCP feed.
// Each partition will have its own DCPDest object.
func NewDCPDest(
ctx context.Context,
callback sgbucket.FeedEventCallbackFunc,
metadataStore sgbucket.DataStore,
maxVbNo uint16,
persistCheckpoints bool,
dcpStats *expvar.Map,
partitionStat *SgwIntStat,
checkpointPrefix string,
opts DCPDestOptions,
) (SGDest, error) {
Comment thread
torcolvin marked this conversation as resolved.
dcpCommon, err := NewDCPCommon(ctx, callback, metadataStore, maxVbNo, persistCheckpoints, dcpStats, checkpointPrefix)
dcpCommon, err := NewDCPCommon(ctx, opts)
if err != nil {
return nil, err
}

d := &DCPDest{
DCPCommon: dcpCommon,
partitionCountStat: partitionStat,
metaInitComplete: make([]bool, maxVbNo),
partitionCountStat: opts.PartitionStat,
metaInitComplete: make([]bool, opts.MaxVbNo),
}

if d.partitionCountStat != nil {
Expand Down Expand Up @@ -213,7 +218,7 @@ func (d *DCPDest) RollbackEx(partition string, vbucketUUID uint64, rollbackSeq u

// TODO: Not implemented, review potential usage
func (d *DCPDest) ConsistencyWait(partition, partitionUUID string,
consistencyLevel string, consistencySeq uint64, cancelCh <-chan bool) error {
consistencyLevel cbgt.ConsistencyLevel, consistencySeq uint64, cancelCh <-chan bool) error {
WarnfCtx(d.loggingCtx, "Dest.ConsistencyWait being invoked by cbgt - not supported by Sync Gateway")
return nil
}
Expand All @@ -234,6 +239,23 @@ func (d *DCPDest) Stats(io.Writer) error {
return nil
}

// ForceCheckpointWrite forces a write on all checkpoints.
func (d *DCPDest) ForceCheckpointWrite() {
for vbNo, init := range d.metaInitComplete {
if init {
value, _, err := d.getMetaData(uint16(vbNo))
if err != nil {
WarnfCtx(d.loggingCtx, "Could not retrieve metadata for vbNo %d during ForceCheckpointWrite: %v. Skipping checkpoint write", vbNo, err)
continue
}
err = d.setMetaData(uint16(vbNo), value, true)
if err != nil {
WarnfCtx(d.loggingCtx, "Could not persist metadata for vbNo %d during ForceCheckpointWrite: %v. Skipping checkpoint write", vbNo, err)
}
}
}
}

func partitionToVbNo(ctx context.Context, partition string) uint16 {
vbNo, err := strconv.Atoi(partition)
if err != nil {
Expand Down Expand Up @@ -316,7 +338,7 @@ func (d *DCPLoggingDest) RollbackEx(partition string, vbucketUUID uint64, rollba
}

func (d *DCPLoggingDest) ConsistencyWait(partition, partitionUUID string,
consistencyLevel string, consistencySeq uint64, cancelCh <-chan bool) error {
consistencyLevel cbgt.ConsistencyLevel, consistencySeq uint64, cancelCh <-chan bool) error {
return d.dest.ConsistencyWait(partition, partitionUUID, consistencyLevel, consistencySeq, cancelCh)
}

Expand All @@ -332,3 +354,10 @@ func (d *DCPLoggingDest) Query(pindex *cbgt.PIndex, req []byte, w io.Writer,
func (d *DCPLoggingDest) Stats(w io.Writer) error {
return d.dest.Stats(w)
}

func (d *DCPLoggingDest) ForceCheckpointWrite() {
d.dest.ForceCheckpointWrite()
}

var _ SGDest = &DCPDest{}
var _ SGDest = &DCPLoggingDest{}
44 changes: 36 additions & 8 deletions base/dcp_feed_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ func init() {
// SGFeedSourceParams is a wrapper for cbgt's parameters.
type SGFeedSourceParams struct {
cbgt.DCPFeedParams
cbgt.StopAfterSourceParams

// Used to pass the SG database name to SGFeed* shims
DbName string `json:"sg_dbname,omitempty"`
Expand All @@ -165,20 +166,27 @@ type SGFeedIndexParams struct {
DestKey string `json:"destKey,omitempty"`
}

// cbgtFeedParams returns marshalled cbgt.DCPFeedParams as string, to be passed as feedparams during cbgt.Manager init.
// Used to pass basic auth credentials and xattr flag to cbgt.
func cbgtFeedParams(ctx context.Context, collections CollectionNames, dbName string) (string, error) {
// cbgtFeedParams returns marshalled cbgt.DCPFeedParams as string. This contains information to for a given information , to be passed as feedparams during cbgt.Manager init.
func cbgtFeedParams(ctx context.Context, opts ShardedDCPOptions) (string, error) {
Comment thread
torcolvin marked this conversation as resolved.
feedParams := &SGFeedSourceParams{
DbName: dbName,
DbName: opts.DBName,
DCPFeedParams: cbgt.DCPFeedParams{
AutoReconnectAfterRollback: true,
IncludeXAttrs: true,
},
}
if len(collections) > 1 {
return "", RedactErrorf("cbgtFeedParams: multiple scopes not supported, got %v", MD(collections))
} else if len(collections) > 0 {
for s, c := range collections {
if opts.EndSeqNos != nil {
feedParams.StopAfterSourceParams.StopAfter = "markReached"
seqMap := make(map[string]cbgt.UUIDSeq, len(opts.EndSeqNos))
for vbNo, seqNo := range opts.EndSeqNos {
seqMap[cbgtVbNoToPartition(vbNo)] = cbgt.UUIDSeq{Seq: seqNo}
}
feedParams.StopAfterSourceParams.MarkPartitionSeqs = seqMap
}
if len(opts.Collections) > 1 {
return "", RedactErrorf("cbgtFeedParams: multiple scopes not supported, got %v", MD(opts.Collections))
} else if len(opts.Collections) > 0 {
for s, c := range opts.Collections {
feedParams.Scope = s
feedParams.Collections = c
}
Expand Down Expand Up @@ -301,3 +309,23 @@ func getCbgtCredentials(dbName string) (cbgtCreds, bool) {
cbgtGlobalsLock.Unlock() // cbgtCreds is not a pointer type, safe to unlock
return creds, found
}

// GetHighSeqNos retrieves the maximum sequence numbers for each vbucket.
func GetHighSeqNos(ctx context.Context, bucket Bucket) (map[uint16]uint64, error) {
b, err := AsGocbV2Bucket(bucket)
if err != nil {
return nil, fmt.Errorf("GetHighSeqNos: bucket is not a GocbV2Bucket: %w", err)
}
numVbuckets, err := bucket.GetMaxVbno(ctx)
if err != nil {
return nil, err
}

// Note: extending to be per collection requires an enhancement in gocbcore to support 0x48 memcached command
// for a non dcp agent, see gocbcore.DCPAgent.GetVBucketSeqNos
_, highSeqNos, err := b.GetStatsVbSeqno(numVbuckets, true)
if err != nil {
return nil, fmt.Errorf("unable to obtain high seqnos: %w", err)
}
return highSeqNos, nil
}
63 changes: 43 additions & 20 deletions base/dcp_sharded.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ const (
CBGTIndexTypeSyncGatewayResync = "syncGateway-resync"
)

// CbgtUnregisterFeedCallback is the function invoked by cbgt when a DCP feed shuts down.
type CbgtUnregisterFeedCallback func(cbgt.Feed)

type ShardedDCPFeedType string

const (
Expand Down Expand Up @@ -73,19 +76,21 @@ type CbgtContext struct {

// ShardedDCPOptions contains options for starting a DCP feed for cbgt.
type ShardedDCPOptions struct {
Bucket Bucket // bucket to target
Cfg cbgt.Cfg // cbgt cfg used to coordinate cbgt documents
Collections CollectionNames // collection names to target
DBName string // database name is used to look up credentials
DestKey string // key used in indexParams for this feed, used to look up the feed destination in cbgtDestFactories
Heartbeater Heartbeater // heartbeater to use to find nodes
IndexName string // cbgt.Manger.IndexName, used to uniquely identify the index
IndexType string // cbgt.Manager.IndexType, matches name used by cbgt.RegisterPIndexImplType
NumPartitions uint16 // number of cbgt partitions to use, if 0 will default to DefaultImportPartitions
PreviousIndexName string // previous index name. If specified, marks IndexName as as a replacement for this name.
UUID string // database uuid, used to uniquely identify nodes
Datastore DataStore // datastore to perform KV ops
FeedType ShardedDCPFeedType // type of sharded DCP feed to start
Bucket Bucket // bucket to target
Cfg cbgt.Cfg // cbgt cfg used to coordinate cbgt documents
Collections CollectionNames // collection names to target
DBName string // database name is used to look up credentials
DestKey string // key used in indexParams for this feed, used to look up the feed destination in cbgtDestFactories
Heartbeater Heartbeater // heartbeater to use to find nodes
IndexName string // cbgt.Manger.IndexName, used to uniquely identify the index
IndexType string // cbgt.Manager.IndexType, matches name used by cbgt.RegisterPIndexImplType
NumPartitions uint16 // number of cbgt partitions to use, if 0 will default to DefaultImportPartitions
PreviousIndexName string // previous index name. If specified, marks IndexName as as a replacement for this name.
UUID string // database uuid, used to uniquely identify nodes
Datastore DataStore // datastore to perform KV ops
FeedType ShardedDCPFeedType // type of sharded DCP feed to start
EndSeqNos map[uint16]uint64 // optional parameter indicating the end sequences to be used for running a one shot feed.
UnregisterFeedCallback CbgtUnregisterFeedCallback // optional callback for cbgt.ManagerEventHandlers.OnUnregisterFeed
}

// Validate makes sure that all options are specified.
Expand Down Expand Up @@ -166,7 +171,7 @@ func StartShardedDCPFeed(ctx context.Context, opts ShardedDCPOptions) (*CbgtCont
return nil, fmt.Errorf("error asserting bucket as gocb v2 bucket: %w", err)
}

cbgtContext, err := initCBGTManager(ctx, opts.Bucket, b.GetSpec(), opts.Cfg, opts.UUID, opts.DBName)
cbgtContext, err := initCBGTManager(ctx, opts.Bucket, b.GetSpec(), opts.Cfg, opts.UUID, opts.DBName, opts.UnregisterFeedCallback)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -221,7 +226,7 @@ func GenerateLegacyImportIndexName(dbName string) string {
func createCBGTIndex(ctx context.Context, c *CbgtContext, opts ShardedDCPOptions) error {
sourceType := SOURCE_DCP_SG

sourceParams, err := cbgtFeedParams(ctx, opts.Collections, opts.DBName)
sourceParams, err := cbgtFeedParams(ctx, opts)
if err != nil {
return err
}
Expand Down Expand Up @@ -322,7 +327,7 @@ func getCBGTIndexUUID(manager *cbgt.Manager, indexName string) (previousUUID str
// createCBGTManager creates a new manager for a given bucket and bucketSpec
// Inline comments below provide additional detail on how cbgt uses each manager
// parameter, and the implications for SG
func initCBGTManager(ctx context.Context, bucket Bucket, spec BucketSpec, cfgSG cbgt.Cfg, dbUUID string, dbName string) (*CbgtContext, error) {
func initCBGTManager(ctx context.Context, bucket Bucket, spec BucketSpec, cfgSG cbgt.Cfg, dbUUID string, dbName string, unregisterCallback CbgtUnregisterFeedCallback) (*CbgtContext, error) {
// uuid: Unique identifier for the node. Used to identify the node in the config.
Comment thread
torcolvin marked this conversation as resolved.
// Without UUID persistence across SG restarts, a restarted SG node relies on heartbeater to remove
// the previous version of that node from the cfg, and assign pindexes to the new one.
Expand Down Expand Up @@ -376,7 +381,11 @@ func initCBGTManager(ctx context.Context, bucket Bucket, spec BucketSpec, cfgSG
dataDir := ""

eventHandlersCtx, eventHandlersCancel := context.WithCancelCause(ctx)
eventHandlers := &sgMgrEventHandlers{ctx: eventHandlersCtx, ctxCancel: eventHandlersCancel}
eventHandlers := &sgMgrEventHandlers{
ctx: eventHandlersCtx,
ctxCancel: eventHandlersCancel,
unregisterFeedCallback: unregisterCallback,
}

// Specify one feed per pindex
options := make(map[string]string)
Expand Down Expand Up @@ -917,9 +926,10 @@ func GetDefaultImportPartitions(serverless bool) uint16 {
}

type sgMgrEventHandlers struct {
ctx context.Context
ctxCancel context.CancelCauseFunc
manager *cbgt.Manager
ctx context.Context
ctxCancel context.CancelCauseFunc
manager *cbgt.Manager
unregisterFeedCallback func(cbgt.Feed) // callback function for when a feed is unregistered. This occurs when a feed completes for any reason: Stop is requested, a rebalance of partitions, the feed naturally ends when reaching expected seequence numbers.
}
Comment thread
torcolvin marked this conversation as resolved.

func (meh *sgMgrEventHandlers) OnRefreshManagerOptions(options map[string]string) {
Expand All @@ -934,6 +944,14 @@ func (meh *sgMgrEventHandlers) OnUnregisterPIndex(pindex *cbgt.PIndex) {
// No-op for SG
}

// OnUnregisterFeed is called to indicate that a DCP feed has stopped. This will be called for normal and abnormal
// terminations.
func (meh *sgMgrEventHandlers) OnUnregisterFeed(feed cbgt.Feed) {
if meh.unregisterFeedCallback != nil {
meh.unregisterFeedCallback(feed)
}
}

// OnFeedError is required to trigger reconnection to a feed on a closed connection (EOF).
// NotifyMgrOnClose will trigger cbgt closing and then attempt to reconnect to the feed, if the manager hasn't
// been stopped.
Expand Down Expand Up @@ -969,3 +987,8 @@ func (meh *sgMgrEventHandlers) OnFeedError(_ string, r cbgt.Feed, feedErr error)
dcpFeed.NotifyMgrOnClose()
}
}

// cbgtVbNoToPartition converts a vbucket number to a string partition name for cbgt.
func cbgtVbNoToPartition(vbNo uint16) string {
return fmt.Sprintf("%d", vbNo)
}
13 changes: 6 additions & 7 deletions base/dcp_sharded_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1575,13 +1575,12 @@ func TestShardedDCPCheckpointCleanup(t *testing.T) {
checkpointPrefix := "test_shared_dcp_checkpoint"
dest, err := NewDCPDest(
ctx,
nil,
metadataStore,
vBuckets,
true,
nil,
nil,
checkpointPrefix,
DCPDestOptions{
MetadataStore: metadataStore,
MaxVbNo: vBuckets,
PersistCheckpoints: true,
CheckpointPrefix: checkpointPrefix,
},
)
require.NoError(t, err)
dcpDest, ok := dest.(*DCPDest)
Expand Down
Loading
Loading