Skip to content

Commit da0b8ec

Browse files
support for atomic writes for gcp (#3523)
* support for atomic writes for gcp * docstore add mongo and firestore as well * mongodb to single replica replicaset * fix test name * fix test
1 parent 5350a7e commit da0b8ec

31 files changed

+231
-17
lines changed

docstore/awsdynamodb/dynamo_test.go

+4
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,10 @@ type harness struct {
6060
closer func()
6161
}
6262

63+
func (h *harness) SupportsAtomicWrites() bool {
64+
return true
65+
}
66+
6367
func newHarness(ctx context.Context, t *testing.T) (drivertest.Harness, error) {
6468
t.Helper()
6569

docstore/drivertest/drivertest.go

+9
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"io"
2424
"math"
2525
"reflect"
26+
"strings"
2627
"testing"
2728
"time"
2829

@@ -80,6 +81,9 @@ type Harness interface {
8081
// RevisionsEqual reports whether two revisions are equal.
8182
RevisionsEqual(rev1, rev2 interface{}) bool
8283

84+
// SupportsAtomicWrites should report if a collection supports atomic writes
85+
SupportsAtomicWrites() bool
86+
8387
// Close closes resources used by the harness.
8488
Close()
8589
}
@@ -170,6 +174,8 @@ func RunConformanceTests(t *testing.T, newHarness HarnessMaker, ct CodecTester,
170174
t.Run("BlindCodec", func(t *testing.T) { testBlindDecode(t, ct) })
171175

172176
t.Run("Create", func(t *testing.T) { withRevCollections(t, newHarness, testCreate) })
177+
t.Run("AtomicWrites", func(t *testing.T) { withRevCollections(t, newHarness, testAtomicWrites) })
178+
t.Run("AtomicWritesFail", func(t *testing.T) { withRevCollections(t, newHarness, testAtomicWritesFail) })
173179
t.Run("Put", func(t *testing.T) { withRevCollections(t, newHarness, testPut) })
174180
t.Run("Replace", func(t *testing.T) { withRevCollections(t, newHarness, testReplace) })
175181
t.Run("Get", func(t *testing.T) { withRevCollections(t, newHarness, testGet) })
@@ -234,6 +240,9 @@ func withRevCollections(t *testing.T, newHarness HarnessMaker, f func(*testing.T
234240
t.Fatal(err)
235241
}
236242
defer h.Close()
243+
if strings.Contains(t.Name(), "AtomicWrites") && !h.SupportsAtomicWrites() {
244+
t.Skip()
245+
}
237246

238247
t.Run("StdRev", func(t *testing.T) {
239248
withColl(t, h, SingleKey, func(t *testing.T, _ Harness, coll *docstore.Collection) {

docstore/gcpfirestore/fs.go

+28-1
Original file line numberDiff line numberDiff line change
@@ -265,8 +265,9 @@ func (c *collection) RevisionField() string {
265265
// RunActions implements driver.RunActions.
266266
func (c *collection) RunActions(ctx context.Context, actions []*driver.Action, opts *driver.RunActionsOptions) driver.ActionListError {
267267
errs := make([]error, len(actions))
268-
beforeGets, gets, writes, _, afterGets := driver.GroupActions(actions)
268+
beforeGets, gets, writes, writesTx, afterGets := driver.GroupActions(actions)
269269
calls := c.buildCommitCalls(writes, errs)
270+
atomicWritesCall := c.buildAtomicWritesCommitCall(writesTx, errs)
270271
// runGets does not issue concurrent RPCs, so it doesn't need a throttle.
271272
c.runGets(ctx, beforeGets, errs, opts)
272273
t := driver.NewThrottle(c.opts.MaxOutstandingActionRPCs)
@@ -278,6 +279,15 @@ func (c *collection) RunActions(ctx context.Context, actions []*driver.Action, o
278279
c.doCommitCall(ctx, call, errs, opts)
279280
}()
280281
}
282+
// commit the atomic writes
283+
if atomicWritesCall != nil && len(atomicWritesCall.actions) > 0 {
284+
t.Acquire()
285+
go func() {
286+
defer t.Release()
287+
c.doCommitCall(ctx, atomicWritesCall, errs, opts)
288+
}()
289+
}
290+
281291
t.Acquire()
282292
c.runGets(ctx, gets, errs, opts)
283293
t.Release()
@@ -407,6 +417,23 @@ func (c *collection) buildCommitCalls(actions []*driver.Action, errs []error) []
407417
return append(pCalls, nCall)
408418
}
409419

420+
// Construct a commit call with all the atomic writes
421+
func (c *collection) buildAtomicWritesCommitCall(actions []*driver.Action, errs []error) *commitCall {
422+
atomicWritesCommitCall := &commitCall{}
423+
for _, a := range actions {
424+
ws, nn, err := c.actionToWrites(a)
425+
if err != nil {
426+
errs[a.Index] = err
427+
return nil
428+
} else {
429+
atomicWritesCommitCall.writes = append(atomicWritesCommitCall.writes, ws...)
430+
atomicWritesCommitCall.actions = append(atomicWritesCommitCall.actions, a)
431+
atomicWritesCommitCall.newNames = append(atomicWritesCommitCall.newNames, nn)
432+
}
433+
}
434+
return atomicWritesCommitCall
435+
}
436+
410437
// Convert an action to one or more Firestore Write protos.
411438
func (c *collection) actionToWrites(a *driver.Action) ([]*pb.Write, string, error) {
412439
var (

docstore/gcpfirestore/fs_test.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import (
3333

3434
const (
3535
// projectID is the project ID that was used during the last test run using --record.
36-
projectID = "go-cloud-test-216917"
36+
projectID = "go-cloud-test-216918"
3737
collectionName1 = "docstore-test-1"
3838
collectionName2 = "docstore-test-2"
3939
collectionName3 = "docstore-test-3"
@@ -45,6 +45,10 @@ type harness struct {
4545
done func()
4646
}
4747

48+
func (h *harness) SupportsAtomicWrites() bool {
49+
return true
50+
}
51+
4852
func newHarness(ctx context.Context, t *testing.T) (drivertest.Harness, error) {
4953
t.Helper()
5054

Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.

docstore/memdocstore/mem_test.go

+2
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ func (*harness) BeforeQueryTypes() []interface{} { return nil }
5252

5353
func (*harness) RevisionsEqual(rev1, rev2 interface{}) bool { return rev1 == rev2 }
5454

55+
func (*harness) SupportsAtomicWrites() bool { return false }
56+
5557
func (*harness) Close() {}
5658

5759
func TestConformance(t *testing.T) {

docstore/mongodocstore/cosmos_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ func TestConformanceCosmos(t *testing.T) {
5252
}()
5353

5454
newHarness := func(context.Context, *testing.T) (drivertest.Harness, error) {
55-
return &harness{client.Database(dbName)}, nil
55+
return &harness{client.Database(dbName), true}, nil
5656
}
5757
drivertest.RunConformanceTests(t, newHarness, codecTester{}, []drivertest.AsTest{verifyAs{}})
5858
}
+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
version: '3.8'
2+
services:
3+
mongo:
4+
image: mongo:4.4
5+
container_name: mongo
6+
restart: always
7+
volumes:
8+
- mongo_data:/data/db
9+
ports:
10+
- "27017:27017"
11+
command: ["--replSet", "dbrs", "--bind_ip_all"]
12+
13+
mongosetup:
14+
image: mongo:4.4
15+
container_name: mongosetup
16+
depends_on:
17+
- mongo
18+
entrypoint: >
19+
bash -c "sleep 5 &&
20+
mongo --host mongo:27017 --eval '
21+
rs.initiate({
22+
_id: \"dbrs\",
23+
members: [
24+
{ _id: 0, host: \"mongo:27017\"}
25+
]
26+
})
27+
'"
28+
29+
volumes:
30+
mongo_data:

docstore/mongodocstore/localmongo.sh

+6-5
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,15 @@
2020
set -euo pipefail
2121

2222
echo "Starting MongoDB v4 listening on 27017..."
23-
docker rm -f mongo4 &> /dev/null || :
24-
docker run -d --name mongo4 -p 27017:27017 mongo:4 &> /dev/null
25-
echo "...done. Run \"docker rm -f mongo4\" to clean up the container."
23+
docker rm -f mongo4 mongosetup &> /dev/null || :
24+
docker compose -f ./docstore/mongodocstore/docker-compose.yml up --wait &> /dev/null
25+
sleep 3
26+
echo "...done. Run \"docker rm -f mongo4 mongosetup\" to clean up the container."
2627
echo
2728

28-
echo "Starting MongoDB v3 listening on 27018..."
29+
echo "Starting MongoDB v3 listening on 27020..."
2930
docker rm -f mongo3 &> /dev/null || :
30-
docker run -d --name mongo3 -p 27018:27017 mongo:3 &> /dev/null
31+
docker run -d --name mongo3 -p 27020:27017 mongo:3 &> /dev/null
3132
echo "...done. Run \"docker rm -f mongo3\" to clean up the container."
3233
echo
3334

docstore/mongodocstore/mongo.go

+135-2
Original file line numberDiff line numberDiff line change
@@ -204,12 +204,15 @@ const mongoIDField = "_id"
204204

205205
func (c *collection) RunActions(ctx context.Context, actions []*driver.Action, opts *driver.RunActionsOptions) driver.ActionListError {
206206
errs := make([]error, len(actions))
207-
beforeGets, gets, writes, _, afterGets := driver.GroupActions(actions)
207+
beforeGets, gets, writes, writesTx, afterGets := driver.GroupActions(actions)
208208
c.runGets(ctx, beforeGets, errs, opts)
209209
ch := make(chan []error)
210210
go func() { ch <- c.bulkWrite(ctx, writes, errs, opts) }()
211+
ch2 := make(chan []error)
212+
go func() { ch2 <- c.txWrite(ctx, writesTx, errs, opts) }()
211213
c.runGets(ctx, gets, errs, opts)
212214
writeErrs := <-ch
215+
<-ch2
213216
c.runGets(ctx, afterGets, errs, opts)
214217
alerr := driver.NewActionListError(errs)
215218
for _, werr := range writeErrs {
@@ -560,8 +563,136 @@ func (c *collection) bulkWrite(ctx context.Context, actions []*driver.Action, er
560563
return reterrs
561564
}
562565

563-
func (c *collection) determineDeleteErrors(ctx context.Context, models []mongo.WriteModel, actions []*driver.Action, errs []error) {
566+
func (c *collection) txWrite(ctx context.Context, actions []*driver.Action, errs []error, dopts *driver.RunActionsOptions) []error {
567+
var (
568+
models []mongo.WriteModel
569+
modelActions []*driver.Action // corresponding action for each model
570+
newIDs []interface{} // new IDs for Create actions, corresponding to models slice
571+
revs []string // new revisions, corresponding to models slice
572+
nDeletes int64
573+
nNonCreateWrite int64 // total operations expected from Put, Replace and Update
574+
)
575+
576+
// all actions will fail atomically even if a single action fails
577+
setErr := func(err error) {
578+
for _, a := range actions {
579+
errs[a.Index] = err
580+
}
581+
}
582+
583+
for _, a := range actions {
584+
var m mongo.WriteModel
585+
var err error
586+
var newID interface{}
587+
var rev string
588+
switch a.Kind {
589+
case driver.Create:
590+
m, newID, rev, err = c.newCreateModel(a)
591+
case driver.Delete:
592+
m, err = c.newDeleteModel(a)
593+
if err == nil {
594+
nDeletes++
595+
}
596+
case driver.Replace, driver.Put:
597+
m, rev, err = c.newReplaceModel(a, a.Kind == driver.Put)
598+
if err == nil {
599+
nNonCreateWrite++
600+
}
601+
case driver.Update:
602+
m, rev, err = c.newUpdateModel(a)
603+
if err == nil && m != nil {
604+
nNonCreateWrite++
605+
}
606+
default:
607+
err = gcerr.Newf(gcerr.Internal, nil, "bad action %+v", a)
608+
}
609+
if err != nil {
610+
setErr(err)
611+
return nil
612+
} else if m != nil { // m can be nil for a no-op update
613+
models = append(models, m)
614+
modelActions = append(modelActions, a)
615+
newIDs = append(newIDs, newID)
616+
revs = append(revs, rev)
617+
}
618+
}
619+
if len(models) == 0 {
620+
return nil
621+
}
622+
623+
bopts := options.BulkWrite().SetOrdered(true)
624+
if dopts.BeforeDo != nil {
625+
asFunc := func(target interface{}) bool {
626+
switch t := target.(type) {
627+
case *[]mongo.WriteModel:
628+
*t = models
629+
case **options.BulkWriteOptions:
630+
*t = bopts
631+
default:
632+
return false
633+
}
634+
return true
635+
}
636+
if err := dopts.BeforeDo(asFunc); err != nil {
637+
return []error{err}
638+
}
639+
}
640+
641+
client := c.coll.Database().Client()
642+
session, err := client.StartSession()
643+
if err != nil {
644+
setErr(err)
645+
return nil
646+
}
647+
defer session.EndSession(ctx)
648+
649+
callback := func(sessionCtx mongo.SessionContext) error {
650+
if err := session.StartTransaction(); err != nil {
651+
return err
652+
}
653+
654+
res, err := c.coll.BulkWrite(sessionCtx, models, bopts)
655+
if res.DeletedCount != nDeletes {
656+
// Some Delete actions failed. It's not an error if a Delete failed because
657+
// the document didn't exist, but it is an error if it failed because of a
658+
// precondition mismatch. Find all the documents with revisions we tried to delete; if
659+
// any are still present, that's an error.
660+
if c.determineDeleteErrors(ctx, models, modelActions, errs) {
661+
setErr(gcerr.Newf(gcerr.FailedPrecondition, nil,
662+
"wrong revision for document to be deleted"))
663+
}
664+
}
665+
if res.MatchedCount+res.UpsertedCount != nNonCreateWrite {
666+
err = gcerr.Newf(gcerr.NotFound, nil, "some writes failed (replaced %d, upserted %d, out of total %d)", res.MatchedCount, res.UpsertedCount, nNonCreateWrite)
667+
}
668+
if err != nil {
669+
abortTxErr := session.AbortTransaction(context.Background())
670+
if abortTxErr != nil {
671+
return err
672+
}
673+
return err
674+
}
675+
676+
if err = session.CommitTransaction(sessionCtx); err != nil {
677+
return err
678+
}
679+
680+
return nil
681+
}
682+
683+
err = mongo.WithSession(ctx, session, callback)
684+
685+
if err != nil {
686+
setErr(err)
687+
return nil
688+
}
689+
return nil
690+
}
691+
692+
// determineDeleteErrors find the errors for the delete and return true if found any
693+
func (c *collection) determineDeleteErrors(ctx context.Context, models []mongo.WriteModel, actions []*driver.Action, errs []error) bool {
564694
// TODO(jba): do this concurrently.
695+
foundErr := false
565696
for i, m := range models {
566697
if dm, ok := m.(*mongo.DeleteOneModel); ok {
567698
filter := dm.Filter.(bson.D)
@@ -580,10 +711,12 @@ func (c *collection) determineDeleteErrors(ctx context.Context, models []mongo.W
580711
// revision.
581712
errs[actions[i].Index] = gcerr.Newf(gcerr.FailedPrecondition, nil,
582713
"wrong revision for document with ID %v", actions[i].Key)
714+
foundErr = true
583715
}
584716
}
585717
}
586718
}
719+
return foundErr
587720
}
588721

589722
func (c *collection) newCreateModel(a *driver.Action) (*mongo.InsertOneModel, interface{}, string, error) {

0 commit comments

Comments
 (0)