Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/yb/cdc/cdcsdk_producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1009,6 +1009,7 @@ Status PopulateCDCSDKIntentRecord(
if (GetAtomicFlag(&FLAGS_enable_single_record_update)) {
new_cdc_record_needed =
(prev_key != primary_key) ||
(IsPackedRow(value_type) && row_message->op() == RowMessage_Op_DELETE) ||
(value_type == dockv::ValueEntryType::kTombstone && decoded_key.num_subkeys() == 0) ||
prev_intent_phy_time != intent.intent_ht.hybrid_time().GetPhysicalValueMicros();
} else {
Expand Down
55 changes: 55 additions & 0 deletions src/yb/integration-tests/cdcsdk_ysql-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@
#include "yb/util/test_macros.h"
#include "yb/util/tostring.h"

DECLARE_uint32(wait_for_ysql_backends_catalog_version_client_master_rpc_timeout_ms);
DECLARE_uint64(master_ysql_operation_lease_ttl_ms);
DECLARE_uint64(transaction_resend_applying_interval_usec);
DECLARE_bool(TEST_disable_apply_committed_transactions);
DECLARE_bool(ysql_yb_skip_redundant_update_ops);

namespace yb {

using client::YBTableName;
Expand Down Expand Up @@ -747,6 +753,55 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(MultiColumnUpdateFollowedByUpdate
CheckCount(expected_count, count);
}

// Test that an upsert (INSERT ON CONFLICT DO UPDATE) that touches a primary key column
// produces DELETE + INSERT in the CDC stream, not DELETE + DELETE.
TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(UpsertWithPKInSetEmitsDeleteAndInsert)) {
ANNOTATE_UNPROTECTED_WRITE(FLAGS_ysql_yb_skip_redundant_update_ops) = false;
// Packed rows default to off in debug/asan/fastdebug builds (kYsqlEnablePackedRowTargetVal =
// !kIsDebug). The fix this test guards is on the IsPackedRow branch in
// PopulateCDCSDKIntentRecord,
// so force packed rows on to exercise that path on every build flavor.
ANNOTATE_UNPROTECTED_WRITE(FLAGS_ysql_enable_packed_row) = true;
ASSERT_OK(SetUpWithParams(3, 1, false));
auto table = EXPECT_RESULT(CreateTable(&test_cluster_, kNamespaceName, kTableName));
google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets;
ASSERT_OK(test_client()->GetTablets(table, 0, &tablets, nullptr));
ASSERT_EQ(tablets.size(), 1);
xrepl::StreamId stream_id = ASSERT_RESULT(CreateDBStreamWithReplicationSlot());

// Insert a row and consume its CDC records so the next GetChanges only returns upsert records.
ASSERT_OK(WriteRows(1 /* start */, 2 /* end */, &test_cluster_));
GetChangesResponsePB change_resp;
ASSERT_OK(WaitForGetChangesToFetchRecords(&change_resp, stream_id, tablets, 1));

// Upsert with PK column in SET clause — triggers YBCExecuteUpdateReplace (DELETE + INSERT).
auto conn = ASSERT_RESULT(test_cluster_.ConnectToDB(kNamespaceName));
ASSERT_OK(conn.Execute(
"INSERT INTO test_table VALUES (1, 10) "
"ON CONFLICT (key) DO UPDATE SET key = EXCLUDED.key, value_1 = EXCLUDED.value_1"));

// Expect DELETE(key=1) + INSERT(key=1, value_1=10), no UPDATEs.
// The count array stores counts of DDL, INSERT, UPDATE, DELETE, READ, TRUNCATE in that order.
const uint32_t expected_count[] = {0, 1, 0, 1, 0, 0};
uint32_t count[] = {0, 0, 0, 0, 0, 0};

// Expected records: BEGIN, DELETE(key=1), INSERT(key=1, value_1=10), COMMIT.
ExpectedRecord expected_records[] = {{0, 0}, {1, 0}, {1, 10}, {0, 0}};

GetChangesResponsePB upsert_resp;
ASSERT_OK(WaitForGetChangesToFetchRecords(
&upsert_resp, stream_id, tablets, 2, /* is_explicit_checkpoint */ false,
&change_resp.cdc_sdk_checkpoint()));

uint32_t record_size = upsert_resp.cdc_sdk_proto_records_size();
ASSERT_EQ(record_size, 4); // BEGIN, DELETE, INSERT, COMMIT
for (uint32_t i = 0; i < record_size; ++i) {
const CDCSDKProtoRecordPB record = upsert_resp.cdc_sdk_proto_records(i);
CheckRecord(record, expected_records[i], count);
}
CheckCount(expected_count, count);
}

// Insert one row, delete inserted row.
// Expected records: (DDL, INSERT, DELETE).
TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(SingleShardDeleteWithAutoCommit)) {
Expand Down