Skip to content

Commit db93c1c

Browse files
authored
Touch CAS fixes (#75)
1 parent 967376b commit db93c1c

4 files changed

Lines changed: 180 additions & 9 deletions

File tree

collection+xattrs.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -448,6 +448,81 @@ func (c *Collection) getRawWithXattrs(key string, xattrKeys []string) (sgbucket.
448448
return rawDoc, nil
449449
}
450450

451+
// TouchXattrWithCas sets a single property within a system xattr, bumping the document's CAS.
452+
//
453+
// This mirrors gocb's subdoc UpsertSpec with StoreSemanticsReplace and a CAS guard, which is what Couchbase Server
454+
// uses to touch a metadata document during config rollback (see XattrBootstrapPersistence.touchConfigRollback in sync_gateway).
455+
//
456+
// Returns sgbucket.CasMismatchErr if the supplied cas does not match the current document CAS.
457+
func (c *Collection) TouchXattrWithCas(_ context.Context, key, xattrKey, property, value string, cas CAS) (casOut CAS, err error) {
458+
traceEnter("TouchXattrWithCas", "%q, %q.%q=%q, cas=0x%x", key, xattrKey, property, value, cas)
459+
defer func() { traceExit("TouchXattrWithCas", err, "0x%x", casOut) }()
460+
err = c.withNewCas(func(txn *sql.Tx, newCas CAS) (*event, error) {
461+
var (
462+
bodyVal []byte
463+
existingCas CAS
464+
rawXattrs []byte
465+
revSeqNo uint64
466+
isJSON bool
467+
)
468+
row := txn.QueryRow(`SELECT value, cas, xattrs, revSeqNo, isJSON FROM documents WHERE collection=?1 AND key=?2`, c.id, key)
469+
if err := scan(row, &bodyVal, &existingCas, &rawXattrs, &revSeqNo, &isJSON); err != nil {
470+
return nil, remapKeyError(err, key)
471+
}
472+
if existingCas != cas {
473+
return nil, sgbucket.CasMismatchErr{Expected: cas, Actual: existingCas}
474+
}
475+
476+
xattrMap := semiParsedXattrs{}
477+
if len(rawXattrs) > 0 {
478+
if err := json.Unmarshal(rawXattrs, &xattrMap); err != nil {
479+
return nil, fmt.Errorf("document %q xattrs are unreadable: %w", key, err)
480+
}
481+
}
482+
// JSON-merge xattrKey.<property> = value into the xattr blob.
483+
var xattrBody map[string]json.RawMessage
484+
if existing, ok := xattrMap[xattrKey]; ok && len(existing) > 0 {
485+
if err := json.Unmarshal(existing, &xattrBody); err != nil {
486+
return nil, fmt.Errorf("document %q xattr %q is not a JSON object: %w", key, xattrKey, err)
487+
}
488+
}
489+
if xattrBody == nil {
490+
xattrBody = map[string]json.RawMessage{}
491+
}
492+
encodedValue, err := json.Marshal(value)
493+
if err != nil {
494+
return nil, err
495+
}
496+
xattrBody[property] = encodedValue
497+
encodedXattr, err := json.Marshal(xattrBody)
498+
if err != nil {
499+
return nil, err
500+
}
501+
xattrMap[xattrKey] = encodedXattr
502+
newRawXattrs, err := json.Marshal(xattrMap)
503+
if err != nil {
504+
return nil, err
505+
}
506+
507+
revSeqNo++
508+
if _, err := txn.Exec(
509+
`UPDATE documents SET cas=?1, xattrs=?2, revSeqNo=?3 WHERE collection=?4 AND key=?5`,
510+
newCas, newRawXattrs, revSeqNo, c.id, key); err != nil {
511+
return nil, err
512+
}
513+
casOut = newCas
514+
return &event{
515+
key: key,
516+
value: bodyVal,
517+
cas: newCas,
518+
isJSON: isJSON,
519+
xattrs: newRawXattrs,
520+
revSeqNo: revSeqNo,
521+
}, nil
522+
})
523+
return
524+
}
525+
451526
// DeleteWithXattrs a document's body and xattrs simultaneously.
452527
func (c *Collection) DeleteWithXattrs(ctx context.Context, key string, xattrKeys []string) error {
453528
err := c.withNewCas(func(txn *sql.Tx, newCas CAS) (*event, error) {

collection.go

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -122,15 +122,33 @@ func (c *Collection) getRaw(q queryable, key string) (val []byte, cas CAS, revSe
122122

123123
func (c *Collection) GetAndTouchRaw(_ context.Context, key string, exp Exp) (val []byte, cas CAS, err error) {
124124
traceEnter("GetAndTouchRaw", "%q, %d", key, exp)
125-
err = c.withNewCas(func(txn *sql.Tx, newCas CAS) (e *event, err error) {
126-
exp = absoluteExpiry(exp)
125+
newExp := absoluteExpiry(exp)
126+
err = c.bucket.inTransaction(func(txn *sql.Tx) error {
127127
var revSeqNo int64
128-
val, cas, revSeqNo, err = c.getRaw(txn, key)
129-
if err == nil {
130-
revSeqNo++
131-
_, err = txn.Exec(`UPDATE documents SET exp=?1, revSeqNo=?2 WHERE key=?3`, exp, revSeqNo, key)
128+
var existingExp Exp
129+
row := txn.QueryRow(
130+
`SELECT value, cas, revSeqNo, exp FROM documents WHERE collection=? AND key=?`,
131+
c.id, key)
132+
if scanErr := scan(row, &val, &cas, &revSeqNo, &existingExp); scanErr != nil {
133+
return remapKeyError(scanErr, key)
132134
}
133-
return
135+
if val == nil {
136+
return sgbucket.MissingError{Key: key}
137+
}
138+
if existingExp == newExp {
139+
// No-op: CB Server's KV TOUCH/GAT preserves CAS and revSeqNo when the
140+
// expiry value is unchanged.
141+
return nil
142+
}
143+
revSeqNo++
144+
newCas := CAS(hlc.Now())
145+
if _, execErr := txn.Exec(
146+
`UPDATE documents SET exp=?1, revSeqNo=?2, cas=?3 WHERE collection=?4 AND key=?5`,
147+
newExp, revSeqNo, newCas, c.id, key); execErr != nil {
148+
return execErr
149+
}
150+
cas = newCas
151+
return c.setLastCas(txn, newCas)
134152
})
135153
traceExit("GetAndTouchRaw", err, "cas=0x%x, val %s", cas, val)
136154
return

collection_test.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -921,13 +921,31 @@ func TestRevSeqNo(t *testing.T) {
921921
require.NoError(t, col.Set(ctx, docID, 0, nil, []byte(`{"foo": 4`)))
922922
assertRevSeqNo(t, col, docID, `"4"`)
923923

924-
_, _, err := col.GetAndTouchRaw(ctx, docID, 0)
924+
// Doc was last written with exp=0. A Touch/GetAndTouchRaw that doesn't change
925+
// the expiry is a no-op on CB Server: CAS and revSeqNo are preserved.
926+
_, casBeforeNoOpTouch, err := col.GetRaw(ctx, docID)
927+
require.NoError(t, err)
928+
929+
_, casAfterNoOpGAT, err := col.GetAndTouchRaw(ctx, docID, 0)
930+
require.NoError(t, err)
931+
assertRevSeqNo(t, col, docID, `"4"`)
932+
require.Equal(t, casBeforeNoOpTouch, casAfterNoOpGAT, "no-op GetAndTouchRaw should not bump CAS")
933+
934+
casAfterNoOpTouch, err := col.Touch(ctx, docID, 0)
935+
require.NoError(t, err)
936+
assertRevSeqNo(t, col, docID, `"4"`)
937+
require.Equal(t, casBeforeNoOpTouch, casAfterNoOpTouch, "no-op Touch should not bump CAS")
938+
939+
// Changing the expiry is a metadata mutation: CB Server bumps both CAS and revSeqNo.
940+
casAfterChangingTouch, err := col.Touch(ctx, docID, 60)
925941
require.NoError(t, err)
926942
assertRevSeqNo(t, col, docID, `"5"`)
943+
require.NotEqual(t, casBeforeNoOpTouch, casAfterChangingTouch, "Touch should bump CAS when exp changes")
927944

928-
_, err = col.Touch(ctx, docID, 0)
945+
_, casAfterChangingGAT, err := col.GetAndTouchRaw(ctx, docID, 120)
929946
require.NoError(t, err)
930947
assertRevSeqNo(t, col, docID, `"6"`)
948+
require.NotEqual(t, casAfterChangingTouch, casAfterChangingGAT, "GetAndTouchRaw should bump CAS when exp changes")
931949

932950
writeWithXattrsDocID := "writeWithXattrs"
933951
_, err = col.WriteWithXattrs(ctx, writeWithXattrsDocID, 0, 0, []byte(`{"foo": 1}`), nil, nil, nil)

xattrs_test.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,66 @@ import (
1818
"github.com/stretchr/testify/require"
1919
)
2020

21+
func TestTouchXattrWithCas(t *testing.T) {
22+
ctx := t.Context()
23+
ensureNoLeakedFeeds(t)
24+
coll := makeTestBucket(t).DefaultDataStore(ctx).(*Collection)
25+
26+
const docID = "doc1"
27+
bodyBytes := []byte(`{"hello":"world"}`)
28+
xattrsInput := map[string][]byte{
29+
syncXattrName: []byte(`{"existing":"value"}`),
30+
}
31+
originalCas, err := coll.WriteWithXattrs(ctx, docID, 0, 0, bodyBytes, xattrsInput, nil, nil)
32+
require.NoError(t, err)
33+
34+
// Stale CAS should fail.
35+
_, err = coll.TouchXattrWithCas(ctx, docID, syncXattrName, "name", "db1", originalCas+1)
36+
require.ErrorAs(t, err, &sgbucket.CasMismatchErr{})
37+
38+
// Correct CAS succeeds and bumps CAS.
39+
newCas, err := coll.TouchXattrWithCas(ctx, docID, syncXattrName, "name", "db1", originalCas)
40+
require.NoError(t, err)
41+
require.NotEqual(t, originalCas, newCas)
42+
43+
// Verify the xattr property is now visible and the existing properties were preserved.
44+
gotBody, gotXattrs, getCas, err := coll.GetWithXattrs(ctx, docID, []string{syncXattrName})
45+
require.NoError(t, err)
46+
require.Equal(t, newCas, getCas)
47+
require.Equal(t, bodyBytes, gotBody)
48+
var xattr map[string]string
49+
require.NoError(t, json.Unmarshal(gotXattrs[syncXattrName], &xattr))
50+
require.Equal(t, "db1", xattr["name"])
51+
require.Equal(t, "value", xattr["existing"])
52+
53+
// The old CAS is now stale.
54+
_, err = coll.TouchXattrWithCas(ctx, docID, syncXattrName, "name", "db2", originalCas)
55+
require.ErrorAs(t, err, &sgbucket.CasMismatchErr{})
56+
}
57+
58+
// TestTouchXattrWithCasCreatesXattr verifies that TouchXattrWithCas can populate an xattr on a doc
59+
// that does not yet have one.
60+
func TestTouchXattrWithCasCreatesXattr(t *testing.T) {
61+
ctx := t.Context()
62+
ensureNoLeakedFeeds(t)
63+
coll := makeTestBucket(t).DefaultDataStore(ctx).(*Collection)
64+
65+
const docID = "doc1"
66+
require.NoError(t, coll.SetRaw(ctx, docID, 0, nil, []byte(`{"a":1}`)))
67+
_, cas, err := coll.GetRaw(ctx, docID)
68+
require.NoError(t, err)
69+
70+
newCas, err := coll.TouchXattrWithCas(ctx, docID, syncXattrName, "name", "db1", cas)
71+
require.NoError(t, err)
72+
require.NotEqual(t, cas, newCas)
73+
74+
_, gotXattrs, _, err := coll.GetWithXattrs(ctx, docID, []string{syncXattrName})
75+
require.NoError(t, err)
76+
var xattr map[string]string
77+
require.NoError(t, json.Unmarshal(gotXattrs[syncXattrName], &xattr))
78+
require.Equal(t, "db1", xattr["name"])
79+
}
80+
2181
func TestSetXattrs(t *testing.T) {
2282
ctx := t.Context()
2383
ensureNoLeakedFeeds(t)

0 commit comments

Comments
 (0)