diff --git a/src/yb/cdc/cdcsdk_producer.cc b/src/yb/cdc/cdcsdk_producer.cc index 3d93018c3f7a..36728b809e73 100644 --- a/src/yb/cdc/cdcsdk_producer.cc +++ b/src/yb/cdc/cdcsdk_producer.cc @@ -1009,6 +1009,7 @@ Status PopulateCDCSDKIntentRecord( if (GetAtomicFlag(&FLAGS_enable_single_record_update)) { new_cdc_record_needed = (prev_key != primary_key) || + (IsPackedRow(value_type) && row_message->op() == RowMessage_Op_DELETE) || (value_type == dockv::ValueEntryType::kTombstone && decoded_key.num_subkeys() == 0) || prev_intent_phy_time != intent.intent_ht.hybrid_time().GetPhysicalValueMicros(); } else { diff --git a/src/yb/integration-tests/cdcsdk_ysql-test.cc b/src/yb/integration-tests/cdcsdk_ysql-test.cc index 77b4dc4b6a70..1004ea1e9087 100644 --- a/src/yb/integration-tests/cdcsdk_ysql-test.cc +++ b/src/yb/integration-tests/cdcsdk_ysql-test.cc @@ -33,6 +33,12 @@ #include "yb/util/test_macros.h" #include "yb/util/tostring.h" +DECLARE_uint32(wait_for_ysql_backends_catalog_version_client_master_rpc_timeout_ms); +DECLARE_uint64(master_ysql_operation_lease_ttl_ms); +DECLARE_uint64(transaction_resend_applying_interval_usec); +DECLARE_bool(TEST_disable_apply_committed_transactions); +DECLARE_bool(ysql_yb_skip_redundant_update_ops); + namespace yb { using client::YBTableName; @@ -747,6 +753,55 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(MultiColumnUpdateFollowedByUpdate CheckCount(expected_count, count); } +// Test that an upsert (INSERT ON CONFLICT DO UPDATE) that touches a primary key column +// produces DELETE + INSERT in the CDC stream, not DELETE + DELETE. +TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(UpsertWithPKInSetEmitsDeleteAndInsert)) { + ANNOTATE_UNPROTECTED_WRITE(FLAGS_ysql_yb_skip_redundant_update_ops) = false; + // Packed rows default to off in debug/asan/fastdebug builds (kYsqlEnablePackedRowTargetVal = + // !kIsDebug). The fix this test guards is on the IsPackedRow branch in + // PopulateCDCSDKIntentRecord, + // so force packed rows on to exercise that path on every build flavor. + ANNOTATE_UNPROTECTED_WRITE(FLAGS_ysql_enable_packed_row) = true; + ASSERT_OK(SetUpWithParams(3, 1, false)); + auto table = EXPECT_RESULT(CreateTable(&test_cluster_, kNamespaceName, kTableName)); + google::protobuf::RepeatedPtrField tablets; + ASSERT_OK(test_client()->GetTablets(table, 0, &tablets, nullptr)); + ASSERT_EQ(tablets.size(), 1); + xrepl::StreamId stream_id = ASSERT_RESULT(CreateDBStreamWithReplicationSlot()); + + // Insert a row and consume its CDC records so the next GetChanges only returns upsert records. + ASSERT_OK(WriteRows(1 /* start */, 2 /* end */, &test_cluster_)); + GetChangesResponsePB change_resp; + ASSERT_OK(WaitForGetChangesToFetchRecords(&change_resp, stream_id, tablets, 1)); + + // Upsert with PK column in SET clause — triggers YBCExecuteUpdateReplace (DELETE + INSERT). + auto conn = ASSERT_RESULT(test_cluster_.ConnectToDB(kNamespaceName)); + ASSERT_OK(conn.Execute( + "INSERT INTO test_table VALUES (1, 10) " + "ON CONFLICT (key) DO UPDATE SET key = EXCLUDED.key, value_1 = EXCLUDED.value_1")); + + // Expect DELETE(key=1) + INSERT(key=1, value_1=10), no UPDATEs. + // The count array stores counts of DDL, INSERT, UPDATE, DELETE, READ, TRUNCATE in that order. + const uint32_t expected_count[] = {0, 1, 0, 1, 0, 0}; + uint32_t count[] = {0, 0, 0, 0, 0, 0}; + + // Expected records: BEGIN, DELETE(key=1), INSERT(key=1, value_1=10), COMMIT. + ExpectedRecord expected_records[] = {{0, 0}, {1, 0}, {1, 10}, {0, 0}}; + + GetChangesResponsePB upsert_resp; + ASSERT_OK(WaitForGetChangesToFetchRecords( + &upsert_resp, stream_id, tablets, 2, /* is_explicit_checkpoint */ false, + &change_resp.cdc_sdk_checkpoint())); + + uint32_t record_size = upsert_resp.cdc_sdk_proto_records_size(); + ASSERT_EQ(record_size, 4); // BEGIN, DELETE, INSERT, COMMIT + for (uint32_t i = 0; i < record_size; ++i) { + const CDCSDKProtoRecordPB record = upsert_resp.cdc_sdk_proto_records(i); + CheckRecord(record, expected_records[i], count); + } + CheckCount(expected_count, count); +} + // Insert one row, delete inserted row. // Expected records: (DDL, INSERT, DELETE). TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(SingleShardDeleteWithAutoCommit)) {