Skip to content

Commit 3c7897c

Browse files
bbrksclaude
andcommitted
CBG-5223: include both MetadataStore datastores in caching feed
Subscribe the caching DCP feed to both the primary (_system._mobile) and fallback (_default._default) datastores when the metadata store is a *base.MetadataStore, so _sync:* mutations are observed regardless of which collection currently holds the doc during metadata migration. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 3df9aee commit 3c7897c

2 files changed

Lines changed: 110 additions & 7 deletions

File tree

db/change_listener.go

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -86,20 +86,35 @@ func (listener *changeListener) OnDocChanged(event sgbucket.FeedEvent, docType D
8686
listener.OnChangeCallback(event, docType)
8787
}
8888

89+
// cachingFeedCollections returns the set of (scope, collection) pairs that the caching DCP
90+
// feed must subscribe to. When metadataStore is a *base.MetadataStore (active during the
91+
// metadata migration window), both its primary (_system._mobile) and fallback
92+
// (_default._default) datastores are included so that _sync:* mutations are observed
93+
// regardless of which collection currently holds the doc.
94+
func cachingFeedCollections(metadataStore base.DataStore, scopes map[string]Scope) base.CollectionNameSet {
95+
collectionNames := base.NewCollectionNameSet()
96+
if ms, ok := metadataStore.(*base.MetadataStore); ok {
97+
collectionNames.Add(ms.Primary())
98+
collectionNames.Add(ms.Fallback())
99+
} else {
100+
collectionNames.Add(metadataStore)
101+
}
102+
for scopeName, collections := range scopes {
103+
for collectionName := range collections.Collections {
104+
collectionNames.Add(sgbucket.DataStoreNameImpl{Scope: scopeName, Collection: collectionName})
105+
}
106+
}
107+
return collectionNames
108+
}
109+
89110
// Starts a changeListener on a given Bucket.
90111
func (listener *changeListener) Start(ctx context.Context, bucket base.Bucket, dbStats *expvar.Map, scopes map[string]Scope, metadataStore base.DataStore) error {
91112

92113
listener.terminator = make(chan bool)
93114
listener.bucket = bucket
94115
listener.bucketName = bucket.GetName()
95116

96-
collectionNames := base.NewCollectionNameSet()
97-
collectionNames.Add(metadataStore)
98-
for scopeName, collections := range scopes {
99-
for collectionName := range collections.Collections {
100-
collectionNames.Add(sgbucket.DataStoreNameImpl{Scope: scopeName, Collection: collectionName})
101-
}
102-
}
117+
collectionNames := cachingFeedCollections(metadataStore, scopes)
103118
listener.StartNotifierBroadcaster(ctx) // start broadcast changes goroutine
104119

105120
opts := base.DCPClientOptions{
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
// Copyright 2026-Present Couchbase, Inc.
2+
//
3+
// Use of this software is governed by the Business Source License included
4+
// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
5+
// in that file, in accordance with the Business Source License, use of this
6+
// software will be governed by the Apache License, Version 2.0, included in
7+
// the file licenses/APL2.txt.
8+
9+
package db
10+
11+
import (
12+
"testing"
13+
14+
"github.com/couchbase/sync_gateway/base"
15+
"github.com/stretchr/testify/assert"
16+
"github.com/stretchr/testify/require"
17+
)
18+
19+
// TestCachingFeedCollections_DualMetadataStore verifies that when the metadata store is a
20+
// *base.MetadataStore, the caching feed subscribes to both the primary (_system._mobile)
21+
// and fallback (_default._default) datastores so that _sync:* mutations in either
22+
// collection are observed during the metadata migration window.
23+
func TestCachingFeedCollections_DualMetadataStore(t *testing.T) {
24+
ctx := base.TestCtx(t)
25+
bucket := base.GetTestBucket(t)
26+
defer bucket.Close(ctx)
27+
28+
primary := bucket.GetMobileSystemDataStore() // skips if backing store does not support system collections
29+
fallback := bucket.DefaultDataStore()
30+
ms := base.NewMetadataStore(primary, fallback)
31+
32+
got := cachingFeedCollections(ms, nil)
33+
34+
require.Contains(t, got, base.SystemScope, "expected %s scope to be present", base.SystemScope)
35+
assert.Contains(t, got[base.SystemScope], base.SystemCollectionMobile)
36+
37+
require.Contains(t, got, base.DefaultScope, "expected %s scope to be present", base.DefaultScope)
38+
assert.Contains(t, got[base.DefaultScope], base.DefaultCollection)
39+
}
40+
41+
// TestCachingFeedCollections_SingleMetadataStore verifies that when the metadata store is a
42+
// plain DataStore (not the dual wrapper), the caching feed subscribes only to that store's
43+
// collection — preserving the pre-CBG-5223 behaviour.
44+
func TestCachingFeedCollections_SingleMetadataStore(t *testing.T) {
45+
ctx := base.TestCtx(t)
46+
bucket := base.GetTestBucket(t)
47+
defer bucket.Close(ctx)
48+
49+
metadataStore := bucket.DefaultDataStore()
50+
51+
got := cachingFeedCollections(metadataStore, nil)
52+
53+
require.Contains(t, got, base.DefaultScope)
54+
assert.Contains(t, got[base.DefaultScope], base.DefaultCollection)
55+
// Should not have added any other scope (notably _system).
56+
assert.Len(t, got, 1, "expected exactly one scope in single-metadata-store case, got %v", got)
57+
}
58+
59+
// TestCachingFeedCollections_UserScopesIncluded verifies that user-data scopes/collections
60+
// are appended to the set regardless of whether the metadata store is a dual wrapper.
61+
func TestCachingFeedCollections_UserScopesIncluded(t *testing.T) {
62+
ctx := base.TestCtx(t)
63+
bucket := base.GetTestBucket(t)
64+
defer bucket.Close(ctx)
65+
66+
primary := bucket.GetMobileSystemDataStore()
67+
fallback := bucket.DefaultDataStore()
68+
ms := base.NewMetadataStore(primary, fallback)
69+
70+
scopes := map[string]Scope{
71+
"myScope": {
72+
Collections: map[string]*DatabaseCollection{
73+
"collA": nil,
74+
"collB": nil,
75+
},
76+
},
77+
}
78+
79+
got := cachingFeedCollections(ms, scopes)
80+
81+
require.Contains(t, got, base.SystemScope)
82+
assert.Contains(t, got[base.SystemScope], base.SystemCollectionMobile)
83+
require.Contains(t, got, base.DefaultScope)
84+
assert.Contains(t, got[base.DefaultScope], base.DefaultCollection)
85+
require.Contains(t, got, "myScope")
86+
assert.Contains(t, got["myScope"], "collA")
87+
assert.Contains(t, got["myScope"], "collB")
88+
}

0 commit comments

Comments
 (0)