Skip to content

release-25.2: kvnemesis: test buffered writes for SSI transactions #146564

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: release-25.2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/kv/kvclient/kvcoord/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ go_library(
"//pkg/kv/kvbase",
"//pkg/kv/kvclient",
"//pkg/kv/kvclient/rangecache",
"//pkg/kv/kvnemesis/kvnemesisutil",
"//pkg/kv/kvpb",
"//pkg/kv/kvserver/concurrency/isolation",
"//pkg/kv/kvserver/concurrency/lock",
Expand Down
24 changes: 16 additions & 8 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"sort"
"unsafe"

"github.com/cockroachdb/cockroach/pkg/kv/kvnemesis/kvnemesisutil"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -1068,11 +1069,11 @@ func (rr requestRecord) toResp(
// The condition was satisfied; buffer the write and return a
// synthesized response.
ru.MustSetInner(&kvpb.ConditionalPutResponse{})
twb.addToBuffer(req.Key, req.Value, req.Sequence)
twb.addToBuffer(req.Key, req.Value, req.Sequence, req.KVNemesisSeq)

case *kvpb.PutRequest:
ru.MustSetInner(&kvpb.PutResponse{})
twb.addToBuffer(req.Key, req.Value, req.Sequence)
twb.addToBuffer(req.Key, req.Value, req.Sequence, req.KVNemesisSeq)

case *kvpb.DeleteRequest:
// To correctly populate FoundKey in the response, we must prefer any
Expand Down Expand Up @@ -1107,7 +1108,7 @@ func (rr requestRecord) toResp(
ru.MustSetInner(&kvpb.DeleteResponse{
FoundKey: foundKey,
})
twb.addToBuffer(req.Key, roachpb.Value{}, req.Sequence)
twb.addToBuffer(req.Key, roachpb.Value{}, req.Sequence, req.KVNemesisSeq)

case *kvpb.GetRequest:
val, served := twb.maybeServeRead(req.Key, req.Sequence)
Expand Down Expand Up @@ -1169,23 +1170,25 @@ func (rr requestRecords) Empty() bool {
}

// addToBuffer adds a write to the given key to the buffer.
func (twb *txnWriteBuffer) addToBuffer(key roachpb.Key, val roachpb.Value, seq enginepb.TxnSeq) {
func (twb *txnWriteBuffer) addToBuffer(
key roachpb.Key, val roachpb.Value, seq enginepb.TxnSeq, kvNemSeq kvnemesisutil.Container,
) {
it := twb.buffer.MakeIter()
seek := twb.seekItemForSpan(key, nil)

it.FirstOverlap(seek)
if it.Valid() {
// We've already seen a write for this key.
bw := it.Cur()
val := bufferedValue{val: val, seq: seq}
val := bufferedValue{val: val, seq: seq, kvNemesisSeq: kvNemSeq}
bw.vals = append(bw.vals, val)
twb.bufferSize += val.size()
} else {
twb.bufferIDAlloc++
bw := &bufferedWrite{
id: twb.bufferIDAlloc,
key: key,
vals: []bufferedValue{{val: val, seq: seq}},
vals: []bufferedValue{{val: val, seq: seq, kvNemesisSeq: kvNemSeq}},
}
twb.buffer.Set(bw)
twb.bufferSize += bw.size()
Expand Down Expand Up @@ -1328,8 +1331,11 @@ const bufferedValueStructOverhead = int64(unsafe.Sizeof(bufferedValue{}))

// bufferedValue is a value written to a key at a given sequence number.
type bufferedValue struct {
val roachpb.Value
seq enginepb.TxnSeq
// NB: Keep this at the start of the struct so that it is zero (size) cost in
// production.
kvNemesisSeq kvnemesisutil.Container
val roachpb.Value
seq enginepb.TxnSeq
}

// valPtr returns a pointer to the buffered value.
Expand Down Expand Up @@ -1380,6 +1386,7 @@ func (bw *bufferedWrite) toRequest() kvpb.RequestUnion {
putAlloc.put.Key = bw.key
putAlloc.put.Value = val.val
putAlloc.put.Sequence = val.seq
putAlloc.put.KVNemesisSeq = val.kvNemesisSeq
putAlloc.union.Put = &putAlloc.put
ru.Value = &putAlloc.union
} else {
Expand All @@ -1389,6 +1396,7 @@ func (bw *bufferedWrite) toRequest() kvpb.RequestUnion {
})
delAlloc.del.Key = bw.key
delAlloc.del.Sequence = val.seq
delAlloc.del.KVNemesisSeq = val.kvNemesisSeq
delAlloc.union.Delete = &delAlloc.del
ru.Value = &delAlloc.union
}
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvnemesis/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ go_test(
"//pkg/kv/kvpb",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/apply",
"//pkg/kv/kvserver/concurrency",
"//pkg/kv/kvserver/concurrency/isolation",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/kv/kvserver/kvserverpb",
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvnemesis/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) {
if err := txn.SetIsoLevel(o.IsoLevel); err != nil {
panic(err)
}
txn.SetBufferedWritesEnabled(o.BufferedWrites)
if savedTxn != nil && txn.TestingCloneTxn().Epoch == 0 {
// If the txn's current epoch is 0 and we've run at least one prior
// iteration, we were just aborted.
Expand Down
24 changes: 15 additions & 9 deletions pkg/kv/kvnemesis/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
type GeneratorConfig struct {
Ops OperationConfig
NumNodes, NumReplicas int

// BufferedWritesProb is the probability, in [0,1), that a generated
// Serializable closure transaction has BufferedWrites set. Snapshot and
// ReadCommitted closure transactions are always generated with a
// probability of 0, so they never get buffered writes.
BufferedWritesProb float64
}

// OperationConfig configures the relative probabilities of producing various
Expand Down Expand Up @@ -1493,28 +1495,31 @@ func (g *generator) registerClosureTxnOps(allowed *[]opGen, c *ClosureTxnConfig)
const Commit, Rollback = ClosureTxnType_Commit, ClosureTxnType_Rollback
const SSI, SI, RC = isolation.Serializable, isolation.Snapshot, isolation.ReadCommitted
addOpGen(allowed,
makeClosureTxn(Commit, SSI, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.CommitSerializable)
makeClosureTxn(Commit, SSI, g.Config.BufferedWritesProb, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.CommitSerializable)
addOpGen(allowed,
makeClosureTxn(Commit, SI, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.CommitSnapshot)
makeClosureTxn(Commit, SI, 0 /* bufferedWritesProb */, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.CommitSnapshot)
addOpGen(allowed,
makeClosureTxn(Commit, RC, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.CommitReadCommitted)
makeClosureTxn(Commit, RC, 0 /* bufferedWritesProb */, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.CommitReadCommitted)

addOpGen(allowed,
makeClosureTxn(Rollback, SSI, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.RollbackSerializable)
makeClosureTxn(Rollback, SSI, g.Config.BufferedWritesProb, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.RollbackSerializable)
addOpGen(allowed,
makeClosureTxn(Rollback, SI, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.RollbackSnapshot)
makeClosureTxn(Rollback, SI, 0 /* bufferedWritesProb */, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.RollbackSnapshot)
addOpGen(allowed,
makeClosureTxn(Rollback, RC, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.RollbackReadCommitted)
makeClosureTxn(Rollback, RC, 0 /* bufferedWritesProb */, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.RollbackReadCommitted)

addOpGen(allowed,
makeClosureTxn(Commit, SSI, &c.TxnClientOps, &c.TxnBatchOps, &c.CommitBatchOps, &c.SavepointOps), c.CommitSerializableInBatch)
makeClosureTxn(Commit, SSI, g.Config.BufferedWritesProb, &c.TxnClientOps, &c.TxnBatchOps, &c.CommitBatchOps, &c.SavepointOps), c.CommitSerializableInBatch)
addOpGen(allowed,
makeClosureTxn(Commit, SI, &c.TxnClientOps, &c.TxnBatchOps, &c.CommitBatchOps, &c.SavepointOps), c.CommitSnapshotInBatch)
makeClosureTxn(Commit, SI, 0 /* bufferedWritesProb */, &c.TxnClientOps, &c.TxnBatchOps, &c.CommitBatchOps, &c.SavepointOps), c.CommitSnapshotInBatch)
addOpGen(allowed,
makeClosureTxn(Commit, RC, &c.TxnClientOps, &c.TxnBatchOps, &c.CommitBatchOps, &c.SavepointOps), c.CommitReadCommittedInBatch)
makeClosureTxn(Commit, RC, 0 /* bufferedWritesProb */, &c.TxnClientOps, &c.TxnBatchOps, &c.CommitBatchOps, &c.SavepointOps), c.CommitReadCommittedInBatch)
}

func makeClosureTxn(
txnType ClosureTxnType,
iso isolation.Level,
bufferedWritesProb float64,
txnClientOps *ClientOperationConfig,
txnBatchOps *BatchOperationConfig,
commitInBatch *ClientOperationConfig,
Expand Down Expand Up @@ -1547,6 +1552,7 @@ func makeClosureTxn(
maybeUpdateSavepoints(&spIDs, ops[i])
}
op := closureTxn(txnType, iso, ops...)
op.ClosureTxn.BufferedWrites = rng.Float64() < bufferedWritesProb
if commitInBatch != nil {
if txnType != ClosureTxnType_Commit {
panic(errors.AssertionFailedf(`CommitInBatch must commit got: %s`, txnType))
Expand Down
66 changes: 65 additions & 1 deletion pkg/kv/kvnemesis/kvnemesis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/apply"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -164,7 +165,11 @@ func (cfg kvnemesisTestCfg) testClusterArgs(
kvserver.OverrideDefaultLeaseType(ctx, &st.SV, cfg.leaseTypeOverride)
}

return base.TestClusterArgs{
if cfg.testSettings != nil {
cfg.testSettings(ctx, st)
}

args := base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: storeKnobs,
Expand All @@ -190,6 +195,12 @@ func (cfg kvnemesisTestCfg) testClusterArgs(
Settings: st,
},
}

if cfg.testArgs != nil {
cfg.testArgs(&args)
}

return args
}

func randWithSeed(
Expand Down Expand Up @@ -262,12 +273,29 @@ type kvnemesisTestCfg struct {
// considered truly random, but is random enough for the desired purpose.
invalidLeaseAppliedIndexProb float64 // [0,1)
injectReproposalErrorProb float64 // [0,1)
// bufferedWriteProb is the probability that an SSI transaction is configured
// to use buffered writes. Once write buffering supports RC and SI
// transactions, this will apply to all transactions.
bufferedWriteProb float64 // [0,1)

// If enabled, track Raft proposals and command application, and assert
// invariants (in particular that we don't double-apply a request or
// proposal).
assertRaftApply bool
// If set, overrides the default lease type for ranges.
leaseTypeOverride roachpb.LeaseType

// testSettings is passed the settings object used for the kvnemesis
// TestCluster.
testSettings func(context.Context, *cluster.Settings)

// testArgs is passed the TestClusterArgs used to start the kvnemesis
// TestCluster.
testArgs func(*base.TestClusterArgs)

// testGeneratorConfig modifies the default generator configuration. This is
// useful if a test configuration does not yet support particular operations.
testGeneratorConfig func(*GeneratorConfig)
}

func TestKVNemesisSingleNode(t *testing.T) {
Expand Down Expand Up @@ -300,6 +328,37 @@ func TestKVNemesisSingleNode_ReproposalChaos(t *testing.T) {
})
}

// TestKVNemesisMultiNode_BufferedWrites runs KVNemesis with write buffering
// enabled.
func TestKVNemesisMultiNode_BufferedWrites(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	// Base shape of the run: a three-node cluster with five concurrent
	// workers, Raft apply assertions on, and buffered writes requested for
	// 70% of the SSI closure transactions the generator produces.
	testCfg := kvnemesisTestCfg{
		numNodes:          3,
		numSteps:          defaultNumSteps,
		concurrency:       5,
		seedOverride:      0,
		assertRaftApply:   true,
		bufferedWriteProb: 0.70,
	}

	// TODO(#145458): Until #145458 is fixed, reduce the rate of lost writes
	// by turning off error injection here and by removing lease transfers
	// and merges from the generated operations below.
	testCfg.invalidLeaseAppliedIndexProb = 0.0
	testCfg.injectReproposalErrorProb = 0.0
	testCfg.testGeneratorConfig = func(g *GeneratorConfig) {
		// Zero-valued configs give these operations no weight.
		g.Ops.ChangeLease = ChangeLeaseConfig{}
		g.Ops.Merge = MergeConfig{}
	}

	// Cluster settings overridden before the TestCluster is started.
	testCfg.testSettings = func(ctx context.Context, st *cluster.Settings) {
		sv := &st.SV
		kvcoord.BufferedWritesEnabled.Override(ctx, sv, true)
		concurrency.UnreplicatedLockReliabilityLeaseTransfer.Override(ctx, sv, true)
		concurrency.UnreplicatedLockReliabilityMerge.Override(ctx, sv, true)
		concurrency.UnreplicatedLockReliabilitySplit.Override(ctx, sv, true)
	}

	testKVNemesisImpl(t, testCfg)
}

func TestKVNemesisMultiNode(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down Expand Up @@ -363,9 +422,14 @@ func testKVNemesisImpl(t *testing.T, cfg kvnemesisTestCfg) {
config := NewDefaultConfig()
config.NumNodes = cfg.numNodes
config.NumReplicas = 3
config.BufferedWritesProb = cfg.bufferedWriteProb
if config.NumReplicas > cfg.numNodes {
config.NumReplicas = cfg.numNodes
}
if cfg.testGeneratorConfig != nil {
cfg.testGeneratorConfig(&config)
}

logger := newTBridge(t)
env := &Env{SQLDBs: sqlDBs, Tracker: tr, L: logger}
failures, err := RunNemesis(ctx, rng, env, config, cfg.concurrency, cfg.numSteps, dbs...)
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvnemesis/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ func (op Operation) format(w *strings.Builder, fctx formatCtx) {
w.WriteString(newFctx.indent)
w.WriteString(newFctx.receiver)
fmt.Fprintf(w, `.SetIsoLevel(isolation.%s)`, o.IsoLevel)
w.WriteString("\n")
w.WriteString(newFctx.indent)
w.WriteString(newFctx.receiver)
fmt.Fprintf(w, `.SetBufferedWritesEnabled(%v)`, o.BufferedWrites)
formatOps(w, newFctx, o.Ops)
if o.CommitInBatch != nil {
newFctx.receiver = `b`
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvnemesis/operations.proto
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ message ClosureTxnOperation {
cockroach.kv.kvserver.concurrency.isolation.Level iso_level = 7;
Result result = 5 [(gogoproto.nullable) = false];
roachpb.Transaction txn = 6;
bool buffered_writes = 8;
}

message GetOperation {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ echo
----
db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
txn.SetIsoLevel(isolation.Snapshot)
txn.SetBufferedWritesEnabled(false)
txn.Put(ctx, tk(5), sv(5)) // @<ts> <nil>
b := &kv.Batch{}
b.Get(tk(1)) // (<nil>, <nil>)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ echo
----
db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
txn.SetIsoLevel(isolation.Snapshot)
txn.SetBufferedWritesEnabled(false)
txn.Put(ctx, tk(5), sv(5)) // @<ts> <nil>
{
b := &kv.Batch{}
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvnemesis/testdata/TestApplier/txn-si-delrange
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ echo
----
db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
txn.SetIsoLevel(isolation.Snapshot)
txn.SetBufferedWritesEnabled(false)
txn.DelRange(ctx, tk(2), tk(4), true /* @s1 */) // @<ts> <nil>
return nil
}) // @<ts> <nil>
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvnemesis/testdata/TestApplier/txn-si-err
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ echo
----
db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
txn.SetIsoLevel(isolation.Snapshot)
txn.SetBufferedWritesEnabled(false)
txn.DelRange(ctx, tk(2), tk(4), true /* @s1 */)
return nil
}) // context canceled
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ echo
----
db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
txn.SetIsoLevel(isolation.Snapshot)
txn.SetBufferedWritesEnabled(false)
txn.Put(ctx, tk(5), sv(0)) // @<ts> <nil>
txn.CreateSavepoint(ctx, 1) // <nil>
txn.Put(ctx, tk(5), sv(2)) // @<ts> <nil>
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvnemesis/testdata/TestApplier/txn-si-rollback
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ echo
----
db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
txn.SetIsoLevel(isolation.Snapshot)
txn.SetBufferedWritesEnabled(false)
txn.Put(ctx, tk(5), sv(5)) // @<ts> <nil>
return errors.New("rollback")
}) // rollback
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ echo
----
db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
txn.SetIsoLevel(isolation.Snapshot)
txn.SetBufferedWritesEnabled(false)
txn.Put(ctx, tk(5), sv(0)) // @<ts> <nil>
txn.CreateSavepoint(ctx, 1) // <nil>
txn.Put(ctx, tk(5), sv(2)) // @<ts> <nil>
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvnemesis/testdata/TestApplier/txn-si-savepoint
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ echo
----
db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
txn.SetIsoLevel(isolation.Snapshot)
txn.SetBufferedWritesEnabled(false)
txn.Put(ctx, tk(5), sv(0)) // @<ts> <nil>
txn.CreateSavepoint(ctx, 1) // <nil>
txn.Put(ctx, tk(5), sv(2)) // @<ts> <nil>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ echo
----
db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
txn.SetIsoLevel(isolation.Serializable)
txn.SetBufferedWritesEnabled(false)
txn.Put(ctx, tk(5), sv(5)) // @<ts> <nil>
b := &kv.Batch{}
b.Get(tk(1)) // (<nil>, <nil>)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ echo
----
db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
txn.SetIsoLevel(isolation.Serializable)
txn.SetBufferedWritesEnabled(false)
txn.Put(ctx, tk(5), sv(5)) // @<ts> <nil>
{
b := &kv.Batch{}
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvnemesis/testdata/TestApplier/txn-ssi-delrange
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ echo
----
db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
txn.SetIsoLevel(isolation.Serializable)
txn.SetBufferedWritesEnabled(false)
txn.DelRange(ctx, tk(2), tk(4), true /* @s1 */) // @<ts> <nil>
return nil
}) // @<ts> <nil>
Expand Down
Loading