Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
536 changes: 503 additions & 33 deletions base/bootstrap.go

Large diffs are not rendered by default.

301 changes: 298 additions & 3 deletions base/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@
package base

import (
"encoding/json"
"errors"
"sort"
"strings"
"sync"
"testing"
"time"

"dario.cat/mergo"
"github.com/couchbase/gocb/v2"
"github.com/couchbaselabs/rosmar"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -51,7 +54,7 @@ func TestBootstrapRefCounting(t *testing.T) {

var perBucketCredentialsConfig map[string]*CredentialsConfig
forcePerBucketAuth := false
cluster, err := NewCouchbaseCluster(ctx, TestClusterSpec(t), forcePerBucketAuth, perBucketCredentialsConfig, TestUseXattrs(), CachedClusterConnections)
cluster, err := NewCouchbaseCluster(ctx, TestClusterSpec(t), forcePerBucketAuth, perBucketCredentialsConfig, TestUseXattrs(), false, CachedClusterConnections)
require.NoError(t, err)
defer cluster.Close()
require.NotNil(t, cluster)
Expand Down Expand Up @@ -138,12 +141,12 @@ func newTestBootstrapConnection(t *testing.T) BootstrapConnection {
t.Helper()
ctx := TestCtx(t)
if UnitTestUrlIsWalrus() {
cluster, err := NewRosmarCluster(rosmar.InMemoryURL)
cluster, err := NewRosmarCluster(rosmar.InMemoryURL, false)
require.NoError(t, err)
t.Cleanup(cluster.Close)
return cluster
}
cluster, err := NewCouchbaseCluster(ctx, TestClusterSpec(t), false, nil, TestUseXattrs(), CachedClusterConnections)
cluster, err := NewCouchbaseCluster(ctx, TestClusterSpec(t), false, nil, TestUseXattrs(), false, CachedClusterConnections)
require.NoError(t, err)
t.Cleanup(cluster.Close)
return cluster
Expand Down Expand Up @@ -244,3 +247,295 @@ func TestTouchMetadataDocument(t *testing.T) {
require.Error(t, err)
require.True(t, IsCasMismatch(err), "expected CasMismatch on stale CAS retry, got %T: %v", err, err)
}

// bootstrapTestCfg is the value shape used by the dual-collection bootstrap tests.
type bootstrapTestCfg struct {
Foo string `json:"foo"`
}

// seedLegacyBootstrapDoc writes a bootstrap-config-shaped document directly into the bucket's
// _default._default collection, mimicking pre-migration state. The on-disk shape must match what
// configPersistence.loadConfig expects, which is controlled by NewCouchbaseCluster's
// useXattrConfig — a distinct setting from SG's general use_xattrs for application metadata.
// Callers must pass the same useXattrs value used to construct the cluster under test.
func seedLegacyBootstrapDoc(t *testing.T, bucket *gocb.Bucket, docID string, value bootstrapTestCfg, useXattrs bool) {
t.Helper()
if useXattrs {
_, err := bucket.DefaultCollection().MutateIn(docID, []gocb.MutateInSpec{
gocb.UpsertSpec(cfgXattrConfigPath, value, UpsertSpecXattr),
gocb.ReplaceSpec("", json.RawMessage(cfgXattrBody), nil),
}, &gocb.MutateInOptions{StoreSemantic: gocb.StoreSemanticsInsert})
require.NoError(t, err)
return
}
_, err := bucket.DefaultCollection().Insert(docID, value, nil)
require.NoError(t, err)
}

// bootstrapDualTestFixture pairs a BootstrapConnection in dual-collection mode with helpers that
// reach past the cluster API to inspect or seed each underlying collection. Implementations exist
// for both Rosmar and CouchbaseCluster; newBootstrapDualTestFixture selects based on the test's
// backing store.
type bootstrapDualTestFixture struct {
Cluster BootstrapConnection
BucketName string
// SeedLegacy writes a config-shaped doc directly into the fallback (_default._default)
// collection, bypassing the cluster API.
SeedLegacy func(t *testing.T, docID string, value bootstrapTestCfg)
// PrimaryExists reports whether docID currently exists in the primary (_system._mobile).
PrimaryExists func(t *testing.T, docID string) bool
// FallbackExists reports whether docID currently exists in _default._default.
FallbackExists func(t *testing.T, docID string) bool
// CleanupDoc removes docID from both collections. Idempotent.
CleanupDoc func(t *testing.T, docID string)
}

// newBootstrapDualTestFixture constructs a fixture for dual-collection bootstrap tests. useXattrs
// selects the bootstrap-config persistence mode on Couchbase Server (NewCouchbaseCluster's
// useXattrConfig); on Rosmar it's ignored since Rosmar's bootstrap path only supports
// document-mode persistence — callers that want to exercise useXattrs=true should skip on
// UnitTestUrlIsWalrus().
func newBootstrapDualTestFixture(t *testing.T, useXattrs bool) bootstrapDualTestFixture {
t.Helper()
if UnitTestUrlIsWalrus() {
return newRosmarBootstrapDualFixture(t)
}
return newCouchbaseBootstrapDualFixture(t, useXattrs)
}

// newRosmarBootstrapDualFixture builds a fixture over a test-pool Rosmar bucket. The cluster and
// the seed/inspect helpers all point at the same in-memory bucket via rosmar's process-global
// bucket registry.
func newRosmarBootstrapDualFixture(t *testing.T) bootstrapDualTestFixture {
t.Helper()
ctx := TestCtx(t)
tb := GetTestBucket(t)
t.Cleanup(func() { tb.Close(ctx) })
bucketName := tb.GetName()

cluster, err := NewRosmarCluster(UnitTestUrl(), true)
require.NoError(t, err)
t.Cleanup(cluster.Close)

defaultDS, err := tb.Bucket.NamedDataStore(ctx, DefaultScopeAndCollectionName())
require.NoError(t, err)
systemDS, err := tb.Bucket.NamedDataStore(ctx, MobileSystemScopeAndCollectionName())
require.NoError(t, err)

return bootstrapDualTestFixture{
Cluster: cluster,
BucketName: bucketName,
SeedLegacy: func(t *testing.T, docID string, value bootstrapTestCfg) {
t.Helper()
_, err := defaultDS.WriteCas(TestCtx(t), docID, 0, 0, value, 0)
require.NoError(t, err)
},
PrimaryExists: func(t *testing.T, docID string) bool {
t.Helper()
ok, err := systemDS.Exists(TestCtx(t), docID)
require.NoError(t, err)
return ok
},
FallbackExists: func(t *testing.T, docID string) bool {
t.Helper()
ok, err := defaultDS.Exists(TestCtx(t), docID)
require.NoError(t, err)
return ok
},
CleanupDoc: func(t *testing.T, docID string) {
t.Helper()
_, _ = systemDS.Remove(TestCtx(t), docID, 0)
_, _ = defaultDS.Remove(TestCtx(t), docID, 0)
},
}
}

// newCouchbaseBootstrapDualFixture builds a fixture over the first available test-pool bucket on a
// real Couchbase Server cluster running in dual-collection mode. useXattrs selects the
// bootstrap-config persistence mode and must match the seed format used by SeedLegacy.
func newCouchbaseBootstrapDualFixture(t *testing.T, useXattrs bool) bootstrapDualTestFixture {
t.Helper()
ctx := TestCtx(t)
require.EventuallyWithT(t, func(c *assert.CollectT) {
assert.Equal(c, int32(GTestBucketPool.numBuckets), GTestBucketPool.stats.TotalBucketInitCount.Load())
}, 2*time.Minute, 5*time.Millisecond)

cluster, err := NewCouchbaseCluster(ctx, TestClusterSpec(t), false, nil, useXattrs, true, CachedClusterConnections)
require.NoError(t, err)
t.Cleanup(cluster.Close)

buckets, err := cluster.GetConfigBuckets(ctx)
require.NoError(t, err)
require.NotEmpty(t, buckets)
bucketName := buckets[0]

bucket, teardown, err := cluster.getBucket(ctx, bucketName)
require.NoError(t, err)
t.Cleanup(teardown)

primaryCol := bucket.Scope(SystemScope).Collection(SystemCollectionMobile)
fallbackCol := bucket.DefaultCollection()
colExists := func(c *gocb.Collection, docID string) bool {
t.Helper()
_, err := c.Get(docID, nil)
if errors.Is(err, gocb.ErrDocumentNotFound) {
return false
}
require.NoError(t, err)
return true
}

return bootstrapDualTestFixture{
Cluster: cluster,
BucketName: bucketName,
SeedLegacy: func(t *testing.T, docID string, value bootstrapTestCfg) {
t.Helper()
seedLegacyBootstrapDoc(t, bucket, docID, value, useXattrs)
},
PrimaryExists: func(t *testing.T, docID string) bool { return colExists(primaryCol, docID) },
FallbackExists: func(t *testing.T, docID string) bool { return colExists(fallbackCol, docID) },
CleanupDoc: func(t *testing.T, docID string) {
t.Helper()
_, _ = primaryCol.Remove(docID, nil)
_, _ = fallbackCol.Remove(docID, nil)
},
}
}

// forEachBootstrapXattrMode runs body once with useXattrs=false and once with useXattrs=true,
// each as a t.Run subtest. The useXattrs=true subtest is skipped on Rosmar — its bootstrap
// path doesn't support xattr-mode persistence, so the variant has no meaningful coverage
// there.
func forEachBootstrapXattrMode(t *testing.T, body func(t *testing.T, useXattrs bool)) {
t.Helper()
for _, useXattrs := range []bool{false, true} {
name := "bootstrap_xattr=false"
if useXattrs {
name = "bootstrap_xattr=true"
}
t.Run(name, func(t *testing.T) {
if useXattrs && UnitTestUrlIsWalrus() {
t.Skip("Rosmar bootstrap does not support xattr-mode persistence")
}
body(t, useXattrs)
})
}
}

// TestBootstrapInsertMetadataDocumentWritesPrimary verifies that with no pre-existing legacy copy,
// InsertMetadataDocument lands in the primary (_system._mobile) collection and not the fallback.
func TestBootstrapInsertMetadataDocumentWritesPrimary(t *testing.T) {
forEachBootstrapXattrMode(t, func(t *testing.T, useXattrs bool) {
f := newBootstrapDualTestFixture(t, useXattrs)
ctx := TestCtx(t)
docID := SyncDocPrefix + "metadata-insert-primary"
t.Cleanup(func() { f.CleanupDoc(t, docID) })

_, err := f.Cluster.InsertMetadataDocument(ctx, f.BucketName, docID, bootstrapTestCfg{Foo: "primary"})
require.NoError(t, err)

require.True(t, f.PrimaryExists(t, docID))
require.False(t, f.FallbackExists(t, docID))

var loaded bootstrapTestCfg
_, err = f.Cluster.GetMetadataDocument(ctx, f.BucketName, docID, &loaded)
require.NoError(t, err)
require.Equal(t, "primary", loaded.Foo)
})
}

// TestBootstrapWriteMetadataDocumentFallbackCAS verifies WriteMetadataDocument replays against the
// fallback collection when the supplied CAS came from a fallback read, so callers don't see a
// spurious not-found just because the doc hasn't migrated to _system._mobile yet.
func TestBootstrapWriteMetadataDocumentFallbackCAS(t *testing.T) {
forEachBootstrapXattrMode(t, func(t *testing.T, useXattrs bool) {
f := newBootstrapDualTestFixture(t, useXattrs)
ctx := TestCtx(t)
docID := SyncDocPrefix + "metadata-write-fallback-cas"
t.Cleanup(func() { f.CleanupDoc(t, docID) })
f.SeedLegacy(t, docID, bootstrapTestCfg{Foo: "legacy"})

var initial bootstrapTestCfg
cas, err := f.Cluster.GetMetadataDocument(ctx, f.BucketName, docID, &initial)
require.NoError(t, err)
require.Equal(t, "legacy", initial.Foo)
require.NotZero(t, cas)

newCAS, err := f.Cluster.WriteMetadataDocument(ctx, f.BucketName, docID, cas, bootstrapTestCfg{Foo: "updated"})
require.NoError(t, err)
require.NotEqual(t, cas, newCAS)

var reloaded bootstrapTestCfg
_, err = f.Cluster.GetMetadataDocument(ctx, f.BucketName, docID, &reloaded)
require.NoError(t, err)
require.Equal(t, "updated", reloaded.Foo)

require.False(t, f.PrimaryExists(t, docID), "write must not have migrated the doc to primary")
require.True(t, f.FallbackExists(t, docID))
})
}

// TestBootstrapInsertMetadataDocumentFallbackDuplicate verifies InsertMetadataDocument returns
// ErrAlreadyExists when the doc already lives in the fallback collection - never silently creating
// a divergent primary copy.
func TestBootstrapInsertMetadataDocumentFallbackDuplicate(t *testing.T) {
forEachBootstrapXattrMode(t, func(t *testing.T, useXattrs bool) {
f := newBootstrapDualTestFixture(t, useXattrs)
ctx := TestCtx(t)
docID := SyncDocPrefix + "metadata-insert-fallback-dup"
t.Cleanup(func() { f.CleanupDoc(t, docID) })
f.SeedLegacy(t, docID, bootstrapTestCfg{Foo: "legacy"})

_, err := f.Cluster.InsertMetadataDocument(ctx, f.BucketName, docID, bootstrapTestCfg{Foo: "primary"})
require.ErrorIs(t, err, ErrAlreadyExists)
require.False(t, f.PrimaryExists(t, docID), "InsertMetadataDocument must not have created a primary copy")
})
}

// TestBootstrapTouchMetadataDocumentFallback verifies TouchMetadataDocument retries against the
// fallback collection when the primary returns ErrNotFound, leaving the doc in place rather than
// migrating it.
func TestBootstrapTouchMetadataDocumentFallback(t *testing.T) {
forEachBootstrapXattrMode(t, func(t *testing.T, useXattrs bool) {
f := newBootstrapDualTestFixture(t, useXattrs)
ctx := TestCtx(t)
docID := SyncDocPrefix + "metadata-touch-fallback"
t.Cleanup(func() { f.CleanupDoc(t, docID) })
f.SeedLegacy(t, docID, bootstrapTestCfg{Foo: "legacy"})

var initial bootstrapTestCfg
cas, err := f.Cluster.GetMetadataDocument(ctx, f.BucketName, docID, &initial)
require.NoError(t, err)
require.NotZero(t, cas)

newCAS, err := f.Cluster.TouchMetadataDocument(ctx, f.BucketName, docID, "version", "v2", cas)
require.NoError(t, err)
require.NotEqual(t, cas, newCAS, "Touch must bump CAS")
require.False(t, f.PrimaryExists(t, docID))
require.True(t, f.FallbackExists(t, docID))
})
}

// TestBootstrapDeleteMetadataDocumentFallback verifies DeleteMetadataDocument retries against the
// fallback collection when the primary returns ErrNotFound, removing the legacy doc.
func TestBootstrapDeleteMetadataDocumentFallback(t *testing.T) {
forEachBootstrapXattrMode(t, func(t *testing.T, useXattrs bool) {
f := newBootstrapDualTestFixture(t, useXattrs)
ctx := TestCtx(t)
docID := SyncDocPrefix + "metadata-delete-fallback"
t.Cleanup(func() { f.CleanupDoc(t, docID) })
f.SeedLegacy(t, docID, bootstrapTestCfg{Foo: "legacy"})

var initial bootstrapTestCfg
cas, err := f.Cluster.GetMetadataDocument(ctx, f.BucketName, docID, &initial)
require.NoError(t, err)
require.NotZero(t, cas)

require.NoError(t, f.Cluster.DeleteMetadataDocument(ctx, f.BucketName, docID, cas))
require.False(t, f.FallbackExists(t, docID))

var afterDelete bootstrapTestCfg
_, err = f.Cluster.GetMetadataDocument(ctx, f.BucketName, docID, &afterDelete)
require.True(t, IsDocNotFoundError(err), "expected not-found after delete, got %T: %v", err, err)
})
}
Loading
Loading