Skip to content

Commit cd3d59d

Browse files
authored
Merge pull request cockroachdb#165571 from DrewKimball/backport24.3-165260
release-24.3: sql: partially fix data race in CollationEnvironment
2 parents 3751291 + acb009b commit cd3d59d

File tree

8 files changed

+170
-31
lines changed

8 files changed

+170
-31
lines changed
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
# LogicTest: 5node
2+
3+
subtest outbox_race
4+
5+
# Regression test for #110322: data race in CollationEnvironment when multiple
6+
# outboxes share the same evalCtx on the gateway and evaluate COLLATE
7+
# expressions concurrently.
8+
9+
statement ok
10+
CREATE TABLE t1 (c0 STRING PRIMARY KEY)
11+
12+
statement ok
13+
INSERT INTO t1 VALUES ('a'), ('b'), ('c'), ('d'), ('e')
14+
15+
statement ok
16+
ALTER TABLE t1 SPLIT AT VALUES ('b'), ('c'), ('d'), ('e')
17+
18+
retry
19+
statement ok
20+
ALTER TABLE t1 EXPERIMENTAL_RELOCATE VALUES
21+
(ARRAY[1], 'a'),
22+
(ARRAY[2], 'b'),
23+
(ARRAY[3], 'c'),
24+
(ARRAY[4], 'd'),
25+
(ARRAY[5], 'e')
26+
27+
# Force DistSQL so the query creates multiple outboxes sharing the same
28+
# evalCtx on the gateway.
29+
statement ok
30+
SET distsql = on
31+
32+
# This TLP-style UNION query creates three filter processors, each evaluating
33+
# a COLLATE expression. When distributed, the gateway runs multiple outbox
34+
# goroutines concurrently that all call NewDCollatedString through the shared
35+
# CollationEnvironment, triggering a data race.
36+
query T rowsort
37+
SELECT * FROM t1 WHERE (t1.c0 COLLATE en) > ('_' COLLATE en)
38+
UNION
39+
SELECT * FROM t1 WHERE NOT ((t1.c0 COLLATE en) > ('_' COLLATE en))
40+
UNION
41+
SELECT * FROM t1 WHERE ((t1.c0 COLLATE en) > ('_' COLLATE en)) IS NULL
42+
----
43+
a
44+
b
45+
c
46+
d
47+
e
48+
49+
# Run the same query pattern several more times to increase the chance of
50+
# triggering the race under the race detector.
51+
query T rowsort
52+
SELECT * FROM t1 WHERE (t1.c0 COLLATE en) > ('b' COLLATE en)
53+
UNION
54+
SELECT * FROM t1 WHERE NOT ((t1.c0 COLLATE en) > ('b' COLLATE en))
55+
UNION
56+
SELECT * FROM t1 WHERE ((t1.c0 COLLATE en) > ('b' COLLATE en)) IS NULL
57+
----
58+
a
59+
b
60+
c
61+
d
62+
e
63+
64+
query T rowsort
65+
SELECT * FROM t1 WHERE (t1.c0 COLLATE en) > ('c' COLLATE en)
66+
UNION
67+
SELECT * FROM t1 WHERE NOT ((t1.c0 COLLATE en) > ('c' COLLATE en))
68+
UNION
69+
SELECT * FROM t1 WHERE ((t1.c0 COLLATE en) > ('c' COLLATE en)) IS NULL
70+
----
71+
a
72+
b
73+
c
74+
d
75+
e
76+
77+
subtest end
78+
79+
subtest sampler_race
80+
81+
# Regression test for data race in CollationEnvironment when the sampler and
82+
# sample aggregator processors share the same FlowCtx.EvalCtx on the gateway.
83+
# Both processors call SampleRow -> copyRow -> truncateDatum ->
84+
# NewDCollatedString concurrently for collated strings exceeding the 400-byte
85+
# sample size limit, racing on the shared CollationEnv.
86+
#
87+
# The race window: on the gateway, the local sampler processes rows (calling
88+
# truncateDatum) while the SampleAggregator concurrently processes sample rows
89+
# arriving from remote nodes' samplers (also calling truncateDatum). Both use
90+
# the shared FlowCtx.EvalCtx.CollationEnv.
91+
#
92+
# To maximize overlap, we put many rows on the gateway (so its sampler is slow)
93+
# and fewer rows on a remote node (so its sampler finishes first and sends
94+
# sample rows to the SampleAggregator while the gateway's sampler is still
95+
# running).
96+
97+
statement ok
98+
CREATE TABLE t2 (c0 STRING COLLATE en PRIMARY KEY)
99+
100+
# Insert long collated strings (> 400 bytes) to trigger the truncateDatum path,
101+
# which calls NewDCollatedString using the shared CollationEnv. We put many
102+
# rows on node 1 (gateway) and fewer on node 2.
103+
statement ok
104+
INSERT INTO t2
105+
SELECT (chr(97 + (i % 26)) || repeat('x', 500) || i::STRING) COLLATE en
106+
FROM generate_series(1, 100) AS g(i)
107+
108+
# Split into two ranges: the first ~half on node 1 (gateway) and the rest on
109+
# node 2.
110+
statement ok
111+
ALTER TABLE t2 SPLIT AT VALUES ((chr(97 + 13) || repeat('x', 500) || '50') COLLATE en)
112+
113+
retry
114+
statement ok
115+
ALTER TABLE t2 EXPERIMENTAL_RELOCATE VALUES
116+
(ARRAY[1], (chr(97) || repeat('x', 500) || '1') COLLATE en),
117+
(ARRAY[2], (chr(97 + 13) || repeat('x', 500) || '50') COLLATE en)
118+
119+
# Run CREATE STATISTICS several times to increase the chance of triggering the
120+
# race between the sampler and sample aggregator on the gateway node. Both
121+
# processors run in separate goroutines sharing the same FlowCtx.EvalCtx.
122+
statement ok
123+
CREATE STATISTICS s1 ON c0 FROM t2
124+
125+
statement ok
126+
CREATE STATISTICS s2 ON c0 FROM t2
127+
128+
statement ok
129+
CREATE STATISTICS s3 ON c0 FROM t2
130+
131+
subtest end

pkg/sql/logictest/tests/5node/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ go_test(
1212
"//build/toolchains:is_heavy": {"test.Pool": "heavy"},
1313
"//conditions:default": {"test.Pool": "large"},
1414
}),
15-
shard_count = 20,
15+
shard_count = 21,
1616
tags = ["cpu:3"],
1717
deps = [
1818
"//pkg/base",

pkg/sql/logictest/tests/5node/generated_test.go

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/sql/rowexec/sample_aggregator.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,11 @@ import (
4141
type sampleAggregator struct {
4242
execinfra.ProcessorBase
4343

44-
spec *execinfrapb.SampleAggregatorSpec
45-
input execinfra.RowSource
46-
inTypes []*types.T
47-
sr stats.SampleReservoir
44+
spec *execinfrapb.SampleAggregatorSpec
45+
input execinfra.RowSource
46+
inTypes []*types.T
47+
sr stats.SampleReservoir
48+
collationEnv tree.CollationEnvironment
4849

4950
// memAcc accounts for memory accumulated throughout the life of the
5051
// sampleAggregator.
@@ -422,7 +423,7 @@ func (s *sampleAggregator) sampleRow(
422423
ctx context.Context, sr *stats.SampleReservoir, sampleRow rowenc.EncDatumRow, rank uint64,
423424
) error {
424425
prevCapacity := sr.Cap()
425-
if err := sr.SampleRow(ctx, s.FlowCtx.EvalCtx, sampleRow, rank); err != nil {
426+
if err := sr.SampleRow(ctx, &s.collationEnv, sampleRow, rank); err != nil {
426427
if code := pgerror.GetPGCode(err); code != pgcode.OutOfMemory {
427428
return err
428429
}

pkg/sql/rowexec/sampler.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ type samplerProcessor struct {
4747
input execinfra.RowSource
4848
memAcc mon.BoundAccount
4949
sr stats.SampleReservoir
50+
collationEnv tree.CollationEnvironment
5051
sketches []sketchInfo
5152
outTypes []*types.T
5253
maxFractionIdle float64
@@ -452,7 +453,7 @@ func (s *samplerProcessor) sampleRow(
452453
// Use Int63 so we don't have headaches converting to DInt.
453454
rank := uint64(rng.Int63())
454455
prevCapacity := sr.Cap()
455-
if err := sr.SampleRow(ctx, s.FlowCtx.EvalCtx, row, rank); err != nil {
456+
if err := sr.SampleRow(ctx, &s.collationEnv, row, rank); err != nil {
456457
if !sqlerrors.IsOutOfMemoryError(err) {
457458
return false, err
458459
}

pkg/sql/sem/eval/context.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -512,9 +512,12 @@ func (ec *Context) SessionData() *sessiondata.SessionData {
512512
return ec.SessionDataStack.Top()
513513
}
514514

515-
// Copy returns a deep copy of ctx.
515+
// Copy returns a copy of the EvalCtx that can safely be used concurrently with
516+
// the original.
516517
func (ec *Context) Copy() *Context {
517518
ctxCopy := *ec
519+
// CollationEnvironment is not thread safe.
520+
ctxCopy.CollationEnv = tree.CollationEnvironment{}
518521
ctxCopy.iVarContainerStack = make([]tree.IndexedVarContainer, len(ec.iVarContainerStack), cap(ec.iVarContainerStack))
519522
copy(ctxCopy.iVarContainerStack, ec.iVarContainerStack)
520523
return &ctxCopy

pkg/sql/stats/row_sampling.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111

1212
"github.com/cockroachdb/cockroach/pkg/sql/memsize"
1313
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
14-
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
1514
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
1615
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
1716
"github.com/cockroachdb/cockroach/pkg/sql/types"
@@ -166,7 +165,7 @@ func (sr *SampleReservoir) retryMaybeResize(ctx context.Context, op func() error
166165
// SampleRow returns an error (any type of error), no additional calls to
167166
// SampleRow should be made as the failed samples will have introduced bias.
168167
func (sr *SampleReservoir) SampleRow(
169-
ctx context.Context, evalCtx *eval.Context, row rowenc.EncDatumRow, rank uint64,
168+
ctx context.Context, collationEnv *tree.CollationEnvironment, row rowenc.EncDatumRow, rank uint64,
170169
) error {
171170
return sr.retryMaybeResize(ctx, func() error {
172171
if len(sr.samples) < cap(sr.samples) {
@@ -180,7 +179,7 @@ func (sr *SampleReservoir) SampleRow(
180179
return err
181180
}
182181
}
183-
if err := sr.copyRow(ctx, evalCtx, rowCopy, row); err != nil {
182+
if err := sr.copyRow(ctx, collationEnv, rowCopy, row); err != nil {
184183
return err
185184
}
186185
sr.samples = append(sr.samples, SampledRow{Row: rowCopy, Rank: rank})
@@ -192,7 +191,7 @@ func (sr *SampleReservoir) SampleRow(
192191
}
193192
// Replace the max rank if ours is smaller.
194193
if len(sr.samples) > 0 && rank < sr.samples[0].Rank {
195-
if err := sr.copyRow(ctx, evalCtx, sr.samples[0].Row, row); err != nil {
194+
if err := sr.copyRow(ctx, collationEnv, sr.samples[0].Row, row); err != nil {
196195
return err
197196
}
198197
sr.samples[0].Rank = rank
@@ -238,7 +237,7 @@ func (sr *SampleReservoir) GetNonNullDatums(
238237
}
239238

240239
func (sr *SampleReservoir) copyRow(
241-
ctx context.Context, evalCtx *eval.Context, dst, src rowenc.EncDatumRow,
240+
ctx context.Context, collationEnv *tree.CollationEnvironment, dst, src rowenc.EncDatumRow,
242241
) error {
243242
// First, we calculate how much memory has already been accounted for the
244243
// "before" row (row that we're about to overwrite) as well as how much
@@ -263,7 +262,7 @@ func (sr *SampleReservoir) copyRow(
263262

264263
// If the datum is too large, truncate it.
265264
if afterSize > uintptr(maxBytesPerSample) {
266-
sr.scratch[i].Datum = truncateDatum(evalCtx, sr.scratch[i].Datum, maxBytesPerSample)
265+
sr.scratch[i].Datum = truncateDatum(collationEnv, sr.scratch[i].Datum, maxBytesPerSample)
267266
afterSize = sr.scratch[i].Size()
268267
}
269268
afterRowSize += int64(afterSize)
@@ -289,7 +288,7 @@ const maxBytesPerSample = 400
289288
//
290289
// For example, if maxBytes=10, "Cockroach Labs" would be truncated to
291290
// "Cockroach ".
292-
func truncateDatum(evalCtx *eval.Context, d tree.Datum, maxBytes int) tree.Datum {
291+
func truncateDatum(collationEnv *tree.CollationEnvironment, d tree.Datum, maxBytes int) tree.Datum {
293292
switch t := d.(type) {
294293
case *tree.DBitArray:
295294
b := tree.DBitArray{BitArray: t.ToWidth(uint(maxBytes * 8))}
@@ -310,15 +309,15 @@ func truncateDatum(evalCtx *eval.Context, d tree.Datum, maxBytes int) tree.Datum
310309

311310
// Note: this will end up being larger than maxBytes due to the key and
312311
// locale, so this is just a best-effort attempt to limit the size.
313-
res, err := tree.NewDCollatedString(contents, t.Locale, &evalCtx.CollationEnv)
312+
res, err := tree.NewDCollatedString(contents, t.Locale, collationEnv)
314313
if err != nil {
315314
return d
316315
}
317316
return res
318317

319318
case *tree.DOidWrapper:
320319
return &tree.DOidWrapper{
321-
Wrapped: truncateDatum(evalCtx, t.Wrapped, maxBytes),
320+
Wrapped: truncateDatum(collationEnv, t.Wrapped, maxBytes),
322321
Oid: t.Oid,
323322
}
324323

pkg/sql/stats/row_sampling_test.go

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,19 +29,16 @@ import (
2929
// runSampleTest feeds rows with the given ranks through a reservoir
3030
// of a given size and verifies the results are correct.
3131
func runSampleTest(
32-
t *testing.T,
33-
evalCtx *eval.Context,
34-
numSamples, expectedNumSamples int,
35-
ranks []int,
36-
memAcc *mon.BoundAccount,
32+
t *testing.T, numSamples, expectedNumSamples int, ranks []int, memAcc *mon.BoundAccount,
3733
) {
3834
ctx := context.Background()
3935
var sr SampleReservoir
36+
var collationEnv tree.CollationEnvironment
4037
sr.Init(numSamples, 1, []*types.T{types.Int}, memAcc, intsets.MakeFast(0))
4138
for _, r := range ranks {
4239
d := rowenc.DatumToEncDatum(types.Int, tree.NewDInt(tree.DInt(r)))
4340
prevCapacity := sr.Cap()
44-
if err := sr.SampleRow(ctx, evalCtx, rowenc.EncDatumRow{d}, uint64(r)); err != nil {
41+
if err := sr.SampleRow(ctx, &collationEnv, rowenc.EncDatumRow{d}, uint64(r)); err != nil {
4542
t.Fatal(err)
4643
} else if sr.Cap() != prevCapacity {
4744
t.Logf(
@@ -92,7 +89,6 @@ func runSampleTest(
9289
func TestSampleReservoir(t *testing.T) {
9390
ctx := context.Background()
9491
st := cluster.MakeTestingClusterSettings()
95-
evalCtx := eval.MakeTestingEvalContext(st)
9692

9793
for _, n := range []int{10, 100, 1000, 10000} {
9894
rng, _ := randutil.NewTestRand()
@@ -102,7 +98,7 @@ func TestSampleReservoir(t *testing.T) {
10298
}
10399
for _, k := range []int{1, 5, 10, 100} {
104100
t.Run(fmt.Sprintf("n=%d/k=%d/mem=nolimit", n, k), func(t *testing.T) {
105-
runSampleTest(t, &evalCtx, k, k, ranks, nil)
101+
runSampleTest(t, k, k, ranks, mon.NewStandaloneUnlimitedAccount())
106102
})
107103
for _, mem := range []int64{1 << 8, 1 << 10, 1 << 12} {
108104
t.Run(fmt.Sprintf("n=%d/k=%d/mem=%d", n, k, mem), func(t *testing.T) {
@@ -122,7 +118,7 @@ func TestSampleReservoir(t *testing.T) {
122118
} else if mem == 1<<12 && n > 10 && k > 10 {
123119
expectedK = 25
124120
}
125-
runSampleTest(t, &evalCtx, k, expectedK, ranks, &memAcc)
121+
runSampleTest(t, k, expectedK, ranks, &memAcc)
126122
})
127123
}
128124
}
@@ -132,8 +128,9 @@ func TestSampleReservoir(t *testing.T) {
132128
func TestTruncateDatum(t *testing.T) {
133129
ctx := context.Background()
134130
evalCtx := eval.MakeTestingEvalContext(cluster.MakeTestingClusterSettings())
131+
var collationEnv tree.CollationEnvironment
135132
runTest := func(d, expected tree.Datum) {
136-
actual := truncateDatum(&evalCtx, d, 10 /* maxBytes */)
133+
actual := truncateDatum(&collationEnv, d, 10 /* maxBytes */)
137134
if cmp, err := actual.Compare(ctx, &evalCtx, expected); err != nil {
138135
t.Fatal(err)
139136
} else if cmp != 0 {
@@ -185,7 +182,7 @@ corn, the green oats, and the haystacks piled up in the meadows looked beautiful
185182
func TestSampleReservoirMemAccounting(t *testing.T) {
186183
ctx := context.Background()
187184
st := cluster.MakeTestingClusterSettings()
188-
evalCtx := eval.MakeTestingEvalContext(st)
185+
var collationEnv tree.CollationEnvironment
189186

190187
getStringDatum := func(l int) rowenc.EncDatum {
191188
d := tree.DString(strings.Repeat("a", l))
@@ -210,9 +207,9 @@ func TestSampleReservoirMemAccounting(t *testing.T) {
210207
memAcc := monitor.MakeBoundAccount()
211208
var sr SampleReservoir
212209
sr.Init(2, 1, []*types.T{types.String, types.String}, &memAcc, intsets.MakeFast(0, 1))
213-
require.NoError(t, sr.SampleRow(ctx, &evalCtx, rows[0], 3))
214-
require.NoError(t, sr.SampleRow(ctx, &evalCtx, rows[1], 2))
215-
err := sr.SampleRow(ctx, &evalCtx, rows[2], 1)
210+
require.NoError(t, sr.SampleRow(ctx, &collationEnv, rows[0], 3))
211+
require.NoError(t, sr.SampleRow(ctx, &collationEnv, rows[1], 2))
212+
err := sr.SampleRow(ctx, &collationEnv, rows[2], 1)
216213
require.Error(t, err)
217214
require.True(t, testutils.IsError(err, "memory budget exceeded"))
218215
}

0 commit comments

Comments
 (0)