From b080ada8fcc98e639b6225497f582e5d06ab46ff Mon Sep 17 00:00:00 2001 From: Brian Vo Date: Mon, 11 May 2026 13:12:59 +0700 Subject: [PATCH] [BACKPORT 2025.2.3][#31521] CDC: Fix upsert producing double DELETE instead of DELETE+INSERT (#30903) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Summary Fix a bug where an upsert (`INSERT ... ON CONFLICT DO UPDATE`) on an existing row emits two DELETE records instead of a DELETE followed by an INSERT in CDC output, when `ysql_yb_skip_redundant_update_ops` is set to false. ## Motivation When a row is upserted, Yugabyte internally writes a tombstone (delete) followed by a packed row (insert) for the same key. The CDC intent processing logic in `PopulateCDCSDKIntentRecord` decides when to start a new CDC record based on key changes, tombstones, and timestamp differences - but it did not account for the case where a packed row follows a tombstone for the same key at the same timestamp. This caused both the tombstone and the packed row to be merged into a single record, producing two DELETEs instead of a DELETE + INSERT. ## Changes ### `src/yb/cdc/cdcsdk_producer.cc` Added a condition to the `new_cdc_record_needed` check: when the current value is a packed row and the previous record was a DELETE, force a new CDC record. This ensures the packed row (INSERT) is emitted as a separate record rather than being folded into the preceding DELETE. ### `src/yb/integration-tests/cdcsdk_ysql-test.cc` Added `UpsertOnExistingRowProducesDeleteThenInsert` test that: - Inserts a row, then upserts the same key with a new value - Verifies CDC output contains exactly 1 DELETE + 1 INSERT (not 2 DELETEs) ## Test plan - [x] New test `UpsertOnExistingRowProducesDeleteThenInsert` validates correct DELETE+INSERT output - [x] Existing CDC tests pass unchanged - [x] End-to-end verified with a CDC consumer app against a local YugabyteDB build — upserts on existing rows produce DELETE followed by INSERT in CDC output --- > [!NOTE] > **Medium Risk** > Touches core CDC intent-to-record grouping logic, which can affect downstream changefeed correctness; change is small and covered by a new integration test for the reported upsert case. > > **Overview** > Fixes a CDCSDK bug where a packed-row write following a tombstone for the same key could be merged into the prior record under `FLAGS_enable_single_record_update`, causing `INSERT ... ON CONFLICT DO UPDATE` to surface as two `DELETE`s. > > Updates `PopulateCDCSDKIntentRecord` to **force a new CDC record** when a packed row is encountered after a `DELETE` for the same primary key, and adds an integration test (`UpsertOnExistingRowProducesDeleteThenInsert`) asserting the expected `DELETE` then `INSERT` sequence with packed rows enabled. > > Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit f6dd8462c634beb540f67f2bf260a90e40c93151. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot). --- [CSI]() --------- Co-authored-by: Kate Galieva Original commit: 9f03ce987edc5fc7225ae96f8163db8b27d70aa3 / #30903 --- src/yb/cdc/cdcsdk_producer.cc | 1 + src/yb/integration-tests/cdcsdk_ysql-test.cc | 55 ++++++++++++++++++++ 2 files changed, 56 insertions(+) diff --git a/src/yb/cdc/cdcsdk_producer.cc b/src/yb/cdc/cdcsdk_producer.cc index 3d93018c3f7a..36728b809e73 100644 --- a/src/yb/cdc/cdcsdk_producer.cc +++ b/src/yb/cdc/cdcsdk_producer.cc @@ -1009,6 +1009,7 @@ Status PopulateCDCSDKIntentRecord( if (GetAtomicFlag(&FLAGS_enable_single_record_update)) { new_cdc_record_needed = (prev_key != primary_key) || + (IsPackedRow(value_type) && row_message->op() == RowMessage_Op_DELETE) || (value_type == dockv::ValueEntryType::kTombstone && decoded_key.num_subkeys() == 0) || prev_intent_phy_time != intent.intent_ht.hybrid_time().GetPhysicalValueMicros(); } else { diff --git a/src/yb/integration-tests/cdcsdk_ysql-test.cc b/src/yb/integration-tests/cdcsdk_ysql-test.cc index 77b4dc4b6a70..1004ea1e9087 100644 --- a/src/yb/integration-tests/cdcsdk_ysql-test.cc +++ b/src/yb/integration-tests/cdcsdk_ysql-test.cc @@ -33,6 +33,12 @@ #include "yb/util/test_macros.h" #include "yb/util/tostring.h" +DECLARE_uint32(wait_for_ysql_backends_catalog_version_client_master_rpc_timeout_ms); +DECLARE_uint64(master_ysql_operation_lease_ttl_ms); +DECLARE_uint64(transaction_resend_applying_interval_usec); +DECLARE_bool(TEST_disable_apply_committed_transactions); +DECLARE_bool(ysql_yb_skip_redundant_update_ops); + namespace yb { using client::YBTableName; @@ -747,6 +753,55 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(MultiColumnUpdateFollowedByUpdate CheckCount(expected_count, count); } +// Test that an upsert (INSERT ON CONFLICT DO UPDATE) that touches a primary key column +// produces DELETE + INSERT in the CDC stream, not DELETE + DELETE. +TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(UpsertWithPKInSetEmitsDeleteAndInsert)) { + ANNOTATE_UNPROTECTED_WRITE(FLAGS_ysql_yb_skip_redundant_update_ops) = false; + // Packed rows default to off in debug/asan/fastdebug builds (kYsqlEnablePackedRowTargetVal = + // !kIsDebug). The fix this test guards is on the IsPackedRow branch in + // PopulateCDCSDKIntentRecord, + // so force packed rows on to exercise that path on every build flavor. + ANNOTATE_UNPROTECTED_WRITE(FLAGS_ysql_enable_packed_row) = true; + ASSERT_OK(SetUpWithParams(3, 1, false)); + auto table = EXPECT_RESULT(CreateTable(&test_cluster_, kNamespaceName, kTableName)); + google::protobuf::RepeatedPtrField tablets; + ASSERT_OK(test_client()->GetTablets(table, 0, &tablets, nullptr)); + ASSERT_EQ(tablets.size(), 1); + xrepl::StreamId stream_id = ASSERT_RESULT(CreateDBStreamWithReplicationSlot()); + + // Insert a row and consume its CDC records so the next GetChanges only returns upsert records. + ASSERT_OK(WriteRows(1 /* start */, 2 /* end */, &test_cluster_)); + GetChangesResponsePB change_resp; + ASSERT_OK(WaitForGetChangesToFetchRecords(&change_resp, stream_id, tablets, 1)); + + // Upsert with PK column in SET clause — triggers YBCExecuteUpdateReplace (DELETE + INSERT). + auto conn = ASSERT_RESULT(test_cluster_.ConnectToDB(kNamespaceName)); + ASSERT_OK(conn.Execute( + "INSERT INTO test_table VALUES (1, 10) " + "ON CONFLICT (key) DO UPDATE SET key = EXCLUDED.key, value_1 = EXCLUDED.value_1")); + + // Expect DELETE(key=1) + INSERT(key=1, value_1=10), no UPDATEs. + // The count array stores counts of DDL, INSERT, UPDATE, DELETE, READ, TRUNCATE in that order. + const uint32_t expected_count[] = {0, 1, 0, 1, 0, 0}; + uint32_t count[] = {0, 0, 0, 0, 0, 0}; + + // Expected records: BEGIN, DELETE(key=1), INSERT(key=1, value_1=10), COMMIT. + ExpectedRecord expected_records[] = {{0, 0}, {1, 0}, {1, 10}, {0, 0}}; + + GetChangesResponsePB upsert_resp; + ASSERT_OK(WaitForGetChangesToFetchRecords( + &upsert_resp, stream_id, tablets, 2, /* is_explicit_checkpoint */ false, + &change_resp.cdc_sdk_checkpoint())); + + uint32_t record_size = upsert_resp.cdc_sdk_proto_records_size(); + ASSERT_EQ(record_size, 4); // BEGIN, DELETE, INSERT, COMMIT + for (uint32_t i = 0; i < record_size; ++i) { + const CDCSDKProtoRecordPB record = upsert_resp.cdc_sdk_proto_records(i); + CheckRecord(record, expected_records[i], count); + } + CheckCount(expected_count, count); +} + // Insert one row, delete inserted row. // Expected records: (DDL, INSERT, DELETE). TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(SingleShardDeleteWithAutoCommit)) {