From 4b38b00814d45b43ce678a0c9c5f51fa40902491 Mon Sep 17 00:00:00 2001 From: eg <1139932+egladysh@users.noreply.github.com> Date: Fri, 1 May 2026 17:57:09 -0700 Subject: [PATCH 01/15] Stop retrying master RPCs when server is shutting down --- src/yb/client/client_master_rpc.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/yb/client/client_master_rpc.cc b/src/yb/client/client_master_rpc.cc index ca383ef24bfd..a7fada36b11a 100644 --- a/src/yb/client/client_master_rpc.cc +++ b/src/yb/client/client_master_rpc.cc @@ -145,7 +145,9 @@ void ClientMasterRpcBase::Finished(const Status& status) { } if (new_status.IsNetworkError() || new_status.IsRemoteError()) { - if (rpc::RpcError(new_status) != rpc::ErrorStatusPB::ERROR_NO_SUCH_METHOD) { + const auto rpc_error = rpc::RpcError(new_status); + if (rpc_error != rpc::ErrorStatusPB::ERROR_NO_SUCH_METHOD && + rpc_error != rpc::ErrorStatusPB::FATAL_SERVER_SHUTTING_DOWN) { LOG(WARNING) << ToString() << ": Encountered a network error from the Master(" << client_data_->leader_master_hostport().ToString() << "): " << new_status.ToString() << ", retrying..."; From f2675a3f897785f425090089e7e89ac94f8af67e Mon Sep 17 00:00:00 2001 From: eg <1139932+egladysh@users.noreply.github.com> Date: Fri, 1 May 2026 17:58:59 -0700 Subject: [PATCH 02/15] Fix dangling reference: capture ts_uuid by value in thread pool lambda --- src/yb/master/ysql_backends_manager.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/yb/master/ysql_backends_manager.cc b/src/yb/master/ysql_backends_manager.cc index 10270e2cae7e..217abcc67bad 100644 --- a/src/yb/master/ysql_backends_manager.cc +++ b/src/yb/master/ysql_backends_manager.cc @@ -544,7 +544,7 @@ Status BackendsCatalogVersionJob::Launch(int64_t term) { Status BackendsCatalogVersionJob::LaunchTS(TabletServerId ts_uuid, int num_lagging_backends) { auto task = std::make_shared( shared_from_this(), object_lock_info_manager_, ts_uuid, num_lagging_backends); - Status s = threadpool()->SubmitFunc([this, &ts_uuid, task]() { + Status s = threadpool()->SubmitFunc([this, ts_uuid, task]() { Status s = task->Run(); if (!s.ok()) { LOG_WITH_PREFIX(WARNING) << "got bad status " << s.ToString() From 028994f1383e535ba2ce0d4e178f289f6513d861 Mon Sep 17 00:00:00 2001 From: eg <1139932+egladysh@users.noreply.github.com> Date: Fri, 1 May 2026 17:59:04 -0700 Subject: [PATCH 03/15] Stop index backfill when backend is terminated --- src/yb/client/client-internal.cc | 6 +- src/yb/client/client-internal.h | 3 +- src/yb/client/client.cc | 6 +- src/yb/client/client.h | 3 +- src/yb/master/backfill_index.cc | 78 +++- src/yb/master/backfill_index.h | 21 +- src/yb/master/catalog_entity_info.h | 30 ++ src/yb/master/catalog_manager.cc | 11 +- src/yb/master/master_ddl.proto | 3 + src/yb/master/ysql_ddl_verification_task.cc | 114 ++++- src/yb/master/ysql_ddl_verification_task.h | 55 ++- src/yb/server/monitored_task.h | 1 + src/yb/tserver/pg_client_session.cc | 14 +- .../yql/pgwrapper/pg_index_backfill-test.cc | 411 ++++++++++++++++++ 14 files changed, 733 insertions(+), 23 deletions(-) diff --git a/src/yb/client/client-internal.cc b/src/yb/client/client-internal.cc index 81bd2a00a8ff..16ad35f5d7f2 100644 --- a/src/yb/client/client-internal.cc +++ b/src/yb/client/client-internal.cc @@ -990,7 +990,8 @@ Status YBClient::Data::BackfillIndex(YBClient* client, const YBTableName& index_name, const TableId& index_id, CoarseTimePoint deadline, - bool wait) { + bool wait, + std::optional requester_transaction) { BackfillIndexRequestPB req; BackfillIndexResponsePB resp; @@ -1000,6 +1001,9 @@ Status YBClient::Data::BackfillIndex(YBClient* client, if (!index_id.empty()) { req.mutable_index_identifier()->set_table_id(index_id); } + if (requester_transaction.has_value()) { + requester_transaction->ToPB(req.mutable_requester_transaction()); + } RETURN_NOT_OK((SyncLeaderMasterRpc( deadline, req, &resp, "BackfillIndex", &master::MasterDdlProxy::BackfillIndexAsync))); diff --git a/src/yb/client/client-internal.h b/src/yb/client/client-internal.h index c8746b90eee0..015246df5814 100644 --- a/src/yb/client/client-internal.h +++ b/src/yb/client/client-internal.h @@ -209,7 +209,8 @@ class YBClient::Data { const YBTableName& table_name, const TableId& table_id, CoarseTimePoint deadline, - bool wait = true); + bool wait = true, + std::optional requester_transaction = std::nullopt); Status IsBackfillIndexInProgress(YBClient* client, const TableId& table_id, const TableId& index_id, diff --git a/src/yb/client/client.cc b/src/yb/client/client.cc index ba56791b441d..2a748770d73a 100644 --- a/src/yb/client/client.cc +++ b/src/yb/client/client.cc @@ -622,11 +622,13 @@ Status YBClient::TruncateTables(const TableIds& table_ids, bool wait) { return data_->TruncateTables(this, table_ids, deadline, wait); } -Status YBClient::BackfillIndex(const TableId& table_id, bool wait, CoarseTimePoint deadline) { +Status YBClient::BackfillIndex(const TableId& table_id, bool wait, CoarseTimePoint deadline, + std::optional requester_transaction) { if (deadline == CoarseTimePoint()) { deadline = CoarseMonoClock::Now() + FLAGS_backfill_index_client_rpc_timeout_ms * 1ms; } - return data_->BackfillIndex(this, YBTableName(), table_id, deadline, wait); + return data_->BackfillIndex( + this, YBTableName(), table_id, deadline, wait, std::move(requester_transaction)); } Status YBClient::GetIndexBackfillProgress( diff --git a/src/yb/client/client.h b/src/yb/client/client.h index 1bbdbf5d318d..e37961ea8b90 100644 --- a/src/yb/client/client.h +++ b/src/yb/client/client.h @@ -349,7 +349,8 @@ class YBClient { // Backfill the specified index table. This is only supported for YSQL at the moment. Status BackfillIndex( - const TableId& table_id, bool wait = true, CoarseTimePoint deadline = CoarseTimePoint()); + const TableId& table_id, bool wait = true, CoarseTimePoint deadline = CoarseTimePoint(), + std::optional requester_transaction = std::nullopt); Status GetIndexBackfillProgress( const TableIds& index_ids, diff --git a/src/yb/master/backfill_index.cc b/src/yb/master/backfill_index.cc index d9c343afebb9..1a1bd87c51f5 100644 --- a/src/yb/master/backfill_index.cc +++ b/src/yb/master/backfill_index.cc @@ -119,6 +119,10 @@ DEFINE_test_flag(bool, skip_index_backfill, false, DEFINE_test_flag(bool, block_do_backfill, false, "Block DoBackfill from proceeding."); +DEFINE_test_flag(bool, skip_ddl_requester_liveness_check, false, + "Skip starting the requester liveness task. Used in tests to simulate the pre-fix behavior " + "where master continues sending BackfillIndex RPCs after the backend is killed."); + DEFINE_test_flag(bool, simulate_empty_indexes_during_backfill, false, "Simulates BackfillTable::indexes_to_build() to return an empty set."); @@ -324,7 +328,8 @@ Status MultiStageAlterTable::StartBackfillingData( CatalogManager* catalog_manager, const scoped_refptr& indexed_table, const std::vector& idx_infos, - std::optional current_version, const LeaderEpoch& epoch) { + std::optional current_version, const LeaderEpoch& epoch, + std::optional requester_transaction) { // We leave the table state as ALTERING so that a master failover can resume the backfill. RETURN_NOT_OK(ClearFullyAppliedAndUpdateState( catalog_manager, indexed_table, current_version, /* change_state to RUNNING */ false, epoch)); @@ -337,6 +342,13 @@ Status MultiStageAlterTable::StartBackfillingData( VLOG(0) << __func__ << " starting backfill on " << indexed_table->ToString() << " for " << yb::ToString(idx_infos); + // Retrieve the requester transaction if it was stored during the permission-update phase. + // Pass current_version so Take rejects stale transactions from earlier backfill attempts. + if (!requester_transaction && current_version) { + requester_transaction = + indexed_table->TakePendingBackfillRequesterTransaction(*current_version); + } + if (FLAGS_TEST_skip_index_backfill) { TRACE("Skipping backfill of data on tservers"); LOG(INFO) << "Skipping backfill of data on tservers"; @@ -345,7 +357,7 @@ Status MultiStageAlterTable::StartBackfillingData( auto backfill_table = std::make_shared( catalog_manager->master_, catalog_manager->AsyncTaskPool(), indexed_table, idx_infos, - *ns_info, epoch); + *ns_info, epoch, std::move(requester_transaction)); Status s = backfill_table->Launch(); if (!s.ok()) { indexed_table->ClearIsBackfilling(); @@ -388,7 +400,7 @@ IndexPermissions NextPermission(IndexPermissions perm) { Status MultiStageAlterTable::LaunchNextTableInfoVersionIfNecessary( CatalogManager* catalog_manager, const scoped_refptr& indexed_table, uint32_t current_version, const LeaderEpoch& epoch, bool respect_backfill_deferrals, - bool update_ysql_to_backfill) { + bool update_ysql_to_backfill, std::optional requester_transaction) { DVLOG_WITH_FUNC(3) << Format("$0, version: $1, respect_deferrals: $2, update_ysql_to_backfill: $3", *indexed_table, current_version, respect_backfill_deferrals, @@ -502,6 +514,14 @@ Status MultiStageAlterTable::LaunchNextTableInfoVersionIfNecessary( if (permissions_updated.ok() && *permissions_updated) { VLOG(1) << "Sending alter table request with updated permissions"; + // Store the requester transaction so StartBackfillingData can retrieve it when the + // permission change reaches DO_BACKFILL and the second call launches backfill. + // Store current_version+1 (the new version after this permission update) so Take can + // verify the transaction belongs to this exact backfill attempt and not a stale one. + if (requester_transaction) { + indexed_table->SetPendingBackfillRequesterTransaction( + std::move(requester_transaction), current_version + 1); + } RETURN_NOT_OK(catalog_manager->SendAlterTableRequest(indexed_table, epoch)); return Status::OK(); } @@ -530,7 +550,8 @@ Status MultiStageAlterTable::LaunchNextTableInfoVersionIfNecessary( } WARN_NOT_OK( StartBackfillingData( - catalog_manager, indexed_table.get(), indexes_to_backfill, current_version, epoch), + catalog_manager, indexed_table.get(), indexes_to_backfill, current_version, epoch, + std::move(requester_transaction)), yb::Format("Could not launch backfill for $0", indexed_table->ToString())); } @@ -627,7 +648,7 @@ std::string RetrieveIndexNames(CatalogManager* mgr, BackfillTable::BackfillTable( Master* master, ThreadPool* callback_pool, const scoped_refptr& indexed_table, std::vector indexes, const scoped_refptr& ns_info, - LeaderEpoch epoch) + LeaderEpoch epoch, std::optional requester_transaction) : master_(master), callback_pool_(callback_pool), indexed_table_(indexed_table), @@ -637,7 +658,8 @@ BackfillTable::BackfillTable( RetrieveIndexNames(master->catalog_manager_impl(), requested_index_ids_)), ns_info_(ns_info), epoch_(std::move(epoch)), - wait_state_(ash::WaitStateInfo::CreateIfAshIsEnabled()) { + wait_state_(ash::WaitStateInfo::CreateIfAshIsEnabled()), + requester_transaction_(std::move(requester_transaction)) { if (wait_state_) { if (const auto& current_state = ash::WaitStateInfo::CurrentWaitState()) { wait_state_->UpdateMetadata(current_state->metadata()); @@ -951,6 +973,7 @@ Status BackfillTable::DoLaunchBackfill() { } Status BackfillTable::DoBackfill() { + StartRequesterLivenessMonitor(); while (FLAGS_TEST_block_do_backfill) { constexpr auto kSpinWait = 100ms; LOG(INFO) << Format("Blocking $0 for $1", __func__, kSpinWait); @@ -984,6 +1007,7 @@ Status BackfillTable::Done(const Status& s, const std::unordered_set& f if (!done() && --tablets_pending_ == 0) { LOG_WITH_PREFIX(INFO) << "Completed backfilling the index table."; done_.store(true, std::memory_order_release); + StopLivenessMonitor(); RETURN_NOT_OK_PREPEND( MarkAllIndexesAsSuccess(), "Failed to mark indexes as successfully backfilled."); RETURN_NOT_OK_PREPEND(UpdateIndexPermissionsForIndexes(), "Failed to complete backfill."); @@ -997,6 +1021,7 @@ Status BackfillTable::MarkIndexesAsFailed( const std::unordered_set& failed_indexes, const string& message) { if (indexes_to_build() == failed_indexes) { done_.store(true, std::memory_order_release); + StopLivenessMonitor(); backfill_job_->SetState(MonitoredTaskState::kFailed); } return MarkIndexesAsDesired(failed_indexes, BackfillJobPB::FAILED, message); @@ -1077,6 +1102,46 @@ Status BackfillTable::MarkIndexesAsDesired( return Status::OK(); } +void BackfillTable::StartRequesterLivenessMonitor() { + if (!requester_transaction_) return; + if (PREDICT_FALSE(FLAGS_TEST_skip_ddl_requester_liveness_check)) { + LOG_WITH_PREFIX(INFO) << "Skipping requester liveness monitor (TEST flag set)"; + return; + } + VLOG_WITH_PREFIX(1) << "Starting requester liveness monitor for transaction " + << requester_transaction_->transaction_id; + + auto self = shared_from_this(); + BackgroundDdlCallbacks callbacks{ + .done_ = [self] { return self->done(); }, + .abort_ = [self] { return self->Abort(); }, + }; + auto task = DdlRequesterLivenessTask::CreateAndStartTask( + *master_->catalog_manager_impl(), + indexed_table_, + *requester_transaction_, + std::move(callbacks), + master_->client_future(), + *master_->messenger(), + epoch_); + + std::lock_guard l(mutex_); + DCHECK(!liveness_task_) << "Liveness task already exists"; + liveness_task_ = std::move(task); +} + +void BackfillTable::StopLivenessMonitor() { + std::shared_ptr task; + { + std::lock_guard l(mutex_); + task = std::move(liveness_task_); + } + if (task) { + task->AbortAndReturnPrevState(STATUS(Aborted, "BackfillTable is done")); + } +} + + Status BackfillTable::Abort() { LOG(WARNING) << "Backfill failed/aborted."; RETURN_NOT_OK(MarkAllIndexesAsFailed()); @@ -1086,6 +1151,7 @@ Status BackfillTable::Abort() { Status BackfillTable::CheckIfDone() { if (indexes_to_build().empty()) { done_.store(true, std::memory_order_release); + StopLivenessMonitor(); RETURN_NOT_OK_PREPEND( UpdateIndexPermissionsForIndexes(), "Could not update index permissions after backfill"); diff --git a/src/yb/master/backfill_index.h b/src/yb/master/backfill_index.h index 602e4b2948c9..de2c58d5944f 100644 --- a/src/yb/master/backfill_index.h +++ b/src/yb/master/backfill_index.h @@ -27,6 +27,7 @@ #include "yb/ash/wait_state.h" #include "yb/common/entity_ids.h" +#include "yb/common/transaction.h" #include "yb/dockv/partition.h" #include "yb/gutil/integral_types.h" @@ -34,6 +35,7 @@ #include "yb/master/async_rpc_tasks_base.h" #include "yb/master/catalog_entity_info.h" +#include "yb/master/ysql_ddl_verification_task.h" #include "yb/qlexpr/index.h" @@ -68,7 +70,8 @@ class MultiStageAlterTable { static Status LaunchNextTableInfoVersionIfNecessary( CatalogManager* mgr, const scoped_refptr& Info, uint32_t current_version, const LeaderEpoch& epoch, bool respect_backfill_deferrals = true, - bool update_ysql_to_backfill = false); + bool update_ysql_to_backfill = false, + std::optional requester_transaction = std::nullopt); // Clears the fully_applied_* state for the given table and optionally sets it to RUNNING. // If the version has changed and does not match the expected version no @@ -94,10 +97,13 @@ class MultiStageAlterTable { private: // Start Index Backfill process/step for the specified table/index. + // If requester_transaction is provided it will be used to monitor the liveness of the + // PG backend that initiated the backfill. static Status StartBackfillingData( CatalogManager* catalog_manager, const scoped_refptr& indexed_table, const std::vector& idx_infos, std::optional expected_version, - const LeaderEpoch& epoch); + const LeaderEpoch& epoch, + std::optional requester_transaction = std::nullopt); }; class BackfillTablet; @@ -112,7 +118,8 @@ class BackfillTable : public std::enable_shared_from_this { const scoped_refptr &indexed_table, std::vector indexes, const scoped_refptr &ns_info, - LeaderEpoch epoch); + LeaderEpoch epoch, + std::optional requester_transaction = std::nullopt); Status Launch(); @@ -169,6 +176,8 @@ class BackfillTable : public std::enable_shared_from_this { static void UnsetIndexTableRetainsDeleteMarkers(PersistentTableInfo* index_table); + Status Abort(); + private: void LaunchBackfillOrAbort(); Status WaitForTabletSplitting(); @@ -188,7 +197,8 @@ class BackfillTable : public std::enable_shared_from_this { Status AlterTableStateToAbort(); Status AlterTableStateToSuccess(); - Status Abort(); + void StartRequesterLivenessMonitor(); + void StopLivenessMonitor(); Status CheckIfDone(); Status UpdateIndexPermissionsForIndexes(); Status ClearCheckpointStateInTablets(); @@ -230,8 +240,11 @@ class BackfillTable : public std::enable_shared_from_this { const scoped_refptr ns_info_; LeaderEpoch epoch_; ash::WaitStateInfoPtr wait_state_; + std::optional requester_transaction_; + std::shared_ptr liveness_task_ GUARDED_BY(mutex_); }; + class BackfillTableJob : public server::MonitoredTask { public: explicit BackfillTableJob(std::shared_ptr backfill_table) diff --git a/src/yb/master/catalog_entity_info.h b/src/yb/master/catalog_entity_info.h index 00c5e72d252d..4ba40717a1e6 100644 --- a/src/yb/master/catalog_entity_info.h +++ b/src/yb/master/catalog_entity_info.h @@ -874,6 +874,30 @@ class TableInfo : public RefCountedThreadSafe, is_backfilling_ = false; } + // Store/retrieve the DDL transaction from the PG backend that initiated the backfill. + // Stored when CatalogManager::BackfillIndex moves the index from WRITE_AND_DELETE to + // DO_BACKFILL; retrieved when StartBackfillingData actually creates the BackfillTable. + // schema_version must be the table version produced by the permission update (current + 1). + void SetPendingBackfillRequesterTransaction( + std::optional txn, uint32_t schema_version) { + std::lock_guard l(lock_); + pending_backfill_requester_transaction_ = std::move(txn); + pending_backfill_requester_transaction_version_ = schema_version; + } + + // Returns the stored transaction and clears it, but only if schema_version matches the value + // passed to SetPendingBackfillRequesterTransaction. Returns nullopt otherwise so that a stale + // transaction from an earlier backfill attempt is never used for a later one. + std::optional TakePendingBackfillRequesterTransaction( + uint32_t schema_version) { + std::lock_guard l(lock_); + if (!pending_backfill_requester_transaction_ || + pending_backfill_requester_transaction_version_ != schema_version) { + return std::nullopt; + } + return std::exchange(pending_backfill_requester_transaction_, std::nullopt); + } + // Returns true if an "Alter" operation is in-progress. Result IsAlterInProgress(uint32_t version) const; @@ -985,6 +1009,12 @@ class TableInfo : public RefCountedThreadSafe, // In memory state set during backfill to prevent multiple backfill jobs. bool is_backfilling_ = false; + // DDL transaction from the PG backend that initiated the backfill, and the table schema version + // at which it was stored. Set when BackfillIndex updates permissions (WRITE_AND_DELETE → + // DO_BACKFILL) and cleared when StartBackfillingData creates the BackfillTable. + std::optional pending_backfill_requester_transaction_ GUARDED_BY(lock_); + uint32_t pending_backfill_requester_transaction_version_ GUARDED_BY(lock_) = 0; + std::atomic is_system_{false}; const bool colocated_; diff --git a/src/yb/master/catalog_manager.cc b/src/yb/master/catalog_manager.cc index 6f9c65fa042c..3a80244996fc 100644 --- a/src/yb/master/catalog_manager.cc +++ b/src/yb/master/catalog_manager.cc @@ -6586,9 +6586,18 @@ Status CatalogManager::BackfillIndex( IndexPermissions_Name(index_info_pb.index_permissions()))); } + std::optional requester_txn; + if (req->has_requester_transaction()) { + auto result = TransactionMetadata::FromPB(req->requester_transaction()); + if (result.ok()) { + requester_txn = std::move(*result); + } else { + LOG(WARNING) << "BackfillIndex: failed to decode requester transaction: " << result.status(); + } + } return MultiStageAlterTable::LaunchNextTableInfoVersionIfNecessary( this, indexed_table, current_version, epoch, /* respect deferrals for backfill */ false, - /* update ysql to backfill */ true); + /* update ysql to backfill */ true, std::move(requester_txn)); } Status CatalogManager::GetBackfillJobs( diff --git a/src/yb/master/master_ddl.proto b/src/yb/master/master_ddl.proto index 2dbc624b6525..d1ac1a94fd83 100644 --- a/src/yb/master/master_ddl.proto +++ b/src/yb/master/master_ddl.proto @@ -324,6 +324,9 @@ message IsTruncateTableDoneResponsePB { message BackfillIndexRequestPB { // The index to backfill. Used for YSQL optional TableIdentifierPB index_identifier = 1; + // The DDL transaction held by the CREATE INDEX backend. If this transaction is detected as + // ABORTED (e.g. pg_terminate_backend killed the session), the master will stop the backfill. + optional TransactionMetadataPB requester_transaction = 2; } message BackfillIndexResponsePB { diff --git a/src/yb/master/ysql_ddl_verification_task.cc b/src/yb/master/ysql_ddl_verification_task.cc index 201b1eb0e9be..b932aabc156b 100644 --- a/src/yb/master/ysql_ddl_verification_task.cc +++ b/src/yb/master/ysql_ddl_verification_task.cc @@ -55,6 +55,10 @@ DEFINE_test_flag(bool, yb_test_table_rewrite_keep_old_table, false, "concurrent DMLs. If the table is dropped too soon, we will just get a " "table does not exist error instead."); +DEFINE_RUNTIME_int32(ddl_requester_liveness_check_interval_secs, 10, + "Interval in seconds between liveness checks for a background DDL operation's initiating " + "transaction. If the transaction is detected as aborted, the background operation is stopped."); + using std::string; using std::vector; @@ -659,7 +663,8 @@ void PollTransactionStatusBase::TransactionReceived( // If this transaction isn't pending, then the transaction is in a terminal state. // Note: We ignore the resp.status() now, because it could be ABORT'd but actually a SUCCESS. // Determine whether the transaction was a success by comparing with the PG schema. - FinishPollTransaction(); + bool aborted = (resp.status_size() > 0 && resp.status(0) == TransactionStatus::ABORTED); + FinishPollTransaction(aborted); } NamespaceVerificationTask::NamespaceVerificationTask( @@ -714,7 +719,7 @@ void NamespaceVerificationTask::TransactionPending() { }, "VerifyTransaction"); } -void NamespaceVerificationTask::FinishPollTransaction() { +void NamespaceVerificationTask::FinishPollTransaction(bool /*aborted*/) { ScheduleNextStep( std::bind(&NamespaceVerificationTask::CheckNsExists, this), "CheckNsExists"); @@ -825,7 +830,7 @@ Status TableSchemaVerificationTask::ValidateRunnable() { return Status::OK(); } -void TableSchemaVerificationTask::FinishPollTransaction() { +void TableSchemaVerificationTask::FinishPollTransaction(bool /*aborted*/) { ScheduleNextStep([this] { return ddl_atomicity_enabled_ ? CompareSchema() : CheckTableExists(); }, "Compare Schema"); @@ -881,5 +886,108 @@ void TableSchemaVerificationTask::PerformAbort() { Shutdown(); } +// --- DdlRequesterLivenessTask --- + +DdlRequesterLivenessTask::DdlRequesterLivenessTask( + CatalogManager& catalog_manager, + scoped_refptr table, + const TransactionMetadata& transaction, + BackgroundDdlCallbacks callbacks, + std::shared_future client_future, + rpc::Messenger& messenger, + const LeaderEpoch& epoch) + : MultiStepTableTaskBase( + catalog_manager, *catalog_manager.AsyncTaskPool(), messenger, std::move(table), epoch), + PollTransactionStatusBase(transaction, std::move(client_future)), + callbacks_(std::move(callbacks)) {} + +std::shared_ptr DdlRequesterLivenessTask::CreateAndStartTask( + CatalogManager& catalog_manager, + scoped_refptr table, + const TransactionMetadata& transaction, + BackgroundDdlCallbacks callbacks, + std::shared_future client_future, + rpc::Messenger& messenger, + const LeaderEpoch& epoch) { + auto task = std::make_shared( + catalog_manager, std::move(table), transaction, std::move(callbacks), + std::move(client_future), messenger, epoch); + task->Start(); + return task; +} + +std::string DdlRequesterLivenessTask::description() const { + return Format("DdlRequesterLivenessTask for $0", table_info_->id()); +} + +Status DdlRequesterLivenessTask::FirstStep() { + ScheduleNextStepWithDelay( + [this] { return VerifyTransaction(); }, "VerifyTransaction", + MonoDelta::FromSeconds(FLAGS_ddl_requester_liveness_check_interval_secs)); + return Status::OK(); +} + +void DdlRequesterLivenessTask::TransactionPending() { + if (callbacks_.done_()) { + // We're inside the GetTransactionStatus RPC callback. Calling Complete() directly here + // would deadlock: Complete() → TaskCompleted() → Shutdown() → sync_.Wait() blocks + // because the callback's user_cb (which signals sync_) hasn't fired yet. + // Schedule asynchronously so user_cb fires first. + ScheduleNextStep( + [this] { + Complete(); + return Status::OK(); + }, + "CompleteLivenessTask"); + return; + } + ScheduleNextStepWithDelay( + [this] { return VerifyTransaction(); }, "VerifyTransaction", + MonoDelta::FromSeconds(FLAGS_ddl_requester_liveness_check_interval_secs)); +} + +void DdlRequesterLivenessTask::FinishPollTransaction(bool aborted) { + // We're inside the GetTransactionStatus RPC callback. Calling Complete() directly here + // would deadlock: Complete() → TaskCompleted() → Shutdown() → sync_.Wait() blocks + // because the callback's user_cb (which signals sync_) hasn't fired yet. + // Schedule asynchronously so user_cb fires first. + // + // Complete() must still be called before callbacks_.abort_() to avoid a different deadlock: + // Abort() may call BackfillTable::StopLivenessMonitor() → AbortAndReturnPrevState(), which + // acquires step_execution_mutex_ to wait for the current step to finish. Calling Complete() + // first moves the task to terminal state so AbortAndReturnPrevState() finds + // TrySetState(kAborted) failing and returns immediately without touching the mutex. + ScheduleNextStep( + [this, aborted] { + Complete(); + if (aborted && !callbacks_.done_()) { + LOG(INFO) << "DdlRequesterLivenessTask: requester transaction aborted for " + << table_info_->id() << ", aborting background DDL operation"; + auto s = callbacks_.abort_(); + if (!s.ok()) { + LOG(ERROR) << "Failed to abort background DDL operation after requester death: " << s; + } + } + return Status::OK(); + }, + "CompleteLivenessTask"); +} + +void DdlRequesterLivenessTask::TaskCompleted(const Status& status) { + Shutdown(); +} + +Status DdlRequesterLivenessTask::ValidateRunnable() { + if (callbacks_.done_()) { + return STATUS(Aborted, "Background DDL operation is done, stopping liveness monitor"); + } + return Status::OK(); +} + +void DdlRequesterLivenessTask::PerformAbort() { + MultiStepTableTaskBase::PerformAbort(); // Cancels the reactor-scheduled poll timer. + Shutdown(); // Cancels any in-flight GetTransactionStatus RPC. +} + } // namespace master } // namespace yb diff --git a/src/yb/master/ysql_ddl_verification_task.h b/src/yb/master/ysql_ddl_verification_task.h index e5d1c9044a43..843fc1b2cd85 100644 --- a/src/yb/master/ysql_ddl_verification_task.h +++ b/src/yb/master/ysql_ddl_verification_task.h @@ -65,6 +65,15 @@ namespace master { * or use DDL entities created by uncommitted transactions. */ +// Callbacks for DdlRequesterLivenessTask to interact with the background DDL operation +// (e.g. BackfillTable) that it is monitoring. +struct BackgroundDdlCallbacks { + // Returns true when the operation has completed and no further polling is needed. + std::function done_; + // Cancels the operation. Called when the initiating PG backend is detected as killed. + std::function abort_; +}; + // Helper class that encapsulates the logic to poll the transaction status. class PollTransactionStatusBase { public: @@ -76,7 +85,7 @@ class PollTransactionStatusBase { protected: Status VerifyTransaction(); virtual void TransactionPending() = 0; - virtual void FinishPollTransaction() = 0; + virtual void FinishPollTransaction(bool aborted) = 0; void Shutdown(); TransactionMetadata transaction_; @@ -133,7 +142,7 @@ class NamespaceVerificationTask : public MultiStepNamespaceTaskBase, Status FirstStep() override; void TransactionPending() override; Status ValidateRunnable() override; - void FinishPollTransaction() override; + void FinishPollTransaction(bool aborted) override; Status CheckNsExists(); void TaskCompleted(const Status& status) override; void PerformAbort() override; @@ -186,7 +195,7 @@ class TableSchemaVerificationTask : public MultiStepTableTaskBase, Status CheckTableExists(); Status CompareSchema(); Status FinishTask(Result> is_committed); - void FinishPollTransaction() override; + void FinishPollTransaction(bool aborted) override; void TaskCompleted(const Status& status) override; void PerformAbort() override; @@ -195,5 +204,45 @@ class TableSchemaVerificationTask : public MultiStepTableTaskBase, std::optional is_committed_{std::nullopt}; }; +// Periodically polls a DDL backend's transaction and calls BackgroundDdlCallbacks::abort +// when the transaction is detected as ABORTED (e.g. pg_terminate_backend killed the session). +// Can be attached to any long-running background DDL operation. +class DdlRequesterLivenessTask : public MultiStepTableTaskBase, + public PollTransactionStatusBase { + public: + static std::shared_ptr CreateAndStartTask( + CatalogManager& catalog_manager, + scoped_refptr table, + const TransactionMetadata& transaction, + BackgroundDdlCallbacks callbacks, + std::shared_future client_future, + rpc::Messenger& messenger, + const LeaderEpoch& epoch); + DdlRequesterLivenessTask( + CatalogManager& catalog_manager, + scoped_refptr table, + const TransactionMetadata& transaction, + BackgroundDdlCallbacks callbacks, + std::shared_future client_future, + rpc::Messenger& messenger, + const LeaderEpoch& epoch); + + server::MonitoredTaskType type() const override { + return server::MonitoredTaskType::kDdlRequesterLiveness; + } + std::string type_name() const override { return "DdlRequesterLivenessTask"; } + std::string description() const override; + + private: + Status FirstStep() override; + void TransactionPending() override; + void FinishPollTransaction(bool aborted) override; + void TaskCompleted(const Status& status) override; + Status ValidateRunnable() override; + void PerformAbort() override; + + BackgroundDdlCallbacks callbacks_; +}; + } // namespace master } // namespace yb diff --git a/src/yb/server/monitored_task.h b/src/yb/server/monitored_task.h index d47863c2e2d8..f1b6b3980b6c 100644 --- a/src/yb/server/monitored_task.h +++ b/src/yb/server/monitored_task.h @@ -63,6 +63,7 @@ YB_DEFINE_ENUM(MonitoredTaskType, (kBackendsCatalogVersion) (kBackendsCatalogVersionTs) (kBackfillDone) + (kDdlRequesterLiveness) (kBackfillTable) (kBackfillTabletChunk) (kChangeConfig) diff --git a/src/yb/tserver/pg_client_session.cc b/src/yb/tserver/pg_client_session.cc index 4ca95d2de506..2f4e9c543ae6 100644 --- a/src/yb/tserver/pg_client_session.cc +++ b/src/yb/tserver/pg_client_session.cc @@ -2054,9 +2054,21 @@ class PgClientSession::Impl { Status BackfillIndex( const PgBackfillIndexRequestPB& req, PgBackfillIndexResponsePB* resp, rpc::RpcContext* context) { + std::optional txn_metadata; + // The PG backend holds a DDL transaction open for the entire backfill duration + // (StartTransactionCommand at indexcmds.c:2334). Pass it to the master so it can detect when + // this backend is killed (→ txn aborted) and stop launching new backfill chunks. + auto meta = GetDdlTransactionMetadata( + true /* use_transaction */, false /* use_regular_transaction_block */, + context->GetClientDeadline(), IsTxnUsingTableLocks(false)); + if (meta.ok() && *meta) { + txn_metadata = **meta; + } else { + VLOG(1) << "BackfillIndex: no DDL transaction metadata available for cancellation detection"; + } return client_.BackfillIndex( PgObjectId::GetYbTableIdFromPB(req.table_id()), /* wait= */ true, - context->GetClientDeadline()); + context->GetClientDeadline(), std::move(txn_metadata)); } Status CreateTablegroup( diff --git a/src/yb/yql/pgwrapper/pg_index_backfill-test.cc b/src/yb/yql/pgwrapper/pg_index_backfill-test.cc index 5352d2df79d0..545159dd0859 100644 --- a/src/yb/yql/pgwrapper/pg_index_backfill-test.cc +++ b/src/yb/yql/pgwrapper/pg_index_backfill-test.cc @@ -3417,4 +3417,415 @@ TEST_P(PgIndexBackfillBlockDoBackfill, ConcurrentInplaceUpdateCoveringIndex) { ASSERT_OK(CheckIndexConsistency("idx_tbl")); } +// Test class for verifying that killing the PG backend running CREATE INDEX CONCURRENTLY +// propagates the cancellation to the master-side backfill. +// +// Reproduces the bug where pg_terminate_backend kills only the PG connection but leaves +// the distributed backfill on the master running indefinitely. +class PgIndexBackfillCancellationTest : public PgIndexBackfillTest { + public: + void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override { + PgIndexBackfillTest::UpdateMiniClusterOptions(options); + // This test uses SPLIT INTO 1 TABLETS and only needs one tserver. Use RF=1 so the + // system.transactions table (and all other system tables) can be created with a single + // tserver. Also cap ysql_num_tablets=1 so user table initialization stays minimal. + options->replication_factor = 1; + options->extra_tserver_flags.push_back("--ysql_num_tablets=1"); + // Slow down each BackfillIndex RPC so we have time to kill the backend mid-backfill + // and then observe whether additional chunks continue to be issued. + options->extra_tserver_flags.push_back( + Format("--TEST_slowdown_backfill_by_ms=$0", kSlowdown.ToMilliseconds())); + // Limit rows scanned per BackfillIndex RPC so the master must issue multiple sequential + // RPCs per tablet (resume chunks). Without this, all 2000 rows fit in a single RPC and + // the kill always arrives after backfill is already done, leaving nothing to observe. + // + // Two flags together produce 2000/500 = 4 sequential BackfillIndex RPCs (500 rows each): + // - yb_fetch_row_limit=500: controls how many rows PostgreSQL fetches from DocDB per + // BACKFILL INDEX SQL call. The default is 1024, which would produce only 2 RPCs. + // - TEST_backfill_paging_size=500: exits the do-while loop in BackfillIndexesForYsql + // after 500 rows have been accumulated, forcing a resume-key response to the master. + options->extra_tserver_flags.push_back("--ysql_yb_fetch_row_limit=500"); + options->extra_tserver_flags.push_back("--TEST_backfill_paging_size=500"); + // Poll frequently so we detect transaction abort within the test observation window. + options->extra_master_flags.push_back( + "--ddl_requester_liveness_check_interval_secs=2"); + // Reduce transaction RPC timeout so that GetTransactionStatus calls during cluster teardown + // (from the DDL verification cleanup task for the failed CREATE INDEX) time out quickly + // instead of blocking the master's worker thread for the default 5-15 seconds. + options->extra_master_flags.push_back("--transaction_rpc_timeout_ms=1000"); + } + + int GetNumTabletServers() const override { return 1; } + + protected: + // Each BackfillIndex RPC to a tserver takes this long due to the slowdown flag. + const MonoDelta kSlowdown = 2s; +}; + +INSTANTIATE_TEST_CASE_P(, PgIndexBackfillCancellationTest, ::testing::Bool()); + +// Regression test: after pg_terminate_backend kills the CREATE INDEX CONCURRENTLY session, +// the master must stop issuing new BackfillIndex RPCs to tservers. +// +// Without the fix: the master continues launching backfill chunks, visible as new +// BackfillIndex RPC completions on tservers after the backend is gone. +// With the fix: the master detects the DDL transaction is ABORTED and stops. +// +// Manual reproduction (as suggested by the YugabyteDB developer): +// SET yb_slowdown_backfill_by_ms = 10000; +// CREATE INDEX CONCURRENTLY idx ON my_table (col); -- session 1 +// SELECT pg_terminate_backend(); -- session 2 +// -- observe in tserver logs (--ysql_pg_conf_csv=log_statement=ALL): +// -- backfill statements keep appearing even after the kill +TEST_P(PgIndexBackfillCancellationTest, + YB_DISABLE_TEST_IN_SANITIZERS(BackfillStopsAfterBackendKill)) { + // Single-tablet table with 2000 rows. TEST_backfill_paging_size=500 (set in the fixture) + // causes the tserver to return a resume key after every 500 rows, so the master issues + // 4 sequential BackfillIndex RPCs (each taking kSlowdown=2s). This gives a clear + // observation window: after killing mid-backfill we can tell whether the master stops or + // continues issuing new chunks. + ASSERT_OK(conn_->ExecuteFormat( + "CREATE TABLE $0 (k INT PRIMARY KEY, v INT) SPLIT INTO 1 TABLETS", kTableName)); + ASSERT_OK(conn_->ExecuteFormat( + "INSERT INTO $0 SELECT i, i FROM generate_series(1, 2000) AS i", kTableName)); + // Close the setup connection so WaitForYsqlBackendsCatalogVersion doesn't stall waiting + // for this idle backend to refresh its catalog version after CREATE INDEX increments it. + conn_.reset(); + + // Capture the CREATE INDEX backend PID before the DDL starts, then run the DDL. + // The DDL will fail with a network error when pg_terminate_backend kills the connection. + std::atomic create_index_pid{0}; + CountDownLatch pid_ready(1); + + thread_holder_.AddThreadFunctor([this, &create_index_pid, &pid_ready] { + auto conn = ASSERT_RESULT(ConnectToDB(kDatabaseName)); + create_index_pid.store(ASSERT_RESULT(conn.FetchRow("SELECT pg_backend_pid()"))); + pid_ready.CountDown(); + // Expected to fail once pg_terminate_backend is called. + WARN_NOT_OK( + conn.ExecuteFormat("CREATE INDEX CONCURRENTLY $0 ON $1 (v)", kIndexName, kTableName), + "CREATE INDEX CONCURRENTLY failed (expected after pg_terminate_backend)"); + }); + + // Wait for the first BackfillIndex RPC to complete. This guarantees at least one more + // chunk (resume from the 500-row page boundary) would be issued without the fix, giving + // the observation window a clear before/after signal. + pid_ready.Wait(); + ASSERT_OK(WaitFor( + [this]() -> Result { + return VERIFY_RESULT(TotalBackfillRpcCalls(cluster_.get())) >= 1; + }, + 30s * kTimeMultiplier, "first BackfillIndex RPC to complete")); + + // Kill the CREATE INDEX backend from a separate connection. + auto killer_conn = ASSERT_RESULT(ConnectToDB(kDatabaseName)); + ASSERT_TRUE(ASSERT_RESULT( + killer_conn.FetchRow(Format("SELECT pg_terminate_backend($0)", create_index_pid)))); + LOG(INFO) << "Terminated CREATE INDEX backend PID " << create_index_pid; + + // Allow the currently in-flight BackfillIndex RPC (chunk 2, if any) to complete + // plus the liveness task to fire and suppress any further chunks. + SleepFor(kSlowdown + 1s); + auto rpcs_after_kill = ASSERT_RESULT(TotalBackfillRpcCalls(cluster_.get())); + LOG(INFO) << "BackfillIndex RPC count after kill + grace: " << rpcs_after_kill; + + // Wait long enough for chunks 3 and 4 to finish if the bug were present. + SleepFor(kSlowdown * 2 + 1s); + auto rpcs_after_wait = ASSERT_RESULT(TotalBackfillRpcCalls(cluster_.get())); + LOG(INFO) << "BackfillIndex RPC count after observation window: " << rpcs_after_wait; + + thread_holder_.JoinAll(); + + // With the fix: the liveness task detects the ABORTED transaction and calls Abort(), + // preventing any chunks after chunk 1 (or chunk 2 at most if it raced to start). + // rpcs_after_wait must equal rpcs_after_kill — no new chunks in the observation window. + EXPECT_EQ(rpcs_after_wait, rpcs_after_kill) + << "BackfillIndex RPCs continued after pg_terminate_backend: " + << (rpcs_after_wait - rpcs_after_kill) << " unexpected additional RPC(s) issued"; +} + +// Negative-regression test: demonstrates the pre-fix bug. +// +// With --TEST_skip_ddl_requester_liveness_check=true the master never starts the +// liveness task, so it never detects the ABORTED transaction and keeps sending +// BackfillIndex RPCs. The assertion here is the mirror image of BackfillStopsAfterBackendKill: +// we EXPECT more RPCs after the observation window to prove the bug is present when the fix is +// disabled. +class PgIndexBackfillCancellationWithoutFixTest : public PgIndexBackfillCancellationTest { + public: + void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override { + PgIndexBackfillCancellationTest::UpdateMiniClusterOptions(options); + options->extra_master_flags.push_back( + "--TEST_skip_ddl_requester_liveness_check=true"); + } +}; + +INSTANTIATE_TEST_CASE_P(, PgIndexBackfillCancellationWithoutFixTest, ::testing::Values(false)); + +TEST_P(PgIndexBackfillCancellationWithoutFixTest, + YB_DISABLE_TEST_IN_SANITIZERS(BackfillContinuesAfterBackendKill)) { + ASSERT_OK(conn_->ExecuteFormat( + "CREATE TABLE $0 (k INT PRIMARY KEY, v INT) SPLIT INTO 1 TABLETS", kTableName)); + ASSERT_OK(conn_->ExecuteFormat( + "INSERT INTO $0 SELECT i, i FROM generate_series(1, 2000) AS i", kTableName)); + // Close the setup connection so WaitForYsqlBackendsCatalogVersion doesn't stall waiting + // for this idle backend to refresh its catalog version after CREATE INDEX increments it. + conn_.reset(); + + std::atomic create_index_pid{0}; + CountDownLatch pid_ready(1); + thread_holder_.AddThreadFunctor([this, &create_index_pid, &pid_ready] { + auto conn = ASSERT_RESULT(ConnectToDB(kDatabaseName)); + create_index_pid.store(ASSERT_RESULT(conn.FetchRow("SELECT pg_backend_pid()"))); + pid_ready.CountDown(); + WARN_NOT_OK( + conn.ExecuteFormat("CREATE INDEX CONCURRENTLY $0 ON $1 (v)", kIndexName, kTableName), + "CREATE INDEX CONCURRENTLY failed (expected after pg_terminate_backend)"); + }); + + pid_ready.Wait(); + + // Wait until the first BackfillIndex RPC has completed. With yb_fetch_row_limit=500 and + // TEST_backfill_paging_size=500 (both set in the fixture) and 2000 rows there are 4 sequential + // RPCs. After chunk 1 finishes chunks 2-4 remain: without the liveness check the master + // issues all of them even though the backend is dead. + ASSERT_OK(WaitFor( + [this]() -> Result { + return VERIFY_RESULT(TotalBackfillRpcCalls(cluster_.get())) >= 1; + }, + 30s * kTimeMultiplier, "first BackfillIndex RPC to complete")); + + auto killer_conn = ASSERT_RESULT(ConnectToDB(kDatabaseName)); + ASSERT_TRUE(ASSERT_RESULT( + killer_conn.FetchRow(Format("SELECT pg_terminate_backend($0)", create_index_pid)))); + LOG(INFO) << "Terminated CREATE INDEX backend PID " << create_index_pid; + + auto rpcs_at_kill = ASSERT_RESULT(TotalBackfillRpcCalls(cluster_.get())); + LOG(INFO) << "BackfillIndex RPC count at kill: " << rpcs_at_kill; + + // kSlowdown*2+1s covers at least 2 additional chunks (chunks 2 and 3 each take kSlowdown). + SleepFor(kSlowdown * 2 + 1s); + auto rpcs_after_wait = ASSERT_RESULT(TotalBackfillRpcCalls(cluster_.get())); + LOG(INFO) << "BackfillIndex RPC count after observation window: " << rpcs_after_wait; + + thread_holder_.JoinAll(); + + // Without the liveness task the master keeps issuing chunks: rpcs_after_wait must be + // strictly greater than rpcs_at_kill, proving the bug is present when the fix is off. + EXPECT_GT(rpcs_after_wait, rpcs_at_kill) + << "Expected BackfillIndex RPCs to continue after pg_terminate_backend " + "(liveness check disabled — demonstrating pre-fix behavior), but they stopped"; +} + +// Tests that kill the CREATE INDEX backend BEFORE any tserver BackfillIndex RPCs have been +// sent. This exercises the Register/Launch split: Register() stores the requester_transaction +// on the placeholder BackfillTable before DoBackfill() is called. If the transaction is not +// forwarded correctly, the DdlRequesterLivenessTask is never started and backfill runs to +// completion despite the killed backend. +// +// Timing sketch (kSlowdown = 2s, kAlterTableDelay = 4s): +// T=0 PG backend calls BackfillIndex RPC → master enters UpdateIndexPermission +// T~4s First sleep inside UpdateIndexPermission completes (kAlterTableDelay) +// T~4s Permission updated; Register() called → placeholder created with requester_transaction +// T~8s Second sleep inside UpdateIndexPermission completes → AlterTable RPCs proceed +// T~9s Test kills PG backend (kAlterTableDelay * 2 + 1s after indisready detected) +// → 0 BackfillIndex RPCs have been sent yet +// T~10s DoBackfill starts, StartRequesterLivenessMonitor() fires +// T~12s Liveness first poll (ddl_requester_liveness_check_interval_secs = 2s): detects ABORTED +// → abort callback cancels backfill +// → ≤ 2 chunks complete (chunk 1 races with first liveness poll; chunk 2 may slip through) +class PgIndexBackfillCancellationEarlyKillTest : public PgIndexBackfillCancellationTest { + public: + void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override { + PgIndexBackfillCancellationTest::UpdateMiniClusterOptions(options); + options->extra_master_flags.push_back( + Format("--TEST_slowdown_backfill_alter_table_rpcs_ms=$0", + kAlterTableDelay.ToMilliseconds())); + } + + protected: + // Each sleep inside UpdateIndexPermission lasts this long (two sleeps → 2× total delay). + const MonoDelta kAlterTableDelay = 4s; +}; + +INSTANTIATE_TEST_CASE_P(, PgIndexBackfillCancellationEarlyKillTest, ::testing::Bool()); + +// Positive test: with the fix, the DdlRequesterLivenessTask receives the requester_transaction +// from the placeholder BackfillTable and aborts backfill after ≤ 2 chunks. +TEST_P(PgIndexBackfillCancellationEarlyKillTest, + YB_DISABLE_TEST_IN_SANITIZERS(BackfillStopsAfterEarlyBackendKill)) { + ASSERT_OK(conn_->ExecuteFormat( + "CREATE TABLE $0 (k INT PRIMARY KEY, v INT) SPLIT INTO 1 TABLETS", kTableName)); + ASSERT_OK(conn_->ExecuteFormat( + "INSERT INTO $0 SELECT i, i FROM generate_series(1, 2000) AS i", kTableName)); + // Close the setup connection; it is not needed for the kill step and must not linger. + conn_.reset(); + + std::atomic create_index_pid{0}; + CountDownLatch pid_ready(1); + + thread_holder_.AddThreadFunctor([this, &create_index_pid, &pid_ready] { + auto conn = ASSERT_RESULT(ConnectToDB(kDatabaseName)); + create_index_pid.store(ASSERT_RESULT(conn.FetchRow("SELECT pg_backend_pid()"))); + pid_ready.CountDown(); + WARN_NOT_OK( + conn.ExecuteFormat("CREATE INDEX CONCURRENTLY $0 ON $1 (v)", kIndexName, kTableName), + "CREATE INDEX CONCURRENTLY failed (expected after pg_terminate_backend)"); + }); + + pid_ready.Wait(); + + // Wait for indisready: the PG backend has reached (or passed) the UpdateIndexPermission call + // that triggers Register(). Open a fresh connection for polling and close it before the kill + // so it does not block WaitForYsqlBackendsCatalogVersion inside the kill path. + { + auto monitor_conn = ASSERT_RESULT(ConnectToDB(kDatabaseName)); + ASSERT_OK(WaitFor( + [&monitor_conn]() -> Result { + auto res = VERIFY_RESULT(monitor_conn.FetchFormat( + "SELECT indisready FROM pg_class" + " JOIN pg_index ON pg_class.oid = pg_index.indexrelid" + " WHERE pg_class.relname = '$0'", + kIndexName)); + if (PQntuples(res.get()) == 0) { + return false; + } + return VERIFY_RESULT(GetValue(res.get(), 0, 0)); + }, + 30s * kTimeMultiplier, "index to reach indisready")); + } // monitor_conn closed here + + // UpdateIndexPermission sleeps kAlterTableDelay before AND after the permission update. + // Register() is called after the first sleep, so at T_indisready + kAlterTableDelay. + // Sleeping kAlterTableDelay * 2 + 1s ensures Register() has completed and no BackfillIndex + // RPCs have been sent yet. + SleepFor(kAlterTableDelay * 2 + 1s); + + ASSERT_EQ(ASSERT_RESULT(TotalBackfillRpcCalls(cluster_.get())), 0) + << "BackfillIndex RPCs already started — kill window was missed. " + "Increase kAlterTableDelay or reduce system load."; + + auto killer_conn = ASSERT_RESULT(ConnectToDB(kDatabaseName)); + ASSERT_TRUE(ASSERT_RESULT( + killer_conn.FetchRow( + Format("SELECT pg_terminate_backend($0)", create_index_pid)))); + LOG(INFO) << "Terminated CREATE INDEX backend PID " << create_index_pid + << " (before any tserver BackfillIndex RPCs)"; + + // Allow enough time for all 4 chunks (4 × kSlowdown = 8s) plus a generous margin. + // If the liveness task did not fire, all 4 complete; with the fix, ≤ 2 complete. + SleepFor(kSlowdown * 4 + 5s); + auto rpcs_final = ASSERT_RESULT(TotalBackfillRpcCalls(cluster_.get())); + LOG(INFO) << "BackfillIndex RPC count after observation window: " << rpcs_final; + + thread_holder_.JoinAll(); + + // With the fix: liveness detects ABORTED on the first poll (≈ 2s after DoBackfill starts) + // and cancels the backfill. At most 2 chunks can finish before the abort propagates. + // Without the fix: all 4 chunks run to completion (requester_transaction not forwarded). + EXPECT_LE(rpcs_final, 2) + << "Expected ≤ 2 BackfillIndex RPCs (liveness task should abort after ≤ 2 chunks), " + << "got " << rpcs_final << ". Suggests requester_transaction was not forwarded " + "from the placeholder BackfillTable to the real BackfillTable."; +} + +// Negative test: with liveness monitoring disabled the master keeps running all 4 chunks even +// though the requesting backend was killed before any tserver BackfillIndex RPCs were sent. +class PgIndexBackfillCancellationEarlyKillWithoutFixTest + : public PgIndexBackfillCancellationEarlyKillTest { + public: + void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override { + PgIndexBackfillCancellationEarlyKillTest::UpdateMiniClusterOptions(options); + options->extra_master_flags.push_back("--TEST_skip_ddl_requester_liveness_check=true"); + // Disable the master-side DDL transaction verifier so it cannot roll back the index + // permissions when the CREATE INDEX backend is killed. Unlike TEST_pause_ddl_rollback + // (which blocks inside YsqlDdlTxnCompleteCallbackInternal while holding an async_task_pool_ + // thread), this flag makes VerifyTransaction() return immediately, so no pool thread is + // consumed. Consuming pool threads with the pause flag starves BackfillTable::Launch(), + // which also needs an async_task_pool_ thread in debug builds (pool size = NumCPUs). + options->extra_master_flags.push_back("--TEST_skip_transaction_verification=true"); + } +}; + +INSTANTIATE_TEST_CASE_P(, PgIndexBackfillCancellationEarlyKillWithoutFixTest, + ::testing::Values(false)); + +TEST_P(PgIndexBackfillCancellationEarlyKillWithoutFixTest, + YB_DISABLE_TEST_IN_SANITIZERS(BackfillContinuesAfterEarlyBackendKill)) { + ASSERT_OK(conn_->ExecuteFormat( + "CREATE TABLE $0 (k INT PRIMARY KEY, v INT) SPLIT INTO 1 TABLETS", kTableName)); + ASSERT_OK(conn_->ExecuteFormat( + "INSERT INTO $0 SELECT i, i FROM generate_series(1, 2000) AS i", kTableName)); + conn_.reset(); + + std::atomic create_index_pid{0}; + CountDownLatch pid_ready(1); + + thread_holder_.AddThreadFunctor([this, &create_index_pid, &pid_ready] { + auto conn = ASSERT_RESULT(ConnectToDB(kDatabaseName)); + create_index_pid.store(ASSERT_RESULT(conn.FetchRow("SELECT pg_backend_pid()"))); + pid_ready.CountDown(); + WARN_NOT_OK( + conn.ExecuteFormat("CREATE INDEX CONCURRENTLY $0 ON $1 (v)", kIndexName, kTableName), + "CREATE INDEX CONCURRENTLY failed (expected after pg_terminate_backend)"); + }); + + pid_ready.Wait(); + + { + auto monitor_conn = ASSERT_RESULT(ConnectToDB(kDatabaseName)); + ASSERT_OK(WaitFor( + [&monitor_conn]() -> Result { + auto res = VERIFY_RESULT(monitor_conn.FetchFormat( + "SELECT indisready FROM pg_class" + " JOIN pg_index ON pg_class.oid = pg_index.indexrelid" + " WHERE pg_class.relname = '$0'", + kIndexName)); + if (PQntuples(res.get()) == 0) { + return false; + } + return VERIFY_RESULT(GetValue(res.get(), 0, 0)); + }, + 30s * kTimeMultiplier, "index to reach indisready")); + } + + // Wait for both alter-table delay sleeps to complete so SendAlterTableRequest has gone out + // to the tablets, but HandleTabletSchemaVersionReport has not yet fired (backfill not started). + // DDL rollback is suppressed by TEST_skip_transaction_verification (set at cluster startup), + // so no pool threads are consumed by spinning verification tasks. + SleepFor(kAlterTableDelay * 2 + 1s); + + ASSERT_EQ(ASSERT_RESULT(TotalBackfillRpcCalls(cluster_.get())), 0) + << "BackfillIndex RPCs already started — kill window was missed."; + + auto killer_conn = ASSERT_RESULT(ConnectToDB(kDatabaseName)); + ASSERT_TRUE(ASSERT_RESULT( + killer_conn.FetchRow( + Format("SELECT pg_terminate_backend($0)", create_index_pid)))); + LOG(INFO) << "Terminated CREATE INDEX backend PID " << create_index_pid + << " (before any tserver BackfillIndex RPCs)"; + + // Wait for all 4 chunks to complete. We use WaitFor instead of a fixed SleepFor because + // GetSafeTime may block briefly while the killed PG backend's transaction heartbeat expires + // (up to ~1.5-5s), delaying DoBackfill startup. A fixed window can be too short in that case. + ASSERT_OK(WaitFor( + [&]() -> Result { + return VERIFY_RESULT(TotalBackfillRpcCalls(cluster_.get())) >= 4; + }, + 60s * kTimeMultiplier, + "all 4 BackfillIndex RPCs to complete")); + auto rpcs_final = ASSERT_RESULT(TotalBackfillRpcCalls(cluster_.get())); + LOG(INFO) << "BackfillIndex RPC count after observation window: " << rpcs_final; + + thread_holder_.JoinAll(); + + // With liveness monitoring disabled (TEST_skip_ddl_requester_liveness_check) and DDL + // verification disabled (TEST_skip_transaction_verification), all 4 chunks run to completion. + // Contrast with the positive test where ≤ 2 chunks complete before the liveness task fires + // and aborts the backfill. + EXPECT_GE(rpcs_final, 4) + << "Expected 4 BackfillIndex RPCs (liveness check and DDL verification disabled), " + << "got " << rpcs_final; +} + } // namespace yb::pgwrapper From ebaafb382c7790d068412c29dd1725e7eb467caa Mon Sep 17 00:00:00 2001 From: eg <1139932+egladysh@users.noreply.github.com> Date: Fri, 1 May 2026 18:37:10 -0700 Subject: [PATCH 04/15] As per review, added DCHECK, fixed comments, modifed transaction_rpc_timeout_ms --- src/yb/master/ysql_ddl_verification_task.cc | 2 ++ src/yb/master/ysql_ddl_verification_task.h | 2 +- src/yb/yql/pgwrapper/pg_index_backfill-test.cc | 17 +++++++++-------- 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/src/yb/master/ysql_ddl_verification_task.cc b/src/yb/master/ysql_ddl_verification_task.cc index b932aabc156b..bd7cd3ac642a 100644 --- a/src/yb/master/ysql_ddl_verification_task.cc +++ b/src/yb/master/ysql_ddl_verification_task.cc @@ -963,6 +963,8 @@ void DdlRequesterLivenessTask::FinishPollTransaction(bool aborted) { if (aborted && !callbacks_.done_()) { LOG(INFO) << "DdlRequesterLivenessTask: requester transaction aborted for " << table_info_->id() << ", aborting background DDL operation"; + // As explained by comments above Complete() must precede abort_(). + DCHECK(IsStateTerminal(state())); auto s = callbacks_.abort_(); if (!s.ok()) { LOG(ERROR) << "Failed to abort background DDL operation after requester death: " << s; diff --git a/src/yb/master/ysql_ddl_verification_task.h b/src/yb/master/ysql_ddl_verification_task.h index 843fc1b2cd85..d02f992d8884 100644 --- a/src/yb/master/ysql_ddl_verification_task.h +++ b/src/yb/master/ysql_ddl_verification_task.h @@ -210,7 +210,7 @@ class TableSchemaVerificationTask : public MultiStepTableTaskBase, class DdlRequesterLivenessTask : public MultiStepTableTaskBase, public PollTransactionStatusBase { public: - static std::shared_ptr CreateAndStartTask( + static std::shared_ptr CreateAndStartTask( CatalogManager& catalog_manager, scoped_refptr table, const TransactionMetadata& transaction, diff --git a/src/yb/yql/pgwrapper/pg_index_backfill-test.cc b/src/yb/yql/pgwrapper/pg_index_backfill-test.cc index 545159dd0859..3fe5358b264e 100644 --- a/src/yb/yql/pgwrapper/pg_index_backfill-test.cc +++ b/src/yb/yql/pgwrapper/pg_index_backfill-test.cc @@ -3449,16 +3449,17 @@ class PgIndexBackfillCancellationTest : public PgIndexBackfillTest { // Poll frequently so we detect transaction abort within the test observation window. options->extra_master_flags.push_back( "--ddl_requester_liveness_check_interval_secs=2"); - // Reduce transaction RPC timeout so that GetTransactionStatus calls during cluster teardown - // (from the DDL verification cleanup task for the failed CREATE INDEX) time out quickly - // instead of blocking the master's worker thread for the default 5-15 seconds. - options->extra_master_flags.push_back("--transaction_rpc_timeout_ms=1000"); + // Reduce transaction RPC timeout so that in-flight GetTransactionStatus RPCs + // drain quickly during cluster teardown rather than delaying the master's + // graceful shutdown by the default 5-15 seconds + options->extra_master_flags.push_back( + Format("--transaction_rpc_timeout_ms=$0", 2000 * kTimeMultiplier)); } int GetNumTabletServers() const override { return 1; } protected: - // Each BackfillIndex RPC to a tserver takes this long due to the slowdown flag. + // This delay is added to the BackfillIndex RPC's processing time. const MonoDelta kSlowdown = 2s; }; @@ -3467,11 +3468,11 @@ INSTANTIATE_TEST_CASE_P(, PgIndexBackfillCancellationTest, ::testing::Bool()); // Regression test: after pg_terminate_backend kills the CREATE INDEX CONCURRENTLY session, // the master must stop issuing new BackfillIndex RPCs to tservers. // -// Without the fix: the master continues launching backfill chunks, visible as new +// Without the liveness check: the master continues launching backfill chunks, visible as new // BackfillIndex RPC completions on tservers after the backend is gone. -// With the fix: the master detects the DDL transaction is ABORTED and stops. +// With the liveness check: the master detects the DDL transaction is ABORTED and stops. // -// Manual reproduction (as suggested by the YugabyteDB developer): +// Manual reproduction: // SET yb_slowdown_backfill_by_ms = 10000; // CREATE INDEX CONCURRENTLY idx ON my_table (col); -- session 1 // SELECT pg_terminate_backend(); -- session 2 From 8b620dc2f556b46fcacd5546c78cadea3784b372 Mon Sep 17 00:00:00 2001 From: eg <1139932+egladysh@users.noreply.github.com> Date: Mon, 4 May 2026 10:43:53 -0700 Subject: [PATCH 05/15] Deduplicate the test code, replace deprecated FetchFormat with FetchRow, sort kDdlRequesterLiveness --- src/yb/master/ysql_ddl_verification_task.h | 2 +- src/yb/server/monitored_task.h | 2 +- .../yql/pgwrapper/pg_index_backfill-test.cc | 361 ++++++------------ 3 files changed, 129 insertions(+), 236 deletions(-) diff --git a/src/yb/master/ysql_ddl_verification_task.h b/src/yb/master/ysql_ddl_verification_task.h index d02f992d8884..4ea29e3963e3 100644 --- a/src/yb/master/ysql_ddl_verification_task.h +++ b/src/yb/master/ysql_ddl_verification_task.h @@ -208,7 +208,7 @@ class TableSchemaVerificationTask : public MultiStepTableTaskBase, // when the transaction is detected as ABORTED (e.g. pg_terminate_backend killed the session). // Can be attached to any long-running background DDL operation. class DdlRequesterLivenessTask : public MultiStepTableTaskBase, - public PollTransactionStatusBase { + public PollTransactionStatusBase { public: static std::shared_ptr CreateAndStartTask( CatalogManager& catalog_manager, diff --git a/src/yb/server/monitored_task.h b/src/yb/server/monitored_task.h index f1b6b3980b6c..82d34165c90f 100644 --- a/src/yb/server/monitored_task.h +++ b/src/yb/server/monitored_task.h @@ -63,7 +63,6 @@ YB_DEFINE_ENUM(MonitoredTaskType, (kBackendsCatalogVersion) (kBackendsCatalogVersionTs) (kBackfillDone) - (kDdlRequesterLiveness) (kBackfillTable) (kBackfillTabletChunk) (kChangeConfig) @@ -71,6 +70,7 @@ YB_DEFINE_ENUM(MonitoredTaskType, (kClonePgSchema) (kCloneTablet) (kCreateReplica) + (kDdlRequesterLiveness) (kDeleteReplica) (kEnableDbConns) (kFlushTablets) diff --git a/src/yb/yql/pgwrapper/pg_index_backfill-test.cc b/src/yb/yql/pgwrapper/pg_index_backfill-test.cc index 3fe5358b264e..e542095188c5 100644 --- a/src/yb/yql/pgwrapper/pg_index_backfill-test.cc +++ b/src/yb/yql/pgwrapper/pg_index_backfill-test.cc @@ -3443,7 +3443,7 @@ class PgIndexBackfillCancellationTest : public PgIndexBackfillTest { // - yb_fetch_row_limit=500: controls how many rows PostgreSQL fetches from DocDB per // BACKFILL INDEX SQL call. The default is 1024, which would produce only 2 RPCs. // - TEST_backfill_paging_size=500: exits the do-while loop in BackfillIndexesForYsql - // after 500 rows have been accumulated, forcing a resume-key response to the master. + // after 500 rows have been accumulated, returning a non-empty backfilled_until to the master. options->extra_tserver_flags.push_back("--ysql_yb_fetch_row_limit=500"); options->extra_tserver_flags.push_back("--TEST_backfill_paging_size=500"); // Poll frequently so we detect transaction abort within the test observation window. @@ -3459,6 +3459,57 @@ class PgIndexBackfillCancellationTest : public PgIndexBackfillTest { int GetNumTabletServers() const override { return 1; } protected: + // Sets up a 2000-row single-tablet table, runs CREATE INDEX CONCURRENTLY in a background + // thread, waits for at least one BackfillIndex RPC to complete (3 more remain, so the kill + // always lands while backfill is ongoing), then kills the backend. Returns the RPC count + // sampled after a grace period (rpcs_before) and again after an observation window (rpcs_after). + // + // Note: after the kill there is a race between the master dispatching the next chunk and + // the liveness task detecting the ABORTED transaction. The liveness task could fire before + // the next chunk starts, leaving 0 additional RPCs. Both callers handle this correctly + // via their respective assertions. + Result> RunBackfillAndKillMidway() { + RETURN_NOT_OK(conn_->ExecuteFormat( + "CREATE TABLE $0 (k INT PRIMARY KEY, v INT) SPLIT INTO 1 TABLETS", kTableName)); + RETURN_NOT_OK(conn_->ExecuteFormat( + "INSERT INTO $0 SELECT i, i FROM generate_series(1, 2000) AS i", kTableName)); + + std::atomic create_index_pid{0}; + CountDownLatch pid_ready(1); + thread_holder_.AddThreadFunctor([this, &create_index_pid, &pid_ready] { + auto conn = ASSERT_RESULT(ConnectToDB(kDatabaseName)); + create_index_pid.store(ASSERT_RESULT(conn.FetchRow("SELECT pg_backend_pid()"))); + pid_ready.CountDown(); + (void)conn.ExecuteFormat("CREATE INDEX CONCURRENTLY $0 ON $1 (v)", kIndexName, kTableName); + }); + + pid_ready.Wait(); + RETURN_NOT_OK(WaitFor( + [this]() -> Result { + return VERIFY_RESULT(TotalBackfillRpcCalls(cluster_.get())) >= 1; + }, + 30s * kTimeMultiplier, "first BackfillIndex RPC to complete")); + + auto killer_conn = VERIFY_RESULT(ConnectToDB(kDatabaseName)); + auto terminated = VERIFY_RESULT( + killer_conn.FetchRow(Format("SELECT pg_terminate_backend($0)", create_index_pid))); + if (!terminated) { + return STATUS(IllegalState, "pg_terminate_backend returned false"); + } + LOG(INFO) << "Terminated CREATE INDEX backend PID " << create_index_pid; + + SleepFor(kSlowdown + 1s); + auto rpcs_before = VERIFY_RESULT(TotalBackfillRpcCalls(cluster_.get())); + LOG(INFO) << "BackfillIndex RPC count after kill + grace: " << rpcs_before; + + SleepFor(kSlowdown * 2 + 1s); + auto rpcs_after = VERIFY_RESULT(TotalBackfillRpcCalls(cluster_.get())); + LOG(INFO) << "BackfillIndex RPC count after observation window: " << rpcs_after; + + thread_holder_.JoinAll(); + return std::make_pair(rpcs_before, rpcs_after); + } + // This delay is added to the BackfillIndex RPC's processing time. const MonoDelta kSlowdown = 2s; }; @@ -3473,76 +3524,16 @@ INSTANTIATE_TEST_CASE_P(, PgIndexBackfillCancellationTest, ::testing::Bool()); // With the liveness check: the master detects the DDL transaction is ABORTED and stops. // // Manual reproduction: -// SET yb_slowdown_backfill_by_ms = 10000; // CREATE INDEX CONCURRENTLY idx ON my_table (col); -- session 1 // SELECT pg_terminate_backend(); -- session 2 // -- observe in tserver logs (--ysql_pg_conf_csv=log_statement=ALL): // -- backfill statements keep appearing even after the kill TEST_P(PgIndexBackfillCancellationTest, YB_DISABLE_TEST_IN_SANITIZERS(BackfillStopsAfterBackendKill)) { - // Single-tablet table with 2000 rows. TEST_backfill_paging_size=500 (set in the fixture) - // causes the tserver to return a resume key after every 500 rows, so the master issues - // 4 sequential BackfillIndex RPCs (each taking kSlowdown=2s). This gives a clear - // observation window: after killing mid-backfill we can tell whether the master stops or - // continues issuing new chunks. - ASSERT_OK(conn_->ExecuteFormat( - "CREATE TABLE $0 (k INT PRIMARY KEY, v INT) SPLIT INTO 1 TABLETS", kTableName)); - ASSERT_OK(conn_->ExecuteFormat( - "INSERT INTO $0 SELECT i, i FROM generate_series(1, 2000) AS i", kTableName)); - // Close the setup connection so WaitForYsqlBackendsCatalogVersion doesn't stall waiting - // for this idle backend to refresh its catalog version after CREATE INDEX increments it. - conn_.reset(); - - // Capture the CREATE INDEX backend PID before the DDL starts, then run the DDL. - // The DDL will fail with a network error when pg_terminate_backend kills the connection. - std::atomic create_index_pid{0}; - CountDownLatch pid_ready(1); - - thread_holder_.AddThreadFunctor([this, &create_index_pid, &pid_ready] { - auto conn = ASSERT_RESULT(ConnectToDB(kDatabaseName)); - create_index_pid.store(ASSERT_RESULT(conn.FetchRow("SELECT pg_backend_pid()"))); - pid_ready.CountDown(); - // Expected to fail once pg_terminate_backend is called. - WARN_NOT_OK( - conn.ExecuteFormat("CREATE INDEX CONCURRENTLY $0 ON $1 (v)", kIndexName, kTableName), - "CREATE INDEX CONCURRENTLY failed (expected after pg_terminate_backend)"); - }); - - // Wait for the first BackfillIndex RPC to complete. This guarantees at least one more - // chunk (resume from the 500-row page boundary) would be issued without the fix, giving - // the observation window a clear before/after signal. - pid_ready.Wait(); - ASSERT_OK(WaitFor( - [this]() -> Result { - return VERIFY_RESULT(TotalBackfillRpcCalls(cluster_.get())) >= 1; - }, - 30s * kTimeMultiplier, "first BackfillIndex RPC to complete")); - - // Kill the CREATE INDEX backend from a separate connection. - auto killer_conn = ASSERT_RESULT(ConnectToDB(kDatabaseName)); - ASSERT_TRUE(ASSERT_RESULT( - killer_conn.FetchRow(Format("SELECT pg_terminate_backend($0)", create_index_pid)))); - LOG(INFO) << "Terminated CREATE INDEX backend PID " << create_index_pid; - - // Allow the currently in-flight BackfillIndex RPC (chunk 2, if any) to complete - // plus the liveness task to fire and suppress any further chunks. - SleepFor(kSlowdown + 1s); - auto rpcs_after_kill = ASSERT_RESULT(TotalBackfillRpcCalls(cluster_.get())); - LOG(INFO) << "BackfillIndex RPC count after kill + grace: " << rpcs_after_kill; - - // Wait long enough for chunks 3 and 4 to finish if the bug were present. - SleepFor(kSlowdown * 2 + 1s); - auto rpcs_after_wait = ASSERT_RESULT(TotalBackfillRpcCalls(cluster_.get())); - LOG(INFO) << "BackfillIndex RPC count after observation window: " << rpcs_after_wait; - - thread_holder_.JoinAll(); - - // With the fix: the liveness task detects the ABORTED transaction and calls Abort(), - // preventing any chunks after chunk 1 (or chunk 2 at most if it raced to start). - // rpcs_after_wait must equal rpcs_after_kill — no new chunks in the observation window. - EXPECT_EQ(rpcs_after_wait, rpcs_after_kill) + auto [rpcs_before, rpcs_after] = ASSERT_RESULT(RunBackfillAndKillMidway()); + EXPECT_EQ(rpcs_after, rpcs_before) << "BackfillIndex RPCs continued after pg_terminate_backend: " - << (rpcs_after_wait - rpcs_after_kill) << " unexpected additional RPC(s) issued"; + << (rpcs_after - rpcs_before) << " unexpected additional RPC(s) issued"; } // Negative-regression test: demonstrates the pre-fix bug. @@ -3565,76 +3556,30 @@ INSTANTIATE_TEST_CASE_P(, PgIndexBackfillCancellationWithoutFixTest, ::testing:: TEST_P(PgIndexBackfillCancellationWithoutFixTest, YB_DISABLE_TEST_IN_SANITIZERS(BackfillContinuesAfterBackendKill)) { - ASSERT_OK(conn_->ExecuteFormat( - "CREATE TABLE $0 (k INT PRIMARY KEY, v INT) SPLIT INTO 1 TABLETS", kTableName)); - ASSERT_OK(conn_->ExecuteFormat( - "INSERT INTO $0 SELECT i, i FROM generate_series(1, 2000) AS i", kTableName)); - // Close the setup connection so WaitForYsqlBackendsCatalogVersion doesn't stall waiting - // for this idle backend to refresh its catalog version after CREATE INDEX increments it. - conn_.reset(); - - std::atomic create_index_pid{0}; - CountDownLatch pid_ready(1); - thread_holder_.AddThreadFunctor([this, &create_index_pid, &pid_ready] { - auto conn = ASSERT_RESULT(ConnectToDB(kDatabaseName)); - create_index_pid.store(ASSERT_RESULT(conn.FetchRow("SELECT pg_backend_pid()"))); - pid_ready.CountDown(); - WARN_NOT_OK( - conn.ExecuteFormat("CREATE INDEX CONCURRENTLY $0 ON $1 (v)", kIndexName, kTableName), - "CREATE INDEX CONCURRENTLY failed (expected after pg_terminate_backend)"); - }); - - pid_ready.Wait(); - - // Wait until the first BackfillIndex RPC has completed. With yb_fetch_row_limit=500 and - // TEST_backfill_paging_size=500 (both set in the fixture) and 2000 rows there are 4 sequential - // RPCs. After chunk 1 finishes chunks 2-4 remain: without the liveness check the master - // issues all of them even though the backend is dead. - ASSERT_OK(WaitFor( - [this]() -> Result { - return VERIFY_RESULT(TotalBackfillRpcCalls(cluster_.get())) >= 1; - }, - 30s * kTimeMultiplier, "first BackfillIndex RPC to complete")); - - auto killer_conn = ASSERT_RESULT(ConnectToDB(kDatabaseName)); - ASSERT_TRUE(ASSERT_RESULT( - killer_conn.FetchRow(Format("SELECT pg_terminate_backend($0)", create_index_pid)))); - LOG(INFO) << "Terminated CREATE INDEX backend PID " << create_index_pid; - - auto rpcs_at_kill = ASSERT_RESULT(TotalBackfillRpcCalls(cluster_.get())); - LOG(INFO) << "BackfillIndex RPC count at kill: " << rpcs_at_kill; - - // kSlowdown*2+1s covers at least 2 additional chunks (chunks 2 and 3 each take kSlowdown). - SleepFor(kSlowdown * 2 + 1s); - auto rpcs_after_wait = ASSERT_RESULT(TotalBackfillRpcCalls(cluster_.get())); - LOG(INFO) << "BackfillIndex RPC count after observation window: " << rpcs_after_wait; - - thread_holder_.JoinAll(); - - // Without the liveness task the master keeps issuing chunks: rpcs_after_wait must be - // strictly greater than rpcs_at_kill, proving the bug is present when the fix is off. - EXPECT_GT(rpcs_after_wait, rpcs_at_kill) + auto [rpcs_before, rpcs_after] = ASSERT_RESULT(RunBackfillAndKillMidway()); + EXPECT_GT(rpcs_after, rpcs_before) << "Expected BackfillIndex RPCs to continue after pg_terminate_backend " "(liveness check disabled — demonstrating pre-fix behavior), but they stopped"; } // Tests that kill the CREATE INDEX backend BEFORE any tserver BackfillIndex RPCs have been -// sent. This exercises the Register/Launch split: Register() stores the requester_transaction -// on the placeholder BackfillTable before DoBackfill() is called. If the transaction is not -// forwarded correctly, the DdlRequesterLivenessTask is never started and backfill runs to -// completion despite the killed backend. +// sent. This exercises the permission-update → backfill handoff: UpdateIndexPermission stores +// the requester_transaction on the table via SetPendingBackfillRequesterTransaction, and +// StartBackfillingData retrieves it via TakePendingBackfillRequesterTransaction before launching +// DoBackfill. If the transaction is not forwarded correctly, DdlRequesterLivenessTask is never +// started and backfill runs to completion despite the killed backend. // // Timing sketch (kSlowdown = 2s, kAlterTableDelay = 4s): // T=0 PG backend calls BackfillIndex RPC → master enters UpdateIndexPermission // T~4s First sleep inside UpdateIndexPermission completes (kAlterTableDelay) -// T~4s Permission updated; Register() called → placeholder created with requester_transaction +// T~4s Permission updated; requester_transaction stored via SetPendingBackfillRequesterTransaction // T~8s Second sleep inside UpdateIndexPermission completes → AlterTable RPCs proceed // T~9s Test kills PG backend (kAlterTableDelay * 2 + 1s after indisready detected) // → 0 BackfillIndex RPCs have been sent yet // T~10s DoBackfill starts, StartRequesterLivenessMonitor() fires // T~12s Liveness first poll (ddl_requester_liveness_check_interval_secs = 2s): detects ABORTED // → abort callback cancels backfill -// → ≤ 2 chunks complete (chunk 1 races with first liveness poll; chunk 2 may slip through) +// → < 4 chunks complete (1-2 chunks may slip through before abort propagates) class PgIndexBackfillCancellationEarlyKillTest : public PgIndexBackfillCancellationTest { public: void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override { @@ -3645,7 +3590,61 @@ class PgIndexBackfillCancellationEarlyKillTest : public PgIndexBackfillCancellat } protected: - // Each sleep inside UpdateIndexPermission lasts this long (two sleeps → 2× total delay). + // Sets up a 2000-row table, starts CREATE INDEX CONCURRENTLY in a background thread, waits + // for indisready, sleeps past both UpdateIndexPermission delays so the requester_transaction + // has been stored but no BackfillIndex RPCs have been sent, then kills the backend. + // The caller is responsible for observing RPC counts and joining thread_holder_. + Status KillBackendBeforeBackfill() { + RETURN_NOT_OK(conn_->ExecuteFormat( + "CREATE TABLE $0 (k INT PRIMARY KEY, v INT) SPLIT INTO 1 TABLETS", kTableName)); + RETURN_NOT_OK(conn_->ExecuteFormat( + "INSERT INTO $0 SELECT i, i FROM generate_series(1, 2000) AS i", kTableName)); + conn_.reset(); + + std::atomic create_index_pid{0}; + CountDownLatch pid_ready(1); + thread_holder_.AddThreadFunctor([this, &create_index_pid, &pid_ready] { + auto conn = ASSERT_RESULT(ConnectToDB(kDatabaseName)); + create_index_pid.store(ASSERT_RESULT(conn.FetchRow("SELECT pg_backend_pid()"))); + pid_ready.CountDown(); + (void)conn.ExecuteFormat("CREATE INDEX CONCURRENTLY $0 ON $1 (v)", kIndexName, kTableName); + }); + + pid_ready.Wait(); + { + auto monitor_conn = VERIFY_RESULT(ConnectToDB(kDatabaseName)); + RETURN_NOT_OK(WaitFor( + [&monitor_conn]() -> Result { + auto rows = VERIFY_RESULT(monitor_conn.FetchRows(Format( + "SELECT indisready FROM pg_class" + " JOIN pg_index ON pg_class.oid = pg_index.indexrelid" + " WHERE pg_class.relname = '$0'", + kIndexName))); + return !rows.empty() && rows[0]; + }, + 30s * kTimeMultiplier, "index to reach indisready")); + } + + SleepFor(kAlterTableDelay * 2 + 1s); + auto rpcs = VERIFY_RESULT(TotalBackfillRpcCalls(cluster_.get())); + if (rpcs != 0) { + return STATUS_FORMAT(IllegalState, + "BackfillIndex RPCs already started — kill window was missed ($0 RPCs). " + "Increase kAlterTableDelay or reduce system load.", rpcs); + } + + auto killer_conn = VERIFY_RESULT(ConnectToDB(kDatabaseName)); + auto terminated = VERIFY_RESULT( + killer_conn.FetchRow(Format("SELECT pg_terminate_backend($0)", create_index_pid))); + if (!terminated) { + return STATUS(IllegalState, "pg_terminate_backend returned false"); + } + LOG(INFO) << "Terminated CREATE INDEX backend PID " << create_index_pid + << " (before any tserver BackfillIndex RPCs)"; + return Status::OK(); + } + + // Each sleep inside UpdateIndexPermission lasts this long (two sleeps -> 2× total delay). const MonoDelta kAlterTableDelay = 4s; }; @@ -3655,63 +3654,7 @@ INSTANTIATE_TEST_CASE_P(, PgIndexBackfillCancellationEarlyKillTest, ::testing::B // from the placeholder BackfillTable and aborts backfill after ≤ 2 chunks. TEST_P(PgIndexBackfillCancellationEarlyKillTest, YB_DISABLE_TEST_IN_SANITIZERS(BackfillStopsAfterEarlyBackendKill)) { - ASSERT_OK(conn_->ExecuteFormat( - "CREATE TABLE $0 (k INT PRIMARY KEY, v INT) SPLIT INTO 1 TABLETS", kTableName)); - ASSERT_OK(conn_->ExecuteFormat( - "INSERT INTO $0 SELECT i, i FROM generate_series(1, 2000) AS i", kTableName)); - // Close the setup connection; it is not needed for the kill step and must not linger. - conn_.reset(); - - std::atomic create_index_pid{0}; - CountDownLatch pid_ready(1); - - thread_holder_.AddThreadFunctor([this, &create_index_pid, &pid_ready] { - auto conn = ASSERT_RESULT(ConnectToDB(kDatabaseName)); - create_index_pid.store(ASSERT_RESULT(conn.FetchRow("SELECT pg_backend_pid()"))); - pid_ready.CountDown(); - WARN_NOT_OK( - conn.ExecuteFormat("CREATE INDEX CONCURRENTLY $0 ON $1 (v)", kIndexName, kTableName), - "CREATE INDEX CONCURRENTLY failed (expected after pg_terminate_backend)"); - }); - - pid_ready.Wait(); - - // Wait for indisready: the PG backend has reached (or passed) the UpdateIndexPermission call - // that triggers Register(). Open a fresh connection for polling and close it before the kill - // so it does not block WaitForYsqlBackendsCatalogVersion inside the kill path. - { - auto monitor_conn = ASSERT_RESULT(ConnectToDB(kDatabaseName)); - ASSERT_OK(WaitFor( - [&monitor_conn]() -> Result { - auto res = VERIFY_RESULT(monitor_conn.FetchFormat( - "SELECT indisready FROM pg_class" - " JOIN pg_index ON pg_class.oid = pg_index.indexrelid" - " WHERE pg_class.relname = '$0'", - kIndexName)); - if (PQntuples(res.get()) == 0) { - return false; - } - return VERIFY_RESULT(GetValue(res.get(), 0, 0)); - }, - 30s * kTimeMultiplier, "index to reach indisready")); - } // monitor_conn closed here - - // UpdateIndexPermission sleeps kAlterTableDelay before AND after the permission update. - // Register() is called after the first sleep, so at T_indisready + kAlterTableDelay. - // Sleeping kAlterTableDelay * 2 + 1s ensures Register() has completed and no BackfillIndex - // RPCs have been sent yet. - SleepFor(kAlterTableDelay * 2 + 1s); - - ASSERT_EQ(ASSERT_RESULT(TotalBackfillRpcCalls(cluster_.get())), 0) - << "BackfillIndex RPCs already started — kill window was missed. " - "Increase kAlterTableDelay or reduce system load."; - - auto killer_conn = ASSERT_RESULT(ConnectToDB(kDatabaseName)); - ASSERT_TRUE(ASSERT_RESULT( - killer_conn.FetchRow( - Format("SELECT pg_terminate_backend($0)", create_index_pid)))); - LOG(INFO) << "Terminated CREATE INDEX backend PID " << create_index_pid - << " (before any tserver BackfillIndex RPCs)"; + ASSERT_OK(KillBackendBeforeBackfill()); // Allow enough time for all 4 chunks (4 × kSlowdown = 8s) plus a generous margin. // If the liveness task did not fire, all 4 complete; with the fix, ≤ 2 complete. @@ -3721,11 +3664,12 @@ TEST_P(PgIndexBackfillCancellationEarlyKillTest, thread_holder_.JoinAll(); - // With the fix: liveness detects ABORTED on the first poll (≈ 2s after DoBackfill starts) - // and cancels the backfill. At most 2 chunks can finish before the abort propagates. - // Without the fix: all 4 chunks run to completion (requester_transaction not forwarded). - EXPECT_LE(rpcs_final, 2) - << "Expected ≤ 2 BackfillIndex RPCs (liveness task should abort after ≤ 2 chunks), " + // With the liveness check: liveness detects ABORTED on the first poll and cancels backfill. + // Without the liveness check: all 4 chunks run to completion (requester_transaction not forwarded). + // We assert < 4 (not == 0) because abort propagation has a small lag: 1-2 chunks may + // complete between DoBackfill starting and the abort taking effect. + EXPECT_LT(rpcs_final, 4) + << "Expected < 4 BackfillIndex RPCs (liveness task should abort backfill), " << "got " << rpcs_final << ". Suggests requester_transaction was not forwarded " "from the placeholder BackfillTable to the real BackfillTable."; } @@ -3753,58 +3697,7 @@ INSTANTIATE_TEST_CASE_P(, PgIndexBackfillCancellationEarlyKillWithoutFixTest, TEST_P(PgIndexBackfillCancellationEarlyKillWithoutFixTest, YB_DISABLE_TEST_IN_SANITIZERS(BackfillContinuesAfterEarlyBackendKill)) { - ASSERT_OK(conn_->ExecuteFormat( - "CREATE TABLE $0 (k INT PRIMARY KEY, v INT) SPLIT INTO 1 TABLETS", kTableName)); - ASSERT_OK(conn_->ExecuteFormat( - "INSERT INTO $0 SELECT i, i FROM generate_series(1, 2000) AS i", kTableName)); - conn_.reset(); - - std::atomic create_index_pid{0}; - CountDownLatch pid_ready(1); - - thread_holder_.AddThreadFunctor([this, &create_index_pid, &pid_ready] { - auto conn = ASSERT_RESULT(ConnectToDB(kDatabaseName)); - create_index_pid.store(ASSERT_RESULT(conn.FetchRow("SELECT pg_backend_pid()"))); - pid_ready.CountDown(); - WARN_NOT_OK( - conn.ExecuteFormat("CREATE INDEX CONCURRENTLY $0 ON $1 (v)", kIndexName, kTableName), - "CREATE INDEX CONCURRENTLY failed (expected after pg_terminate_backend)"); - }); - - pid_ready.Wait(); - - { - auto monitor_conn = ASSERT_RESULT(ConnectToDB(kDatabaseName)); - ASSERT_OK(WaitFor( - [&monitor_conn]() -> Result { - auto res = VERIFY_RESULT(monitor_conn.FetchFormat( - "SELECT indisready FROM pg_class" - " JOIN pg_index ON pg_class.oid = pg_index.indexrelid" - " WHERE pg_class.relname = '$0'", - kIndexName)); - if (PQntuples(res.get()) == 0) { - return false; - } - return VERIFY_RESULT(GetValue(res.get(), 0, 0)); - }, - 30s * kTimeMultiplier, "index to reach indisready")); - } - - // Wait for both alter-table delay sleeps to complete so SendAlterTableRequest has gone out - // to the tablets, but HandleTabletSchemaVersionReport has not yet fired (backfill not started). - // DDL rollback is suppressed by TEST_skip_transaction_verification (set at cluster startup), - // so no pool threads are consumed by spinning verification tasks. - SleepFor(kAlterTableDelay * 2 + 1s); - - ASSERT_EQ(ASSERT_RESULT(TotalBackfillRpcCalls(cluster_.get())), 0) - << "BackfillIndex RPCs already started — kill window was missed."; - - auto killer_conn = ASSERT_RESULT(ConnectToDB(kDatabaseName)); - ASSERT_TRUE(ASSERT_RESULT( - killer_conn.FetchRow( - Format("SELECT pg_terminate_backend($0)", create_index_pid)))); - LOG(INFO) << "Terminated CREATE INDEX backend PID " << create_index_pid - << " (before any tserver BackfillIndex RPCs)"; + ASSERT_OK(KillBackendBeforeBackfill()); // Wait for all 4 chunks to complete. We use WaitFor instead of a fixed SleepFor because // GetSafeTime may block briefly while the killed PG backend's transaction heartbeat expires From b8f23354b17d19f3e87f89ccb905ae5b050d9792 Mon Sep 17 00:00:00 2001 From: eg <1139932+egladysh@users.noreply.github.com> Date: Mon, 4 May 2026 12:05:39 -0700 Subject: [PATCH 06/15] lint fixes --- src/yb/yql/pgwrapper/pg_index_backfill-test.cc | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/yb/yql/pgwrapper/pg_index_backfill-test.cc b/src/yb/yql/pgwrapper/pg_index_backfill-test.cc index e542095188c5..11fc235acca9 100644 --- a/src/yb/yql/pgwrapper/pg_index_backfill-test.cc +++ b/src/yb/yql/pgwrapper/pg_index_backfill-test.cc @@ -3572,7 +3572,8 @@ TEST_P(PgIndexBackfillCancellationWithoutFixTest, // Timing sketch (kSlowdown = 2s, kAlterTableDelay = 4s): // T=0 PG backend calls BackfillIndex RPC → master enters UpdateIndexPermission // T~4s First sleep inside UpdateIndexPermission completes (kAlterTableDelay) -// T~4s Permission updated; requester_transaction stored via SetPendingBackfillRequesterTransaction +// T~4s Permission updated; requester_transaction stored via +// SetPendingBackfillRequesterTransaction // T~8s Second sleep inside UpdateIndexPermission completes → AlterTable RPCs proceed // T~9s Test kills PG backend (kAlterTableDelay * 2 + 1s after indisready detected) // → 0 BackfillIndex RPCs have been sent yet @@ -3664,8 +3665,9 @@ TEST_P(PgIndexBackfillCancellationEarlyKillTest, thread_holder_.JoinAll(); - // With the liveness check: liveness detects ABORTED on the first poll and cancels backfill. - // Without the liveness check: all 4 chunks run to completion (requester_transaction not forwarded). + // With the liveness check: liveness detects ABORTED on the first poll + // and cancels backfill. Without the liveness check: all 4 chunks run + // to completion (requester_transaction not forwarded). // We assert < 4 (not == 0) because abort propagation has a small lag: 1-2 chunks may // complete between DoBackfill starting and the abort taking effect. EXPECT_LT(rpcs_final, 4) From b707b74c28877640ecb7004e3ef95921a4ed8fe9 Mon Sep 17 00:00:00 2001 From: eg <1139932+egladysh@users.noreply.github.com> Date: Wed, 6 May 2026 00:14:29 -0700 Subject: [PATCH 07/15] as per comments fixed comment formatting --- src/yb/master/catalog_entity_info.h | 2 +- src/yb/master/ysql_ddl_verification_task.cc | 6 +++--- src/yb/tserver/pg_client_session.cc | 2 +- src/yb/yql/pgwrapper/pg_index_backfill-test.cc | 12 ++++++------ 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/yb/master/catalog_entity_info.h b/src/yb/master/catalog_entity_info.h index eee2bb89ff42..d2ae3c709011 100644 --- a/src/yb/master/catalog_entity_info.h +++ b/src/yb/master/catalog_entity_info.h @@ -1010,7 +1010,7 @@ class TableInfo : public RefCountedThreadSafe, bool is_backfilling_ = false; // DDL transaction from the PG backend that initiated the backfill, and the table schema version - // at which it was stored. Set when BackfillIndex updates permissions (WRITE_AND_DELETE → + // at which it was stored. Set when BackfillIndex updates permissions (WRITE_AND_DELETE -> // DO_BACKFILL) and cleared when StartBackfillingData creates the BackfillTable. std::optional pending_backfill_requester_transaction_ GUARDED_BY(lock_); uint32_t pending_backfill_requester_transaction_version_ GUARDED_BY(lock_) = 0; diff --git a/src/yb/master/ysql_ddl_verification_task.cc b/src/yb/master/ysql_ddl_verification_task.cc index b2b6d05b2339..59e45a03b9f0 100644 --- a/src/yb/master/ysql_ddl_verification_task.cc +++ b/src/yb/master/ysql_ddl_verification_task.cc @@ -945,7 +945,7 @@ Status DdlRequesterLivenessTask::FirstStep() { void DdlRequesterLivenessTask::TransactionPending() { if (callbacks_.done_()) { // We're inside the GetTransactionStatus RPC callback. Calling Complete() directly here - // would deadlock: Complete() → TaskCompleted() → Shutdown() → sync_.Wait() blocks + // would deadlock: Complete() -> TaskCompleted() -> Shutdown() -> sync_.Wait() blocks // because the callback's user_cb (which signals sync_) hasn't fired yet. // Schedule asynchronously so user_cb fires first. ScheduleNextStep( @@ -963,12 +963,12 @@ void DdlRequesterLivenessTask::TransactionPending() { void DdlRequesterLivenessTask::FinishPollTransaction(bool aborted) { // We're inside the GetTransactionStatus RPC callback. Calling Complete() directly here - // would deadlock: Complete() → TaskCompleted() → Shutdown() → sync_.Wait() blocks + // would deadlock: Complete() -> TaskCompleted() -> Shutdown() -> sync_.Wait() blocks // because the callback's user_cb (which signals sync_) hasn't fired yet. // Schedule asynchronously so user_cb fires first. // // Complete() must still be called before callbacks_.abort_() to avoid a different deadlock: - // Abort() may call BackfillTable::StopLivenessMonitor() → AbortAndReturnPrevState(), which + // Abort() may call BackfillTable::StopLivenessMonitor() -> AbortAndReturnPrevState(), which // acquires step_execution_mutex_ to wait for the current step to finish. Calling Complete() // first moves the task to terminal state so AbortAndReturnPrevState() finds // TrySetState(kAborted) failing and returns immediately without touching the mutex. diff --git a/src/yb/tserver/pg_client_session.cc b/src/yb/tserver/pg_client_session.cc index 2f4e9c543ae6..85c441924c76 100644 --- a/src/yb/tserver/pg_client_session.cc +++ b/src/yb/tserver/pg_client_session.cc @@ -2057,7 +2057,7 @@ class PgClientSession::Impl { std::optional txn_metadata; // The PG backend holds a DDL transaction open for the entire backfill duration // (StartTransactionCommand at indexcmds.c:2334). Pass it to the master so it can detect when - // this backend is killed (→ txn aborted) and stop launching new backfill chunks. + // this backend is killed (-> txn aborted) and stop launching new backfill chunks. auto meta = GetDdlTransactionMetadata( true /* use_transaction */, false /* use_regular_transaction_block */, context->GetClientDeadline(), IsTxnUsingTableLocks(false)); diff --git a/src/yb/yql/pgwrapper/pg_index_backfill-test.cc b/src/yb/yql/pgwrapper/pg_index_backfill-test.cc index 11fc235acca9..45196a384683 100644 --- a/src/yb/yql/pgwrapper/pg_index_backfill-test.cc +++ b/src/yb/yql/pgwrapper/pg_index_backfill-test.cc @@ -3563,24 +3563,24 @@ TEST_P(PgIndexBackfillCancellationWithoutFixTest, } // Tests that kill the CREATE INDEX backend BEFORE any tserver BackfillIndex RPCs have been -// sent. This exercises the permission-update → backfill handoff: UpdateIndexPermission stores +// sent. This exercises the permission-update -> backfill handoff: UpdateIndexPermission stores // the requester_transaction on the table via SetPendingBackfillRequesterTransaction, and // StartBackfillingData retrieves it via TakePendingBackfillRequesterTransaction before launching // DoBackfill. If the transaction is not forwarded correctly, DdlRequesterLivenessTask is never // started and backfill runs to completion despite the killed backend. // // Timing sketch (kSlowdown = 2s, kAlterTableDelay = 4s): -// T=0 PG backend calls BackfillIndex RPC → master enters UpdateIndexPermission +// T=0 PG backend calls BackfillIndex RPC -> master enters UpdateIndexPermission // T~4s First sleep inside UpdateIndexPermission completes (kAlterTableDelay) // T~4s Permission updated; requester_transaction stored via // SetPendingBackfillRequesterTransaction -// T~8s Second sleep inside UpdateIndexPermission completes → AlterTable RPCs proceed +// T~8s Second sleep inside UpdateIndexPermission completes -> AlterTable RPCs proceed // T~9s Test kills PG backend (kAlterTableDelay * 2 + 1s after indisready detected) -// → 0 BackfillIndex RPCs have been sent yet +// -> 0 BackfillIndex RPCs have been sent yet // T~10s DoBackfill starts, StartRequesterLivenessMonitor() fires // T~12s Liveness first poll (ddl_requester_liveness_check_interval_secs = 2s): detects ABORTED -// → abort callback cancels backfill -// → < 4 chunks complete (1-2 chunks may slip through before abort propagates) +// -> abort callback cancels backfill +// -> < 4 chunks complete (1-2 chunks may slip through before abort propagates) class PgIndexBackfillCancellationEarlyKillTest : public PgIndexBackfillCancellationTest { public: void UpdateMiniClusterOptions(ExternalMiniClusterOptions* options) override { From a48912711e55d19af10b4755acba9eca8d5732cd Mon Sep 17 00:00:00 2001 From: eg <1139932+egladysh@users.noreply.github.com> Date: Wed, 6 May 2026 14:11:44 -0700 Subject: [PATCH 08/15] Fixed comments --- src/yb/master/backfill_index.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/yb/master/backfill_index.cc b/src/yb/master/backfill_index.cc index 1a1bd87c51f5..178cd86a6aee 100644 --- a/src/yb/master/backfill_index.cc +++ b/src/yb/master/backfill_index.cc @@ -343,7 +343,8 @@ Status MultiStageAlterTable::StartBackfillingData( << yb::ToString(idx_infos); // Retrieve the requester transaction if it was stored during the permission-update phase. - // Pass current_version so Take rejects stale transactions from earlier backfill attempts. + // Pass current_version so TakePendingBackfillRequesterTransaction rejects stale + // transactions from earlier backfill attempts. if (!requester_transaction && current_version) { requester_transaction = indexed_table->TakePendingBackfillRequesterTransaction(*current_version); From 9140fce8bbbcdccec0a809822b592a50330f43fb Mon Sep 17 00:00:00 2001 From: eg <1139932+egladysh@users.noreply.github.com> Date: Fri, 8 May 2026 11:12:34 -0700 Subject: [PATCH 09/15] Removed defaulted nullopt and changed to VERIFY_RESULT --- src/yb/client/client-internal.cc | 4 ++-- src/yb/client/client-internal.h | 4 ++-- src/yb/client/client.cc | 7 ++++--- src/yb/client/client.h | 6 ++++-- src/yb/master/backfill_index.cc | 5 +++-- src/yb/master/backfill_index.h | 11 ++++++----- src/yb/master/catalog_manager.cc | 10 ++++++---- src/yb/tserver/pg_client_session.cc | 15 +++++---------- 8 files changed, 32 insertions(+), 30 deletions(-) diff --git a/src/yb/client/client-internal.cc b/src/yb/client/client-internal.cc index 16ad35f5d7f2..e4958532dff6 100644 --- a/src/yb/client/client-internal.cc +++ b/src/yb/client/client-internal.cc @@ -990,8 +990,8 @@ Status YBClient::Data::BackfillIndex(YBClient* client, const YBTableName& index_name, const TableId& index_id, CoarseTimePoint deadline, - bool wait, - std::optional requester_transaction) { + std::optional requester_transaction, + bool wait) { BackfillIndexRequestPB req; BackfillIndexResponsePB resp; diff --git a/src/yb/client/client-internal.h b/src/yb/client/client-internal.h index 015246df5814..152fcac45211 100644 --- a/src/yb/client/client-internal.h +++ b/src/yb/client/client-internal.h @@ -209,8 +209,8 @@ class YBClient::Data { const YBTableName& table_name, const TableId& table_id, CoarseTimePoint deadline, - bool wait = true, - std::optional requester_transaction = std::nullopt); + std::optional requester_transaction, + bool wait = true); Status IsBackfillIndexInProgress(YBClient* client, const TableId& table_id, const TableId& index_id, diff --git a/src/yb/client/client.cc b/src/yb/client/client.cc index 2a748770d73a..b39945a83f05 100644 --- a/src/yb/client/client.cc +++ b/src/yb/client/client.cc @@ -622,13 +622,14 @@ Status YBClient::TruncateTables(const TableIds& table_ids, bool wait) { return data_->TruncateTables(this, table_ids, deadline, wait); } -Status YBClient::BackfillIndex(const TableId& table_id, bool wait, CoarseTimePoint deadline, - std::optional requester_transaction) { +Status YBClient::BackfillIndex(const TableId& table_id, + std::optional requester_transaction, + bool wait, CoarseTimePoint deadline) { if (deadline == CoarseTimePoint()) { deadline = CoarseMonoClock::Now() + FLAGS_backfill_index_client_rpc_timeout_ms * 1ms; } return data_->BackfillIndex( - this, YBTableName(), table_id, deadline, wait, std::move(requester_transaction)); + this, YBTableName(), table_id, deadline, std::move(requester_transaction), wait); } Status YBClient::GetIndexBackfillProgress( diff --git a/src/yb/client/client.h b/src/yb/client/client.h index e37961ea8b90..5dff5da59287 100644 --- a/src/yb/client/client.h +++ b/src/yb/client/client.h @@ -349,8 +349,10 @@ class YBClient { // Backfill the specified index table. This is only supported for YSQL at the moment. Status BackfillIndex( - const TableId& table_id, bool wait = true, CoarseTimePoint deadline = CoarseTimePoint(), - std::optional requester_transaction = std::nullopt); + const TableId& table_id, + std::optional requester_transaction, + bool wait = true, + CoarseTimePoint deadline = CoarseTimePoint()); Status GetIndexBackfillProgress( const TableIds& index_ids, diff --git a/src/yb/master/backfill_index.cc b/src/yb/master/backfill_index.cc index 178cd86a6aee..d5c9e3147e06 100644 --- a/src/yb/master/backfill_index.cc +++ b/src/yb/master/backfill_index.cc @@ -400,8 +400,9 @@ IndexPermissions NextPermission(IndexPermissions perm) { Status MultiStageAlterTable::LaunchNextTableInfoVersionIfNecessary( CatalogManager* catalog_manager, const scoped_refptr& indexed_table, - uint32_t current_version, const LeaderEpoch& epoch, bool respect_backfill_deferrals, - bool update_ysql_to_backfill, std::optional requester_transaction) { + uint32_t current_version, const LeaderEpoch& epoch, + std::optional requester_transaction, bool respect_backfill_deferrals, + bool update_ysql_to_backfill) { DVLOG_WITH_FUNC(3) << Format("$0, version: $1, respect_deferrals: $2, update_ysql_to_backfill: $3", *indexed_table, current_version, respect_backfill_deferrals, diff --git a/src/yb/master/backfill_index.h b/src/yb/master/backfill_index.h index de2c58d5944f..c9de1665e474 100644 --- a/src/yb/master/backfill_index.h +++ b/src/yb/master/backfill_index.h @@ -69,9 +69,10 @@ class MultiStageAlterTable { // INDEX_PERM_DELETE_ONLY -> INDEX_PERM_WRITE_AND_DELETE -> BACKFILL static Status LaunchNextTableInfoVersionIfNecessary( CatalogManager* mgr, const scoped_refptr& Info, uint32_t current_version, - const LeaderEpoch& epoch, bool respect_backfill_deferrals = true, - bool update_ysql_to_backfill = false, - std::optional requester_transaction = std::nullopt); + const LeaderEpoch& epoch, + std::optional requester_transaction, + bool respect_backfill_deferrals = true, + bool update_ysql_to_backfill = false); // Clears the fully_applied_* state for the given table and optionally sets it to RUNNING. // If the version has changed and does not match the expected version no @@ -103,7 +104,7 @@ class MultiStageAlterTable { CatalogManager* catalog_manager, const scoped_refptr& indexed_table, const std::vector& idx_infos, std::optional expected_version, const LeaderEpoch& epoch, - std::optional requester_transaction = std::nullopt); + std::optional requester_transaction); }; class BackfillTablet; @@ -119,7 +120,7 @@ class BackfillTable : public std::enable_shared_from_this { std::vector indexes, const scoped_refptr &ns_info, LeaderEpoch epoch, - std::optional requester_transaction = std::nullopt); + std::optional requester_transaction); Status Launch(); diff --git a/src/yb/master/catalog_manager.cc b/src/yb/master/catalog_manager.cc index 36d42b77e561..3afb381c8c1f 100644 --- a/src/yb/master/catalog_manager.cc +++ b/src/yb/master/catalog_manager.cc @@ -6596,8 +6596,8 @@ Status CatalogManager::BackfillIndex( } } return MultiStageAlterTable::LaunchNextTableInfoVersionIfNecessary( - this, indexed_table, current_version, epoch, /* respect deferrals for backfill */ false, - /* update ysql to backfill */ true, std::move(requester_txn)); + this, indexed_table, current_version, epoch, std::move(requester_txn), + /* respect_backfill_deferrals */ false, /* update_ysql_to_backfill */ true); } Status CatalogManager::GetBackfillJobs( @@ -6774,7 +6774,8 @@ Status CatalogManager::LaunchBackfillIndexForTable( } auto s = MultiStageAlterTable::LaunchNextTableInfoVersionIfNecessary( - this, indexed_table, current_version, epoch, /* respect deferrals for backfill */ false); + this, indexed_table, current_version, epoch, std::nullopt, + /* respect_backfill_deferrals */ false); if (!s.ok()) { VLOG(3) << __func__ << " Done failed " << s; return SetupError(resp->mutable_error(), MasterErrorPB::UNKNOWN_ERROR, s); @@ -11792,7 +11793,8 @@ Status CatalogManager::HandleTabletSchemaVersionReport( table->id(), table->EraseDdlTxnForRollbackToSubTxnWaitingForSchemaVersion(version)); } - return MultiStageAlterTable::LaunchNextTableInfoVersionIfNecessary(this, table, version, epoch); + return MultiStageAlterTable::LaunchNextTableInfoVersionIfNecessary( + this, table, version, epoch, std::nullopt); } Status CatalogManager::ProcessPendingAssignmentsPerTable( diff --git a/src/yb/tserver/pg_client_session.cc b/src/yb/tserver/pg_client_session.cc index 85c441924c76..a06cc1a9b2ee 100644 --- a/src/yb/tserver/pg_client_session.cc +++ b/src/yb/tserver/pg_client_session.cc @@ -2054,21 +2054,16 @@ class PgClientSession::Impl { Status BackfillIndex( const PgBackfillIndexRequestPB& req, PgBackfillIndexResponsePB* resp, rpc::RpcContext* context) { - std::optional txn_metadata; // The PG backend holds a DDL transaction open for the entire backfill duration // (StartTransactionCommand at indexcmds.c:2334). Pass it to the master so it can detect when // this backend is killed (-> txn aborted) and stop launching new backfill chunks. - auto meta = GetDdlTransactionMetadata( + const auto* metadata = VERIFY_RESULT(GetDdlTransactionMetadata( true /* use_transaction */, false /* use_regular_transaction_block */, - context->GetClientDeadline(), IsTxnUsingTableLocks(false)); - if (meta.ok() && *meta) { - txn_metadata = **meta; - } else { - VLOG(1) << "BackfillIndex: no DDL transaction metadata available for cancellation detection"; - } + context->GetClientDeadline(), IsTxnUsingTableLocks(false))); return client_.BackfillIndex( - PgObjectId::GetYbTableIdFromPB(req.table_id()), /* wait= */ true, - context->GetClientDeadline(), std::move(txn_metadata)); + PgObjectId::GetYbTableIdFromPB(req.table_id()), + metadata ? std::make_optional(*metadata) : std::nullopt, + /* wait= */ true, context->GetClientDeadline()); } Status CreateTablegroup( From 69826e43a4803275e420da06fa2883970b09b120 Mon Sep 17 00:00:00 2001 From: eg <1139932+egladysh@users.noreply.github.com> Date: Fri, 8 May 2026 11:38:01 -0700 Subject: [PATCH 10/15] Style formatting --- src/yb/master/backfill_index.cc | 4 +++- src/yb/master/catalog_manager.cc | 1 + 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/src/yb/master/backfill_index.cc b/src/yb/master/backfill_index.cc index d5c9e3147e06..bb1f5287b7c3 100644 --- a/src/yb/master/backfill_index.cc +++ b/src/yb/master/backfill_index.cc @@ -1105,7 +1105,9 @@ Status BackfillTable::MarkIndexesAsDesired( } void BackfillTable::StartRequesterLivenessMonitor() { - if (!requester_transaction_) return; + if (!requester_transaction_) { + return; + } if (PREDICT_FALSE(FLAGS_TEST_skip_ddl_requester_liveness_check)) { LOG_WITH_PREFIX(INFO) << "Skipping requester liveness monitor (TEST flag set)"; return; diff --git a/src/yb/master/catalog_manager.cc b/src/yb/master/catalog_manager.cc index 3afb381c8c1f..49c719eb657a 100644 --- a/src/yb/master/catalog_manager.cc +++ b/src/yb/master/catalog_manager.cc @@ -6595,6 +6595,7 @@ Status CatalogManager::BackfillIndex( LOG(WARNING) << "BackfillIndex: failed to decode requester transaction: " << result.status(); } } + return MultiStageAlterTable::LaunchNextTableInfoVersionIfNecessary( this, indexed_table, current_version, epoch, std::move(requester_txn), /* respect_backfill_deferrals */ false, /* update_ysql_to_backfill */ true); From d2aa0c832fb872b3503d99e0e45d145f6013b110 Mon Sep 17 00:00:00 2001 From: eg <1139932+egladysh@users.noreply.github.com> Date: Fri, 8 May 2026 11:40:43 -0700 Subject: [PATCH 11/15] Update src/yb/master/backfill_index.cc Co-authored-by: jasonyb <93959687+jasonyb@users.noreply.github.com> --- src/yb/master/backfill_index.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/src/yb/master/backfill_index.cc b/src/yb/master/backfill_index.cc index bb1f5287b7c3..6214bda925bd 100644 --- a/src/yb/master/backfill_index.cc +++ b/src/yb/master/backfill_index.cc @@ -1145,7 +1145,6 @@ void BackfillTable::StopLivenessMonitor() { } } - Status BackfillTable::Abort() { LOG(WARNING) << "Backfill failed/aborted."; RETURN_NOT_OK(MarkAllIndexesAsFailed()); From 8d8680c626725467723a6765689dd84c283c9129 Mon Sep 17 00:00:00 2001 From: eg <1139932+egladysh@users.noreply.github.com> Date: Fri, 8 May 2026 11:41:50 -0700 Subject: [PATCH 12/15] Update src/yb/master/backfill_index.cc Co-authored-by: jasonyb <93959687+jasonyb@users.noreply.github.com> --- src/yb/master/backfill_index.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/yb/master/backfill_index.cc b/src/yb/master/backfill_index.cc index 6214bda925bd..bc4323e9103a 100644 --- a/src/yb/master/backfill_index.cc +++ b/src/yb/master/backfill_index.cc @@ -518,7 +518,7 @@ Status MultiStageAlterTable::LaunchNextTableInfoVersionIfNecessary( VLOG(1) << "Sending alter table request with updated permissions"; // Store the requester transaction so StartBackfillingData can retrieve it when the // permission change reaches DO_BACKFILL and the second call launches backfill. - // Store current_version+1 (the new version after this permission update) so Take can + // Store current_version+1 (the new version after this permission update) so TakePendingBackfillRequesterTransaction can // verify the transaction belongs to this exact backfill attempt and not a stale one. if (requester_transaction) { indexed_table->SetPendingBackfillRequesterTransaction( From 16646e900a59421f9cb294c6907099162fc3be3d Mon Sep 17 00:00:00 2001 From: eg <1139932+egladysh@users.noreply.github.com> Date: Fri, 8 May 2026 11:54:16 -0700 Subject: [PATCH 13/15] Lint fixes --- src/yb/master/backfill_index.cc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/yb/master/backfill_index.cc b/src/yb/master/backfill_index.cc index bc4323e9103a..76eaad055010 100644 --- a/src/yb/master/backfill_index.cc +++ b/src/yb/master/backfill_index.cc @@ -518,8 +518,9 @@ Status MultiStageAlterTable::LaunchNextTableInfoVersionIfNecessary( VLOG(1) << "Sending alter table request with updated permissions"; // Store the requester transaction so StartBackfillingData can retrieve it when the // permission change reaches DO_BACKFILL and the second call launches backfill. - // Store current_version+1 (the new version after this permission update) so TakePendingBackfillRequesterTransaction can - // verify the transaction belongs to this exact backfill attempt and not a stale one. + // Store current_version+1 (the new version after this permission update) + // so TakePendingBackfillRequesterTransaction can verify the transaction + // belongs to this exact backfill attempt and not a stale one. if (requester_transaction) { indexed_table->SetPendingBackfillRequesterTransaction( std::move(requester_transaction), current_version + 1); From 7c3c66d49602e2a183e704434020cf0e9963055d Mon Sep 17 00:00:00 2001 From: eg <1139932+egladysh@users.noreply.github.com> Date: Fri, 8 May 2026 18:11:14 -0700 Subject: [PATCH 14/15] as per discussion, revert the changes around VERIFY_RESULT --- src/yb/tserver/pg_client_session.cc | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/yb/tserver/pg_client_session.cc b/src/yb/tserver/pg_client_session.cc index a06cc1a9b2ee..33a1156ea23a 100644 --- a/src/yb/tserver/pg_client_session.cc +++ b/src/yb/tserver/pg_client_session.cc @@ -2057,12 +2057,18 @@ class PgClientSession::Impl { // The PG backend holds a DDL transaction open for the entire backfill duration // (StartTransactionCommand at indexcmds.c:2334). Pass it to the master so it can detect when // this backend is killed (-> txn aborted) and stop launching new backfill chunks. - const auto* metadata = VERIFY_RESULT(GetDdlTransactionMetadata( + auto meta = GetDdlTransactionMetadata( true /* use_transaction */, false /* use_regular_transaction_block */, - context->GetClientDeadline(), IsTxnUsingTableLocks(false))); + context->GetClientDeadline(), IsTxnUsingTableLocks(false)); + std::optional txn_metadata; + if (meta.ok() && *meta) { + txn_metadata = **meta; + } else { + VLOG(1) << "BackfillIndex: no DDL transaction metadata available for cancellation detection"; + } return client_.BackfillIndex( PgObjectId::GetYbTableIdFromPB(req.table_id()), - metadata ? std::make_optional(*metadata) : std::nullopt, + std::move(txn_metadata), /* wait= */ true, context->GetClientDeadline()); } From c5b9ba9cd26d1f13c5a7501bf373b3b9dfd3441c Mon Sep 17 00:00:00 2001 From: eg <1139932+egladysh@users.noreply.github.com> Date: Sat, 9 May 2026 00:50:52 -0700 Subject: [PATCH 15/15] Address review comments: log bad status, fix non-ASCII chars, use conn_ --- src/yb/tserver/pg_client_session.cc | 6 +++--- .../yql/pgwrapper/pg_index_backfill-test.cc | 20 +++++++++---------- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/src/yb/tserver/pg_client_session.cc b/src/yb/tserver/pg_client_session.cc index 33a1156ea23a..d963a631bf4e 100644 --- a/src/yb/tserver/pg_client_session.cc +++ b/src/yb/tserver/pg_client_session.cc @@ -2061,10 +2061,10 @@ class PgClientSession::Impl { true /* use_transaction */, false /* use_regular_transaction_block */, context->GetClientDeadline(), IsTxnUsingTableLocks(false)); std::optional txn_metadata; - if (meta.ok() && *meta) { + if (!meta.ok()) { + VLOG(1) << "BackfillIndex: failed to get DDL transaction metadata: " << meta.status(); + } else if (*meta) { txn_metadata = **meta; - } else { - VLOG(1) << "BackfillIndex: no DDL transaction metadata available for cancellation detection"; } return client_.BackfillIndex( PgObjectId::GetYbTableIdFromPB(req.table_id()), diff --git a/src/yb/yql/pgwrapper/pg_index_backfill-test.cc b/src/yb/yql/pgwrapper/pg_index_backfill-test.cc index 45196a384683..a0eae1808ebb 100644 --- a/src/yb/yql/pgwrapper/pg_index_backfill-test.cc +++ b/src/yb/yql/pgwrapper/pg_index_backfill-test.cc @@ -3490,9 +3490,8 @@ class PgIndexBackfillCancellationTest : public PgIndexBackfillTest { }, 30s * kTimeMultiplier, "first BackfillIndex RPC to complete")); - auto killer_conn = VERIFY_RESULT(ConnectToDB(kDatabaseName)); auto terminated = VERIFY_RESULT( - killer_conn.FetchRow(Format("SELECT pg_terminate_backend($0)", create_index_pid))); + conn_->FetchRow(Format("SELECT pg_terminate_backend($0)", create_index_pid))); if (!terminated) { return STATUS(IllegalState, "pg_terminate_backend returned false"); } @@ -3559,7 +3558,7 @@ TEST_P(PgIndexBackfillCancellationWithoutFixTest, auto [rpcs_before, rpcs_after] = ASSERT_RESULT(RunBackfillAndKillMidway()); EXPECT_GT(rpcs_after, rpcs_before) << "Expected BackfillIndex RPCs to continue after pg_terminate_backend " - "(liveness check disabled — demonstrating pre-fix behavior), but they stopped"; + "(liveness check disabled -- demonstrating pre-fix behavior), but they stopped"; } // Tests that kill the CREATE INDEX backend BEFORE any tserver BackfillIndex RPCs have been @@ -3630,13 +3629,12 @@ class PgIndexBackfillCancellationEarlyKillTest : public PgIndexBackfillCancellat auto rpcs = VERIFY_RESULT(TotalBackfillRpcCalls(cluster_.get())); if (rpcs != 0) { return STATUS_FORMAT(IllegalState, - "BackfillIndex RPCs already started — kill window was missed ($0 RPCs). " + "BackfillIndex RPCs already started -- kill window was missed ($0 RPCs). " "Increase kAlterTableDelay or reduce system load.", rpcs); } - auto killer_conn = VERIFY_RESULT(ConnectToDB(kDatabaseName)); auto terminated = VERIFY_RESULT( - killer_conn.FetchRow(Format("SELECT pg_terminate_backend($0)", create_index_pid))); + conn_->FetchRow(Format("SELECT pg_terminate_backend($0)", create_index_pid))); if (!terminated) { return STATUS(IllegalState, "pg_terminate_backend returned false"); } @@ -3645,20 +3643,20 @@ class PgIndexBackfillCancellationEarlyKillTest : public PgIndexBackfillCancellat return Status::OK(); } - // Each sleep inside UpdateIndexPermission lasts this long (two sleeps -> 2× total delay). + // Each sleep inside UpdateIndexPermission lasts this long (two sleeps -> 2x total delay). const MonoDelta kAlterTableDelay = 4s; }; INSTANTIATE_TEST_CASE_P(, PgIndexBackfillCancellationEarlyKillTest, ::testing::Bool()); // Positive test: with the fix, the DdlRequesterLivenessTask receives the requester_transaction -// from the placeholder BackfillTable and aborts backfill after ≤ 2 chunks. +// from the placeholder BackfillTable and aborts backfill after <= 2 chunks. TEST_P(PgIndexBackfillCancellationEarlyKillTest, YB_DISABLE_TEST_IN_SANITIZERS(BackfillStopsAfterEarlyBackendKill)) { ASSERT_OK(KillBackendBeforeBackfill()); - // Allow enough time for all 4 chunks (4 × kSlowdown = 8s) plus a generous margin. - // If the liveness task did not fire, all 4 complete; with the fix, ≤ 2 complete. + // Allow enough time for all 4 chunks (4 x kSlowdown = 8s) plus a generous margin. + // If the liveness task did not fire, all 4 complete; with the fix, <= 2 complete. SleepFor(kSlowdown * 4 + 5s); auto rpcs_final = ASSERT_RESULT(TotalBackfillRpcCalls(cluster_.get())); LOG(INFO) << "BackfillIndex RPC count after observation window: " << rpcs_final; @@ -3717,7 +3715,7 @@ TEST_P(PgIndexBackfillCancellationEarlyKillWithoutFixTest, // With liveness monitoring disabled (TEST_skip_ddl_requester_liveness_check) and DDL // verification disabled (TEST_skip_transaction_verification), all 4 chunks run to completion. - // Contrast with the positive test where ≤ 2 chunks complete before the liveness task fires + // Contrast with the positive test where <= 2 chunks complete before the liveness task fires // and aborts the backfill. EXPECT_GE(rpcs_final, 4) << "Expected 4 BackfillIndex RPCs (liveness check and DDL verification disabled), "