Skip to content

Commit eef7ba1

Browse files
committed
Add TouchXattrWithCas
1 parent 1eeea8a commit eef7ba1

2 files changed

Lines changed: 135 additions & 0 deletions

File tree

collection+xattrs.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -448,6 +448,81 @@ func (c *Collection) getRawWithXattrs(key string, xattrKeys []string) (sgbucket.
448448
return rawDoc, nil
449449
}
450450

451+
// TouchXattrWithCas sets a single property within a system xattr, bumping the document's CAS.
452+
//
453+
// This mirrors gocb's subdoc UpsertSpec with StoreSemanticsReplace and a CAS guard, which is what Couchbase Server
454+
// uses to touch a metadata document during config rollback (see XattrBootstrapPersistence.touchConfigRollback in sync_gateway).
455+
//
456+
// Returns sgbucket.CasMismatchErr if the supplied cas does not match the current document CAS.
457+
func (c *Collection) TouchXattrWithCas(_ context.Context, key, xattrKey, property, value string, cas CAS) (casOut CAS, err error) {
458+
traceEnter("TouchXattrWithCas", "%q, %q.%q=%q, cas=0x%x", key, xattrKey, property, value, cas)
459+
defer func() { traceExit("TouchXattrWithCas", err, "0x%x", casOut) }()
460+
err = c.withNewCas(func(txn *sql.Tx, newCas CAS) (*event, error) {
461+
var (
462+
bodyVal []byte
463+
existingCas CAS
464+
rawXattrs []byte
465+
revSeqNo uint64
466+
isJSON bool
467+
)
468+
row := txn.QueryRow(`SELECT value, cas, xattrs, revSeqNo, isJSON FROM documents WHERE collection=?1 AND key=?2`, c.id, key)
469+
if err := scan(row, &bodyVal, &existingCas, &rawXattrs, &revSeqNo, &isJSON); err != nil {
470+
return nil, remapKeyError(err, key)
471+
}
472+
if existingCas != cas {
473+
return nil, sgbucket.CasMismatchErr{Expected: cas, Actual: existingCas}
474+
}
475+
476+
xattrMap := semiParsedXattrs{}
477+
if len(rawXattrs) > 0 {
478+
if err := json.Unmarshal(rawXattrs, &xattrMap); err != nil {
479+
return nil, fmt.Errorf("document %q xattrs are unreadable: %w", key, err)
480+
}
481+
}
482+
// JSON-merge xattrKey.<property> = value into the xattr blob.
483+
var xattrBody map[string]json.RawMessage
484+
if existing, ok := xattrMap[xattrKey]; ok && len(existing) > 0 {
485+
if err := json.Unmarshal(existing, &xattrBody); err != nil {
486+
return nil, fmt.Errorf("document %q xattr %q is not a JSON object: %w", key, xattrKey, err)
487+
}
488+
}
489+
if xattrBody == nil {
490+
xattrBody = map[string]json.RawMessage{}
491+
}
492+
encodedValue, err := json.Marshal(value)
493+
if err != nil {
494+
return nil, err
495+
}
496+
xattrBody[property] = encodedValue
497+
encodedXattr, err := json.Marshal(xattrBody)
498+
if err != nil {
499+
return nil, err
500+
}
501+
xattrMap[xattrKey] = encodedXattr
502+
newRawXattrs, err := json.Marshal(xattrMap)
503+
if err != nil {
504+
return nil, err
505+
}
506+
507+
revSeqNo++
508+
if _, err := txn.Exec(
509+
`UPDATE documents SET cas=?1, xattrs=?2, revSeqNo=?3 WHERE collection=?4 AND key=?5`,
510+
newCas, newRawXattrs, revSeqNo, c.id, key); err != nil {
511+
return nil, err
512+
}
513+
casOut = newCas
514+
return &event{
515+
key: key,
516+
value: bodyVal,
517+
cas: newCas,
518+
isJSON: isJSON,
519+
xattrs: newRawXattrs,
520+
revSeqNo: revSeqNo,
521+
}, nil
522+
})
523+
return
524+
}
525+
451526
// DeleteWithXattrs a document's body and xattrs simultaneously.
452527
func (c *Collection) DeleteWithXattrs(ctx context.Context, key string, xattrKeys []string) error {
453528
err := c.withNewCas(func(txn *sql.Tx, newCas CAS) (*event, error) {

xattrs_test.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,66 @@ import (
1818
"github.com/stretchr/testify/require"
1919
)
2020

21+
func TestTouchXattrWithCas(t *testing.T) {
22+
ctx := t.Context()
23+
ensureNoLeakedFeeds(t)
24+
coll := makeTestBucket(t).DefaultDataStore(ctx).(*Collection)
25+
26+
const docID = "doc1"
27+
bodyBytes := []byte(`{"hello":"world"}`)
28+
xattrsInput := map[string][]byte{
29+
syncXattrName: []byte(`{"existing":"value"}`),
30+
}
31+
originalCas, err := coll.WriteWithXattrs(ctx, docID, 0, 0, bodyBytes, xattrsInput, nil, nil)
32+
require.NoError(t, err)
33+
34+
// Stale CAS should fail.
35+
_, err = coll.TouchXattrWithCas(ctx, docID, syncXattrName, "name", "db1", originalCas+1)
36+
require.ErrorAs(t, err, &sgbucket.CasMismatchErr{})
37+
38+
// Correct CAS succeeds and bumps CAS.
39+
newCas, err := coll.TouchXattrWithCas(ctx, docID, syncXattrName, "name", "db1", originalCas)
40+
require.NoError(t, err)
41+
require.NotEqual(t, originalCas, newCas)
42+
43+
// Verify the xattr property is now visible and the existing properties were preserved.
44+
gotBody, gotXattrs, getCas, err := coll.GetWithXattrs(ctx, docID, []string{syncXattrName})
45+
require.NoError(t, err)
46+
require.Equal(t, newCas, getCas)
47+
require.Equal(t, bodyBytes, gotBody)
48+
var xattr map[string]string
49+
require.NoError(t, json.Unmarshal(gotXattrs[syncXattrName], &xattr))
50+
require.Equal(t, "db1", xattr["name"])
51+
require.Equal(t, "value", xattr["existing"])
52+
53+
// The old CAS is now stale.
54+
_, err = coll.TouchXattrWithCas(ctx, docID, syncXattrName, "name", "db2", originalCas)
55+
require.ErrorAs(t, err, &sgbucket.CasMismatchErr{})
56+
}
57+
58+
// TestTouchXattrWithCasCreatesXattr verifies that TouchXattrWithCas can populate an xattr on a doc
59+
// that does not yet have one.
60+
func TestTouchXattrWithCasCreatesXattr(t *testing.T) {
61+
ctx := t.Context()
62+
ensureNoLeakedFeeds(t)
63+
coll := makeTestBucket(t).DefaultDataStore(ctx).(*Collection)
64+
65+
const docID = "doc1"
66+
require.NoError(t, coll.SetRaw(ctx, docID, 0, nil, []byte(`{"a":1}`)))
67+
_, cas, err := coll.GetRaw(ctx, docID)
68+
require.NoError(t, err)
69+
70+
newCas, err := coll.TouchXattrWithCas(ctx, docID, syncXattrName, "name", "db1", cas)
71+
require.NoError(t, err)
72+
require.NotEqual(t, cas, newCas)
73+
74+
_, gotXattrs, _, err := coll.GetWithXattrs(ctx, docID, []string{syncXattrName})
75+
require.NoError(t, err)
76+
var xattr map[string]string
77+
require.NoError(t, json.Unmarshal(gotXattrs[syncXattrName], &xattr))
78+
require.Equal(t, "db1", xattr["name"])
79+
}
80+
2181
func TestSetXattrs(t *testing.T) {
2282
ctx := t.Context()
2383
ensureNoLeakedFeeds(t)

0 commit comments

Comments
 (0)