Skip to content

Commit fb61d45

Browse files
committed
logical: switch back to kv writer
25.2 was going to use the sql writer by default. But #146117 requires us to roll back #143100, which means the sql writer no longer implements LWW correctly in the presence of tombstones. This also includes a change to fix the cput origin time validation. If the cput is allowed to include origin time in the batch header and the cput request iff the times match. This behavior will be required by the sql writer to fix tombstone handling. This removes the version check in `getWriterType` since the SQL writer no longer changes it behavior based on the version of the KV server. Release note: none Part of: #146117 Informs: #146217
1 parent 94873b2 commit fb61d45

File tree

5 files changed

+17
-11
lines changed

5 files changed

+17
-11
lines changed

pkg/crosscluster/logical/logical_replication_job_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -2790,7 +2790,7 @@ func TestGetWriterType(t *testing.T) {
27902790
)
27912791
wt, err := getWriterType(ctx, jobspb.LogicalReplicationDetails_Immediate, st)
27922792
require.NoError(t, err)
2793-
require.Equal(t, writerTypeSQL, wt)
2793+
require.Equal(t, writerTypeLegacyKV, wt)
27942794
})
27952795

27962796
t.Run("immediate-mode-post-25.2", func(t *testing.T) {

pkg/crosscluster/logical/logical_replication_writer_processor.go

+4-7
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,9 @@ var immediateModeWriter = settings.RegisterStringSetting(
114114
settings.ApplicationLevel,
115115
"logical_replication.consumer.immediate_mode_writer",
116116
"the writer to use when in immediate mode",
117-
metamorphic.ConstantWithTestChoice("logical_replication.consumer.immediate_mode_writer", string(writerTypeSQL), string(writerTypeLegacyKV), string(writerTypeCRUD)),
117+
// TODO(jeffswenson): re-enable the SQL writer once tombstone handling is fixed
118+
// metamorphic.ConstantWithTestChoice("logical_replication.consumer.immediate_mode_writer", string(writerTypeSQL), string(writerTypeLegacyKV), string(writerTypeCRUD)),
119+
metamorphic.ConstantWithTestChoice("logical_replication.consumer.immediate_mode_writer", string(writerTypeLegacyKV)),
118120
settings.WithValidateString(func(sv *settings.Values, val string) error {
119121
if val != string(writerTypeSQL) && val != string(writerTypeLegacyKV) && val != string(writerTypeCRUD) {
120122
return errors.Newf("immediate mode writer must be either 'sql', 'legacy-kv', or 'crud', got '%s'", val)
@@ -783,12 +785,7 @@ func getWriterType(
783785
) (writerType, error) {
784786
switch mode {
785787
case jobspb.LogicalReplicationDetails_Immediate:
786-
// Require v25.2 to use the sql writer by default to ensure that the
787-
// KV origin timestamp validation is available on all nodes.
788-
if settings.Version.IsActive(ctx, clusterversion.V25_2) {
789-
return writerType(immediateModeWriter.Get(&settings.SV)), nil
790-
}
791-
return writerTypeSQL, nil
788+
return writerType(immediateModeWriter.Get(&settings.SV)), nil
792789
case jobspb.LogicalReplicationDetails_Validated:
793790
return writerTypeSQL, nil
794791
default:

pkg/crosscluster/logical/lww_row_processor_test.go

+5
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,11 @@ func TestLWWConflictResolution(t *testing.T) {
467467
runner.CheckQueryResults(t, fmt.Sprintf("SELECT * from %s", tableNameDst), expectedRows)
468468
})
469469
t.Run("cross-cluster-local-delete", func(t *testing.T) {
470+
if !useKVProc {
471+
// TODO(jeffswenson): re-enable this test
472+
skip.WithIssue(t, 146217, "crud writer does not correctly implement lww")
473+
}
474+
470475
tableNameDst, rp, encoder := setup(t, useKVProc)
471476

472477
runner.Exec(t, fmt.Sprintf("INSERT INTO %s VALUES ($1, $2)", tableNameDst), row1...)

pkg/crosscluster/logical/table_batch_handler_test.go

+4
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
2020
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
2121
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
22+
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
2223
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
2324
"github.com/cockroachdb/cockroach/pkg/util/hlc"
2425
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -199,6 +200,9 @@ func TestTableHandlerExhaustive(t *testing.T) {
199200
defer leaktest.AfterTest(t)()
200201
defer log.Scope(t).Close(t)
201202

203+
// TODO(jeffswenson): re-enable this test
204+
skip.WithIssue(t, 146217, "crud writer does not correctly implement lww")
205+
202206
// This test is an "exhaustive" test of the table handler. It tries to test
203207
// cross product of every possible (replication event type, local value,
204208
// previous value, lww win).

pkg/kv/kvserver/batcheval/cmd_conditional_put.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,9 @@ func ConditionalPut(
6161
originTimestampForValueHeader := h.WriteOptions.GetOriginTimestamp()
6262
if args.OriginTimestamp.IsSet() {
6363
originTimestampForValueHeader = args.OriginTimestamp
64-
}
65-
if args.OriginTimestamp.IsSet() && h.WriteOptions.GetOriginTimestamp().IsSet() {
66-
return result.Result{}, errors.AssertionFailedf("OriginTimestamp cannot be passed via CPut arg and in request header")
64+
if h.WriteOptions.GetOriginTimestamp().IsSet() && args.OriginTimestamp != h.WriteOptions.GetOriginTimestamp() {
65+
return result.Result{}, errors.AssertionFailedf("OriginTimestamp passed via CPut arg must match the origin timestamp in the request header")
66+
}
6767
}
6868

6969
opts := storage.ConditionalPutWriteOptions{

0 commit comments

Comments
 (0)