Skip to content

Commit f5c75b0

Browse files
authored
Merge pull request #11 from jehiah/concurrent_mutations_11
bug: fix concurrent same-row mutations
2 parents 3280dad + e1899ee commit f5c75b0

File tree

2 files changed

+115
-8
lines changed

2 files changed

+115
-8
lines changed

bttest/inmem.go

+17-8
Original file line numberDiff line numberDiff line change
@@ -455,7 +455,7 @@ func (s *server) ReadRows(req *btpb.ReadRowsRequest, stream btpb.Bigtable_ReadRo
455455
// Read all rows
456456
tbl.rows.Ascend(addRow)
457457
}
458-
gcRules := tbl.gcRules()
458+
gcRules := tbl.gcRulesNoLock()
459459
tbl.mu.RUnlock()
460460

461461
rows := make([]*row, 0, len(rowSet))
@@ -883,12 +883,15 @@ func (s *server) MutateRow(ctx context.Context, req *btpb.MutateRowRequest) (*bt
883883
return nil, status.Errorf(codes.NotFound, "table %q not found", req.TableName)
884884
}
885885
fs := tbl.columnFamilies()
886+
887+
tbl.mu.Lock()
888+
defer tbl.mu.Unlock()
886889
r := tbl.mutableRow(string(req.RowKey))
887890
if err := applyMutations(tbl, r, req.Mutations, fs); err != nil {
888891
return nil, err
889892
}
890893
// JIT per-row GC
891-
r.gc(tbl.gcRules())
894+
r.gc(tbl.gcRulesNoLock())
892895
// JIT family deletion
893896
for f, _ := range r.families {
894897
if _, ok := fs[f]; !ok {
@@ -920,7 +923,8 @@ func (s *server) MutateRows(req *btpb.MutateRowsRequest, stream btpb.Bigtable_Mu
920923
res := &btpb.MutateRowsResponse{Entries: make([]*btpb.MutateRowsResponse_Entry, len(req.Entries))}
921924

922925
cfs := tbl.columnFamilies()
923-
926+
tbl.mu.Lock()
927+
defer tbl.mu.Unlock()
924928
for i, entry := range req.Entries {
925929
r := tbl.mutableRow(string(entry.RowKey))
926930
code, msg := int32(codes.OK), ""
@@ -932,7 +936,7 @@ func (s *server) MutateRows(req *btpb.MutateRowsRequest, stream btpb.Bigtable_Mu
932936
Index: int64(i),
933937
Status: &statpb.Status{Code: code, Message: msg},
934938
}
935-
r.gc(tbl.gcRules())
939+
r.gc(tbl.gcRulesNoLock())
936940
// JIT family deletion; could be skipped if mutableRow doesn't return an existing row
937941
for f, _ := range r.families {
938942
if _, ok := cfs[f]; !ok {
@@ -954,7 +958,8 @@ func (s *server) CheckAndMutateRow(ctx context.Context, req *btpb.CheckAndMutate
954958
res := &btpb.CheckAndMutateRowResponse{}
955959

956960
cfs := tbl.columnFamilies()
957-
961+
tbl.mu.Lock()
962+
defer tbl.mu.Unlock()
958963
r := tbl.mutableRow(string(req.RowKey))
959964

960965
// Figure out which mutation to apply.
@@ -982,7 +987,7 @@ func (s *server) CheckAndMutateRow(ctx context.Context, req *btpb.CheckAndMutate
982987
if err := applyMutations(tbl, r, muts, cfs); err != nil {
983988
return nil, err
984989
}
985-
r.gc(tbl.gcRules())
990+
r.gc(tbl.gcRulesNoLock())
986991
// JIT family deletion; could be skipped if mutableRow doesn't return an existing row
987992
for f, _ := range r.families {
988993
if _, ok := cfs[f]; !ok {
@@ -1119,7 +1124,8 @@ func (s *server) ReadModifyWriteRow(ctx context.Context, req *btpb.ReadModifyWri
11191124
}
11201125

11211126
cfs := tbl.columnFamilies()
1122-
1127+
tbl.mu.Lock()
1128+
defer tbl.mu.Unlock()
11231129
rowKey := string(req.RowKey)
11241130
r := tbl.mutableRow(rowKey)
11251131
resultRow := newRow(rowKey) // copy of updated cells
@@ -1177,7 +1183,7 @@ func (s *server) ReadModifyWriteRow(ctx context.Context, req *btpb.ReadModifyWri
11771183
resultFamily.cellsByColumn(col) // create the column
11781184
resultFamily.Cells[col] = []cell{newCell} // overwrite the cells
11791185
}
1180-
r.gc(tbl.gcRules())
1186+
r.gc(tbl.gcRulesNoLock())
11811187
// JIT family deletion; could be skipped if mutableRow doesn't return an existing row
11821188
for f, _ := range r.families {
11831189
if _, ok := cfs[f]; !ok {
@@ -1318,7 +1324,10 @@ func (t *table) gcRules() map[string]*btapb.GcRule {
13181324
// This method doesn't add or remove rows, so we only need a read lock for the table.
13191325
t.mu.RLock()
13201326
defer t.mu.RUnlock()
1327+
return t.gcRulesNoLock()
1328+
}
13211329

1330+
func (t *table) gcRulesNoLock() map[string]*btapb.GcRule {
13221331
// Gather GC rules we'll apply.
13231332
rules := make(map[string]*btapb.GcRule) // keyed by "fam"
13241333
for fam, cf := range t.families {

bttest/inmem_test.go

+98
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,104 @@ func TestConcurrentMutationsReadModifyAndGC(t *testing.T) {
173173
}
174174
}
175175

176+
func TestConcurrentMutations(t *testing.T) {
177+
// 50 concurrent mutations of different cells on the same row
178+
// expect all 50 values after
179+
s := newTestServer(t)
180+
ctx, cancel := context.WithTimeout(context.Background(), 150*time.Millisecond)
181+
defer cancel()
182+
if _, err := s.CreateTable(
183+
ctx,
184+
&btapb.CreateTableRequest{Parent: "c", TableId: "t"}); err != nil {
185+
t.Fatal(err)
186+
}
187+
const name = `c/tables/t`
188+
req := &btapb.ModifyColumnFamiliesRequest{
189+
Name: name,
190+
Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{
191+
Id: "cf",
192+
Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Create{Create: &btapb.ColumnFamily{}},
193+
}},
194+
}
195+
_, err := s.ModifyColumnFamilies(ctx, req)
196+
if err != nil {
197+
t.Fatal(err)
198+
}
199+
var wg sync.WaitGroup
200+
ms := func(i int) []*btpb.Mutation {
201+
return []*btpb.Mutation{{
202+
Mutation: &btpb.Mutation_SetCell_{SetCell: &btpb.Mutation_SetCell{
203+
FamilyName: "cf",
204+
ColumnQualifier: []byte(fmt.Sprintf("%d", i)),
205+
Value: []byte(fmt.Sprintf("%d", i)),
206+
TimestampMicros: 1000,
207+
}},
208+
}}
209+
}
210+
211+
rowKey := []byte("rowkey")
212+
start := make(chan bool)
213+
for i := 0; i < 50; i++ {
214+
i := i
215+
wg.Add(1)
216+
go func(i int) {
217+
defer wg.Done()
218+
<-start
219+
for ctx.Err() == nil {
220+
req := &btpb.MutateRowRequest{
221+
TableName: name,
222+
RowKey: rowKey,
223+
Mutations: ms(i),
224+
}
225+
if _, err := s.MutateRow(ctx, req); err != nil {
226+
panic(err) // can't use t.Fatal in goroutine
227+
}
228+
}
229+
}(i)
230+
}
231+
done := make(chan struct{})
232+
go func() {
233+
wg.Wait()
234+
close(done)
235+
}()
236+
close(start)
237+
select {
238+
case <-done:
239+
case <-time.After(1 * time.Second):
240+
t.Error("Concurrent mutations haven't completed after 1s")
241+
}
242+
243+
// verify
244+
mock := &MockReadRowsServer{}
245+
rreq := &btpb.ReadRowsRequest{TableName: name}
246+
if err = s.ReadRows(rreq, mock); err != nil {
247+
t.Fatalf("ReadRows error: %v", err)
248+
}
249+
if len(mock.responses) != 1 {
250+
t.Fatal("Response count: got 0, want 1")
251+
}
252+
if len(mock.responses[0].Chunks) != 50 {
253+
t.Errorf("Chunk count: got %d, want 50", len(mock.responses[0].Chunks))
254+
}
255+
256+
var gotChunks []*btpb.ReadRowsResponse_CellChunk
257+
for _, res := range mock.responses {
258+
gotChunks = append(gotChunks, res.Chunks...)
259+
}
260+
var seen []string
261+
for i, c := range gotChunks {
262+
if !bytes.Equal(c.RowKey, rowKey) {
263+
t.Fatalf("expected row %q got %q", c.RowKey, rowKey)
264+
}
265+
if !bytes.Equal(c.Qualifier.Value, c.Value) {
266+
t.Fatalf("[%d] expected equal got %q %q", i, c.Qualifier.Value, c.Value)
267+
}
268+
seen = append(seen, string(c.Qualifier.Value))
269+
}
270+
sort.Strings(seen)
271+
t.Logf("seen %#v", seen)
272+
}
273+
176274
func TestCreateTableResponse(t *testing.T) {
177275
// We need to ensure that invoking CreateTable returns
178276
// the ColumnFamilies as well as Granularity.

0 commit comments

Comments
 (0)