diff --git a/src/postgres/src/backend/access/yb_access/yb_scan.c b/src/postgres/src/backend/access/yb_access/yb_scan.c index 39f9bf986813..913eaa257104 100644 --- a/src/postgres/src/backend/access/yb_access/yb_scan.c +++ b/src/postgres/src/backend/access/yb_access/yb_scan.c @@ -5242,6 +5242,104 @@ YBCLockTuple(Relation relation, Datum ybctid, RowMarkType mode, return res; } +TM_Result +YBCLockTupleBatch(Relation relation, Datum *ybctids, int count, + RowMarkType mode, EState *estate, + int *locked_index) +{ + Assert(count > 0); + + const Oid relfile_oid = YbGetRelfileNodeId(relation); + + YbcPgStatement ybc_stmt = YbNewSelect(relation, NULL /* prepare_params */); + + /* + * Bind the first ybctid as the regular column value (needed for partition + * routing). All ybctids are also added as batch_arguments for the tserver + * to iterate through. + */ + YbcPgExpr ybctid_expr = YBCNewConstant(ybc_stmt, BYTEAOID, InvalidOid, + ybctids[0], false); + HandleYBStatus(YBCPgDmlBindColumn(ybc_stmt, YBTupleIdAttributeNumber, + ybctid_expr)); + + for (int i = 0; i < count; i++) + { + Datum d = ybctids[i]; + const char *data = VARDATA_ANY(DatumGetPointer(d)); + size_t len = VARSIZE_ANY_EXHDR(DatumGetPointer(d)); + + HandleYBStatus(YBCPgDmlAddBatchYbctidArg(ybc_stmt, data, len)); + } + + YbcPgExecParameters exec_params = {0}; + exec_params.limit_count = count; + exec_params.rowmark = mode; + exec_params.pg_wait_policy = LockWaitSkip; + exec_params.docdb_wait_policy = YBGetDocDBWaitPolicy(LockWaitSkip); + exec_params.stmt_in_txn_limit_ht_for_reads = + estate->yb_exec_params.stmt_in_txn_limit_ht_for_reads; + + HandleYBStatus(YBCPgExecSelect(ybc_stmt, &exec_params)); + + /* + * We must fetch at least once to trigger the RPC and populate the + * response protobuf (which carries first_locked_batch_arg_index). 
+ * + * Like YBCLockTuple, catch conflict/skip-locking errors from the fetch + * rather than raising them — for SKIP LOCKED, these mean "all candidates + * were contended" and should return TM_WouldBlock, not abort the statement. + */ + bool has_data = false; + YbcStatus status = YBCPgDmlFetch(ybc_stmt, 0, NULL, NULL, NULL, &has_data); + + if (status) + { + const uint32_t err_code = YBCStatusPgsqlError(status); + + switch (err_code) + { + case ERRCODE_YB_TXN_CONFLICT: + case ERRCODE_YB_TXN_SKIP_LOCKING: + /* Conflict or skip during batch lock — treat as all-skipped. */ + YBCFreeStatus(status); + *locked_index = -1; + YBCPgDeleteStatement(ybc_stmt); + return TM_WouldBlock; + default: + HandleYBStatus(status); + break; + } + YBCFreeStatus(status); + } + + /* + * Read the first_locked_batch_arg_index from the response. + * The tserver tries each batch_argument in order and returns the index + * of the first one it successfully locked. + */ + int32_t winner = -1; + + HandleYBStatus(YBCPgDmlGetFirstLockedBatchArgIndex(ybc_stmt, &winner)); + + TM_Result res; + + if (winner >= 0 && winner < count) + { + *locked_index = winner; + YBCPgAddIntoForeignKeyReferenceCache(relfile_oid, ybctids[winner]); + res = TM_Ok; + } + else + { + *locked_index = -1; + res = TM_WouldBlock; + } + + YBCPgDeleteStatement(ybc_stmt); + return res; +} + void YBCFlushTupleLocks() { diff --git a/src/postgres/src/backend/executor/nodeLockRows.c b/src/postgres/src/backend/executor/nodeLockRows.c index f2254ce6f98b..33533ff1aee7 100644 --- a/src/postgres/src/backend/executor/nodeLockRows.c +++ b/src/postgres/src/backend/executor/nodeLockRows.c @@ -31,8 +31,204 @@ /* YB includes */ #include "access/yb_scan.h" +#include "utils/datum.h" +#include "utils/guc.h" +/* + * ExecLockRowsBatchSkipLocked + * + * Optimized batch path for YB SKIP LOCKED. Pre-fetches up to batch_size + * candidate tuples from the scan, sends all their ybctids in a single RPC, + * and the tserver locks the first unlocked one. 
+ *
+ * Returns the winning slot (with the locked tuple restored) or NULL if all
+ * candidates in this batch were skipped (caller should retry) or the scan
+ * is exhausted.
+ *
+ * Candidates after the winner are saved into node->yb_batch_leftover_* so
+ * subsequent ExecLockRows calls can try them via the per-row path without
+ * losing rows consumed from the scan.
+ *
+ * *scan_exhausted is set to true when ExecProcNode returned NULL, meaning
+ * there are no more rows to try.
+ */
+static TupleTableSlot *
+ExecLockRowsBatchSkipLocked(LockRowsState *node,
+							TupleTableSlot *first_slot,
+							ExecAuxRowMark *aerm,
+							ExecRowMark *erm,
+							EState *estate,
+							PlanState *outerPlan,
+							bool *scan_exhausted)
+{
+	int			batch_size = yb_skip_locked_batch_size;
+	Datum	   *batch_ybctids;
+	HeapTuple  *batch_tuples;
+	int			batch_count = 0;
+	TupleTableSlot *slot = first_slot;
+	int			locked_index;
+	TM_Result	result;
+
+	*scan_exhausted = false;
+
+	batch_ybctids = (Datum *) palloc(sizeof(Datum) * batch_size);
+	batch_tuples = (HeapTuple *) palloc(sizeof(HeapTuple) * batch_size);
+
+	/*
+	 * Collect candidate ybctids. The first slot was already fetched by the
+	 * caller; fetch up to batch_size - 1 more.
+	 */
+	for (;;)
+	{
+		Datum		datum;
+		bool		isNull;
+
+		datum = ExecGetJunkAttribute(slot, aerm->ctidAttNo, &isNull);
+		if (isNull)
+			elog(ERROR, "ctid/ybctid is NULL");
+
+		batch_ybctids[batch_count] = datumCopy(datum, false, -1);
+		batch_tuples[batch_count] = ExecCopySlotHeapTuple(slot);
+		batch_count++;
+
+		if (batch_count >= batch_size)
+			break;
+
+		slot = ExecProcNode(outerPlan);
+		if (TupIsNull(slot))
+		{
+			*scan_exhausted = true;
+			break;
+		}
+	}
+
+	/* first_slot is always collected above, so the batch is never empty. */
+	Assert(batch_count > 0);
+
+	/* Single batch RPC to lock the first available candidate. */
+	result = YBCLockTupleBatch(erm->relation, batch_ybctids, batch_count,
+							   erm->markType, estate, &locked_index);
+
+	slot = NULL;
+
+	if (result == TM_Ok && locked_index >= 0 && locked_index < batch_count)
+	{
+		/*
+		 * Winner found. Restore the winning tuple into the outer plan's
+		 * result slot so the caller can return it.
+		 */
+		TupleTableSlot *result_slot = first_slot;
+
+		ExecForceStoreHeapTuple(batch_tuples[locked_index], result_slot, true);
+		slot = result_slot;
+
+		/* Free candidates before the winner (they were skipped by tserver). */
+		for (int i = 0; i < locked_index; i++)
+		{
+			pfree(DatumGetPointer(batch_ybctids[i]));
+			heap_freetuple(batch_tuples[i]);
+		}
+		/* Winner ybctid datum no longer needed (tuple is in the slot). */
+		pfree(DatumGetPointer(batch_ybctids[locked_index]));
+
+		/*
+		 * Save candidates after the winner as leftovers: the tserver stops at
+		 * the first success, so they are retried via the per-row path. Arrays
+		 * of an earlier, fully consumed batch are released before replacement.
+		 */
+		int			leftover_count = batch_count - locked_index - 1;
+		if (leftover_count > 0)
+		{
+			if (node->yb_batch_leftover_tuples)
+				pfree(node->yb_batch_leftover_tuples);
+			if (node->yb_batch_leftover_ybctids)
+				pfree(node->yb_batch_leftover_ybctids);
+			node->yb_batch_leftover_tuples = (HeapTuple *)
+				palloc(sizeof(HeapTuple) * leftover_count);
+			node->yb_batch_leftover_ybctids = (Datum *)
+				palloc(sizeof(Datum) * leftover_count);
+			for (int i = 0; i < leftover_count; i++)
+			{
+				int			src = locked_index + 1 + i;
+				node->yb_batch_leftover_tuples[i] = batch_tuples[src];
+				node->yb_batch_leftover_ybctids[i] = batch_ybctids[src];
+			}
+			node->yb_batch_leftover_count = leftover_count;
+			node->yb_batch_leftover_idx = 0;
+		}
+	}
+	else
+	{
+		/* All candidates were skipped. Free everything. */
+		for (int i = 0; i < batch_count; i++)
+		{
+			pfree(DatumGetPointer(batch_ybctids[i]));
+			heap_freetuple(batch_tuples[i]);
+		}
+	}
+
+	pfree(batch_ybctids);
+	pfree(batch_tuples);
+	return slot;
+}
+
+/*
+ * ExecLockRowsTryLeftover
+ *
+ * Try to lock the next leftover candidate from a previous batch RPC using
+ * the per-row YBCLockTuple path.
Returns a slot if a row was locked, or + * NULL if the leftover was skipped (caller should try the next one or + * fall through to the scan). + */ +static TupleTableSlot * +ExecLockRowsTryLeftover(LockRowsState *node, + ExecRowMark *erm, + EState *estate, + TupleTableSlot *result_slot) +{ + while (node->yb_batch_leftover_idx < node->yb_batch_leftover_count) + { + int idx = node->yb_batch_leftover_idx++; + Datum ybctid = node->yb_batch_leftover_ybctids[idx]; + HeapTuple tup = node->yb_batch_leftover_tuples[idx]; + TM_Result res; + + res = YBCLockTuple(erm->relation, ybctid, erm->markType, + LockWaitSkip, estate); + + pfree(DatumGetPointer(ybctid)); + + if (res == TM_Ok) + { + ExecForceStoreHeapTuple(tup, result_slot, true); + + /* Free remaining leftovers if this is the last one we need. */ + /* (They'll be freed when the next call consumes or exhausts them.) */ + return result_slot; + } + + /* Skipped — free this tuple and try the next leftover. */ + heap_freetuple(tup); + } + + /* All leftovers exhausted. Clean up. */ + if (node->yb_batch_leftover_tuples) + { + pfree(node->yb_batch_leftover_tuples); + node->yb_batch_leftover_tuples = NULL; + } + if (node->yb_batch_leftover_ybctids) + { + pfree(node->yb_batch_leftover_ybctids); + node->yb_batch_leftover_ybctids = NULL; + } + node->yb_batch_leftover_count = 0; + node->yb_batch_leftover_idx = 0; + + return NULL; +} + /* ---------------------------------------------------------------- * ExecLockRows * ---------------------------------------------------------------- @@ -59,6 +255,26 @@ ExecLockRows(PlanState *pstate) * Get next tuple from subplan, if any. */ lnext: + /* + * If we have leftover candidates from a previous batch SKIP LOCKED RPC, + * try to lock them one-by-one before pulling new rows from the scan. + * This is needed for LIMIT > 1: the batch path only returns the first + * winner, and leftover candidates may still be lockable. 
+ */ + if (node->yb_batch_leftover_count > node->yb_batch_leftover_idx) + { + ExecAuxRowMark *aerm = (ExecAuxRowMark *) linitial(node->lr_arowMarks); + ExecRowMark *erm = aerm->rowmark; + TupleTableSlot *leftover_result; + + /* Use the outer plan's result slot — same slot the batch path uses. */ + leftover_result = ExecLockRowsTryLeftover(node, erm, estate, + outerPlan->ps_ResultTupleSlot); + if (leftover_result != NULL) + return leftover_result; + /* All leftovers exhausted, fall through to scan for more rows. */ + } + slot = ExecProcNode(outerPlan); if (node->yb_are_row_marks_for_yb_rels && @@ -81,6 +297,39 @@ ExecLockRows(PlanState *pstate) /* We don't need EvalPlanQual unless we get updated tuple version(s) */ epq_needed = false; + /* + * YB batch SKIP LOCKED optimization: when we have exactly one YB row mark + * with SKIP LOCKED policy and batch size > 1, pre-fetch multiple candidates + * and lock the first available one in a single RPC. + */ + if (node->yb_are_row_marks_for_yb_rels && + yb_skip_locked_batch_size > 1 && + list_length(node->lr_arowMarks) == 1) + { + ExecAuxRowMark *aerm = (ExecAuxRowMark *) linitial(node->lr_arowMarks); + ExecRowMark *erm = aerm->rowmark; + + if (erm->waitPolicy == LockWaitSkip) + { + bool scan_exhausted; + TupleTableSlot *result; + + result = ExecLockRowsBatchSkipLocked(node, slot, aerm, erm, + estate, outerPlan, + &scan_exhausted); + if (result != NULL) + return result; + + /* All candidates were skipped. Try more if scan has rows left. */ + if (!scan_exhausted) + goto lnext; + + /* Scan exhausted, no rows locked. */ + EvalPlanQualEnd(&node->lr_epqstate); + return NULL; + } + } + /* * Attempt to lock the source tuple(s). (Note we only have locking * rowmarks in lr_arowMarks.) @@ -452,6 +701,21 @@ ExecInitLockRows(LockRows *node, EState *estate, int eflags) void ExecEndLockRows(LockRowsState *node) { + /* Free any leftover batch SKIP LOCKED candidates. 
*/ + if (node->yb_batch_leftover_tuples) + { + for (int i = node->yb_batch_leftover_idx; + i < node->yb_batch_leftover_count; i++) + { + pfree(DatumGetPointer(node->yb_batch_leftover_ybctids[i])); + heap_freetuple(node->yb_batch_leftover_tuples[i]); + } + pfree(node->yb_batch_leftover_tuples); + pfree(node->yb_batch_leftover_ybctids); + node->yb_batch_leftover_tuples = NULL; + node->yb_batch_leftover_ybctids = NULL; + } + /* We may have shut down EPQ already, but no harm in another call */ EvalPlanQualEnd(&node->lr_epqstate); ExecEndNode(outerPlanState(node)); diff --git a/src/postgres/src/backend/utils/misc/guc.c b/src/postgres/src/backend/utils/misc/guc.c index ffd1aaabcf36..8bf014e7881f 100644 --- a/src/postgres/src/backend/utils/misc/guc.c +++ b/src/postgres/src/backend/utils/misc/guc.c @@ -178,6 +178,8 @@ static int yb_tcmalloc_sample_period = 1024 * 1024; /* 1MB */ uint64_t yb_conn_mgr_sighup_logical_client_version = 0; bool yb_conn_mgr_sighup_had_backend_guc_change = false; +int yb_skip_locked_batch_size = 32; + static int GUC_check_errcode_value; static List *reserved_class_prefix = NIL; @@ -4101,6 +4103,18 @@ static struct config_int ConfigureNamesInt[] = 1024, 1, INT_MAX, check_yb_explicit_row_locking_batch_size, NULL, NULL }, + { + {"yb_skip_locked_batch_size", PGC_USERSET, QUERY_TUNING_OTHER, + gettext_noop("Batch size for SKIP LOCKED candidate prefetch"), + gettext_noop("Number of candidate rows to send in a single RPC " + "when using SELECT ... FOR UPDATE SKIP LOCKED. 
" + "Set to 1 to disable batching."), + GUC_NOT_IN_SAMPLE + }, + &yb_skip_locked_batch_size, + 32, 1, 1024, + NULL, NULL, NULL + }, { {"default_statistics_target", PGC_USERSET, QUERY_TUNING_OTHER, gettext_noop("Sets the default statistics target."), diff --git a/src/postgres/src/include/access/yb_scan.h b/src/postgres/src/include/access/yb_scan.h index 04bbf615a237..fc2547f22882 100644 --- a/src/postgres/src/include/access/yb_scan.h +++ b/src/postgres/src/include/access/yb_scan.h @@ -378,6 +378,17 @@ extern void ybcIndexCostEstimate(struct PlannerInfo *root, IndexPath *path, extern TM_Result YBCLockTuple(Relation relation, Datum ybctid, RowMarkType mode, LockWaitPolicy wait_policy, EState *estate); +/* + * Try to lock one of several candidate rows in SKIP LOCKED mode. + * Sends all ybctids in a single RPC; the tserver tries each in order + * and locks the first unlocked one. + * Returns TM_Ok if a row was locked (*locked_index set to 0-based winner), + * or TM_WouldBlock if all were skipped (*locked_index set to -1). + */ +extern TM_Result YBCLockTupleBatch(Relation relation, Datum *ybctids, int count, + RowMarkType mode, EState *estate, + int *locked_index); + /* * Fetch a single row for given ybctid into a heap-tuple. * This API is needed for reading data from a catalog (system table). diff --git a/src/postgres/src/include/nodes/execnodes.h b/src/postgres/src/include/nodes/execnodes.h index d6cfea97b865..cdcc59533983 100644 --- a/src/postgres/src/include/nodes/execnodes.h +++ b/src/postgres/src/include/nodes/execnodes.h @@ -3052,6 +3052,17 @@ typedef struct LockRowsState bool yb_are_row_marks_for_yb_rels; /* lr_arowMarks relates to YB * relations */ + + /* + * Leftover candidates from a batch SKIP LOCKED RPC. When the batch path + * locks the first unlocked row, candidates after the winner are saved here + * so subsequent ExecLockRows calls can try them via the per-row path + * instead of losing them. 
+ */ + HeapTuple *yb_batch_leftover_tuples; + Datum *yb_batch_leftover_ybctids; + int yb_batch_leftover_count; + int yb_batch_leftover_idx; } LockRowsState; /* ---------------- diff --git a/src/postgres/src/include/utils/guc.h b/src/postgres/src/include/utils/guc.h index f225b74a4acb..daf7a79a5c1c 100644 --- a/src/postgres/src/include/utils/guc.h +++ b/src/postgres/src/include/utils/guc.h @@ -307,6 +307,7 @@ extern PGDLLIMPORT int yb_bnl_batch_size; extern PGDLLIMPORT bool yb_bnl_optimize_first_batch; extern PGDLLIMPORT bool yb_bnl_enable_hashing; extern PGDLLIMPORT int yb_explicit_row_locking_batch_size; +extern PGDLLIMPORT int yb_skip_locked_batch_size; extern PGDLLIMPORT bool yb_lock_pk_single_rpc; extern PGDLLIMPORT int yb_toast_catcache_threshold; extern PGDLLIMPORT bool yb_enable_fkey_catcache; diff --git a/src/yb/common/pgsql_protocol.proto b/src/yb/common/pgsql_protocol.proto index 04a071b7bea3..5cffa6908289 100644 --- a/src/yb/common/pgsql_protocol.proto +++ b/src/yb/common/pgsql_protocol.proto @@ -727,6 +727,10 @@ message PgsqlResponsePB { // Metrics changes as a result of processing the request. optional PgsqlRequestMetricsPB metrics = 17; + + // For batch SKIP LOCKED: index (0-based) within batch_arguments of the first row that was + // successfully locked. Absent means no row was locked (all were skipped or the batch was empty). 
+ optional int32 first_locked_batch_arg_index = 22; } message PgsqlAdvisoryLockPB { diff --git a/src/yb/docdb/pgsql_operation.cc b/src/yb/docdb/pgsql_operation.cc index 1af08b24653e..77840e94caf9 100644 --- a/src/yb/docdb/pgsql_operation.cc +++ b/src/yb/docdb/pgsql_operation.cc @@ -3172,6 +3172,29 @@ Status GetIntents( return Status::OK(); } +Status GetIntentsForBatchArg( + const PgsqlReadRequestPB& request, int batch_arg_index, const Schema& schema, + IsolationLevel level, LWKeyValueWriteBatchPB* out) { + DCHECK_GE(batch_arg_index, 0); + DCHECK_LT(batch_arg_index, request.batch_arguments_size()); + + const auto row_mark = request.has_row_mark_type() ? request.row_mark_type() : ROW_MARK_ABSENT; + if (IsValidRowMarkType(row_mark)) { + RSTATUS_DCHECK(request.has_wait_policy(), IllegalState, "wait policy is expected"); + out->set_wait_policy(request.wait_policy()); + } + + IntentInserter inserter(out); + DocKeyAccessor accessor(schema); + const IntentMode mode{ + level, row_mark, KeyOnlyRequested(IsOnlyKeyColumnsRequested(schema, request))}; + const auto& batch_argument = request.batch_arguments(batch_arg_index); + auto slice = VERIFY_RESULT(accessor.GetEncoded(batch_argument.ybctid())); + bool inlcudes_pk = static_cast(accessor.pk_is_known()); + inserter.Add(slice, inlcudes_pk, mode); + return Status::OK(); +} + PgsqlLockOperation::PgsqlLockOperation( std::reference_wrapper request, const TransactionOperationContext& txn_op_context) diff --git a/src/yb/docdb/pgsql_operation.h b/src/yb/docdb/pgsql_operation.h index b6f8f0708f48..f120259813a6 100644 --- a/src/yb/docdb/pgsql_operation.h +++ b/src/yb/docdb/pgsql_operation.h @@ -258,6 +258,12 @@ Status GetIntents( const PgsqlReadRequestMsg& request, const Schema& schema, IsolationLevel level, LWKeyValueWriteBatchPB* out); +// Variant of GetIntents that processes only a single batch_argument at the given index. +// Used for batch SKIP LOCKED: we try each candidate row's lock individually. 
+Status GetIntentsForBatchArg( + const PgsqlReadRequestMsg& request, int batch_arg_index, const Schema& schema, + IsolationLevel level, LWKeyValueWriteBatchPB* out); + class PgsqlLockOperation : public DocOperationBase { public: diff --git a/src/yb/tablet/tablet.cc b/src/yb/tablet/tablet.cc index 1f15a57fa2e3..5a17314d7a06 100644 --- a/src/yb/tablet/tablet.cc +++ b/src/yb/tablet/tablet.cc @@ -4623,6 +4623,20 @@ Status Tablet::CreateReadIntents( return Status::OK(); } +Status Tablet::CreateReadIntentForBatchArg( + IsolationLevel level, + const PgsqlReadRequestMsg& pgsql_read, + int batch_arg_index, + docdb::LWKeyValueWriteBatchPB* write_batch) { + auto table_info = metadata_->primary_table_info(); + if (table_info == nullptr || table_info->table_id != pgsql_read.table_id()) { + table_info = VERIFY_RESULT(metadata_->GetTableInfo(pgsql_read.table_id())); + } + RETURN_NOT_OK(docdb::GetIntentsForBatchArg( + pgsql_read, batch_arg_index, table_info->schema(), level, write_batch)); + return Status::OK(); +} + bool Tablet::ShouldApplyWrite() { auto scoped_read_operation = CreateScopedRWOperationBlockingRocksDbShutdownStart(); if (!scoped_read_operation.ok()) { diff --git a/src/yb/tablet/tablet.h b/src/yb/tablet/tablet.h index 4d6e777f9f9c..edc6281da447 100644 --- a/src/yb/tablet/tablet.h +++ b/src/yb/tablet/tablet.h @@ -697,6 +697,14 @@ class Tablet : public AbstractTablet, const PgsqlReadRequestMsgs& pgsql_batch, docdb::LWKeyValueWriteBatchPB* out); + // Creates read intents for a single batch_argument of a pgsql read request. + // Used for batch SKIP LOCKED: we try each candidate row individually. 
+ Status CreateReadIntentForBatchArg( + IsolationLevel level, + const PgsqlReadRequestMsg& pgsql_read, + int batch_arg_index, + docdb::LWKeyValueWriteBatchPB* out); + uint64_t GetCurrentVersionSstFilesSize() const; uint64_t GetCurrentVersionSstFilesUncompressedSize() const; std::pair GetCurrentVersionSstFilesAllSizes() const; diff --git a/src/yb/tserver/read_query.cc b/src/yb/tserver/read_query.cc index c901858a3cd4..c6ebf6973564 100644 --- a/src/yb/tserver/read_query.cc +++ b/src/yb/tserver/read_query.cc @@ -15,6 +15,7 @@ #include "yb/common/row_mark.h" #include "yb/common/transaction.h" +#include "yb/common/transaction_error.h" #include "yb/gutil/bind.h" #include "yb/master/sys_catalog_constants.h" @@ -179,6 +180,18 @@ class ReadQuery : public std::enable_shared_from_this, public rpc::Th retained_self_ = nullptr; } + // Tries to lock the batch_argument at the given index for a batch SKIP LOCKED request. + // On success, sets first_locked_batch_arg_index in the pgsql response and proceeds to read. + // On kSkipLocking, recursively tries the next batch_argument. + void TryLockBatchArg( + int batch_arg_index, + int total_batch_args, + const PgsqlReadRequestMsg& pgsql_read, + IsolationLevel isolation_level, + int64_t leader_term, + tablet::TabletPeerPtr peer, + tablet::TabletPtr tablet_ptr); + TabletServerIf& server_; ReadTabletProvider& read_tablet_provider_; const ReadRequestMsg* req_; @@ -197,6 +210,9 @@ class ReadQuery : public std::enable_shared_from_this, public rpc::Th RequestScope request_scope_; std::shared_ptr retained_self_; std::shared_ptr tablet_consensus_info_; + + // For batch SKIP LOCKED: index of the winning batch_argument, or -1 if not applicable. 
+ int first_locked_batch_arg_index_ = -1; }; bool ReadQuery::transactional() const { @@ -414,6 +430,29 @@ Status ReadQuery::DoPerform() { if (serializable_isolation || has_row_mark) { tserver::WriteResponseMsg* response = nullptr; + + // Check for batch SKIP LOCKED: single pgsql read with WAIT_SKIP and multiple batch_arguments. + // Instead of locking all rows at once (which fails atomically on first conflict), + // try each candidate row individually and return the first one that succeeds. + bool is_batch_skip_locked = false; + if (has_row_mark && !serializable_isolation && req_->pgsql_batch_size() == 1) { + const auto& pgsql_read = req_->pgsql_batch(0); + if (pgsql_read.has_wait_policy() && + pgsql_read.wait_policy() == WAIT_SKIP && + pgsql_read.batch_arguments_size() > 1) { + is_batch_skip_locked = true; + } + } + + if (is_batch_skip_locked) { + // Batch SKIP LOCKED: iterate through candidates one at a time. + const auto& pgsql_read = req_->pgsql_batch(0); + TryLockBatchArg( + 0, pgsql_read.batch_arguments_size(), pgsql_read, + isolation_level, leader_peer.leader_term, leader_peer.peer, leader_peer.tablet); + return Status::OK(); + } + const bool use_async_write = req_->use_async_write(); if (use_async_write) { response = req_->arena().ArenaObjectFactory(); @@ -734,11 +773,39 @@ Result ReadQuery::DoReadImpl() { if (!req_->pgsql_batch().empty()) { size_t total_num_rows_read = 0; - for (const auto& pgsql_read_req : req_->pgsql_batch()) { + for (int pgsql_idx = 0; pgsql_idx < req_->pgsql_batch_size(); ++pgsql_idx) { + const auto& pgsql_read_req = req_->pgsql_batch(pgsql_idx); + + // For batch SKIP LOCKED: create a modified request for the read phase. + // If we locked a row (>= 0), include only the winning batch_argument. + // If all were skipped (-1), clear batch_arguments to return zero rows. 
+ PgsqlReadRequestMsg* modified_req = nullptr; + const PgsqlReadRequestMsg* effective_req = &pgsql_read_req; + if (pgsql_idx == 0 && pgsql_read_req.batch_arguments_size() > 1 && + pgsql_read_req.has_wait_policy() && pgsql_read_req.wait_policy() == WAIT_SKIP) { + if (first_locked_batch_arg_index_ >= 0) { + modified_req = req_->arena().NewArenaObject(pgsql_read_req); + modified_req->clear_batch_arguments(); + int arg_index = 0; + for (const auto& batch_argument : pgsql_read_req.batch_arguments()) { + if (arg_index++ == first_locked_batch_arg_index_) { + *modified_req->add_batch_arguments() = batch_argument; + break; + } + } + effective_req = modified_req; + } else if (first_locked_batch_arg_index_ == -1) { + // All candidates were skipped -- return empty result. + modified_req = req_->arena().NewArenaObject(pgsql_read_req); + modified_req->clear_batch_arguments(); + effective_req = modified_req; + } + } + tablet::PgsqlReadRequestResult result(resp_->arena(), &context_.sidecars().Start()); TRACE("Start HandlePgsqlReadRequest"); RETURN_NOT_OK(abstract_tablet_->HandlePgsqlReadRequest( - read_operation_data, !allow_retry_ /* is_explicit_request_read_time */, pgsql_read_req, + read_operation_data, !allow_retry_ /* is_explicit_request_read_time */, *effective_req, req_->transaction(), req_->subtransaction(), &result)); total_num_rows_read += result.num_rows_read; @@ -747,6 +814,17 @@ Result ReadQuery::DoReadImpl() { if (result.read_restart_data.is_valid()) { return FormReadRestartInfo(result.read_restart_data); } + // For batch SKIP LOCKED: tell pggate that ALL batch args were consumed (tried) + // even though we only read data for the winning one. This prevents + // CompleteProcessResponse from seeing batch_arg_count < batch_arguments_size + // and triggering additional Perform RPCs for the "remaining" args. 
+ if (pgsql_idx == 0 && pgsql_read_req.batch_arguments_size() > 1 && + pgsql_read_req.has_wait_policy() && pgsql_read_req.wait_policy() == WAIT_SKIP) { + result.response->set_batch_arg_count(pgsql_read_req.batch_arguments_size()); + if (first_locked_batch_arg_index_ >= 0) { + result.response->set_first_locked_batch_arg_index(first_locked_batch_arg_index_); + } + } result.response->set_rows_data_sidecar( narrow_cast(context_.sidecars().Complete())); resp_->mutable_pgsql_batch()->push_back_ref(result.response); @@ -767,6 +845,85 @@ Result ReadQuery::DoReadImpl() { return ReadRestartInfo(); } +void ReadQuery::TryLockBatchArg( + int batch_arg_index, + int total_batch_args, + const PgsqlReadRequestMsg& pgsql_read, + IsolationLevel isolation_level, + int64_t leader_term, + tablet::TabletPeerPtr peer, + tablet::TabletPtr tablet_ptr) { + if (batch_arg_index >= total_batch_args) { + // All candidates were skipped. Proceed to read phase with no locked row. + // The response will have no first_locked_batch_arg_index set, and the read + // will return zero rows, signaling to postgres that all were skipped. + first_locked_batch_arg_index_ = -1; + retained_self_ = shared_from_this(); + peer->Enqueue(this); + return; + } + + // Build a WriteQuery with intents for just this one batch_argument. + auto query = std::make_unique( + leader_term, context_.GetClientDeadline(), peer.get(), + tablet_ptr, nullptr /* rpc_context */, nullptr /* response */); + + auto& write = *query->operation().AllocateRequest(); + auto& write_batch = *write.mutable_write_batch(); + *write_batch.mutable_transaction() = req_->transaction(); + + // Set row mark type from the pgsql read request. + auto row_mark = GetRowMarkTypeFromPB(pgsql_read); + if (IsValidRowMarkType(row_mark)) { + write_batch.set_row_mark_type(row_mark); + query->set_read_time(read_time_); + } + + write.ref_unused_tablet_id(""); // For backward compatibility. 
+ write_batch.set_deprecated_may_have_metadata(true); + write.set_batch_idx(req_->batch_idx()); + if (req_->has_subtransaction() && req_->subtransaction().has_subtransaction_id()) { + write_batch.mutable_subtransaction()->set_subtransaction_id( + req_->subtransaction().subtransaction_id()); + } + if (req_->has_start_time_micros()) { + query->SetRequestStartUs(req_->start_time_micros()); + } + + // Create intents for just this single batch_argument. + auto status = tablet_ptr->CreateReadIntentForBatchArg( + isolation_level, pgsql_read, batch_arg_index, &write_batch); + if (!status.ok()) { + RespondFailure(status); + return; + } + + query->AdjustYsqlQueryTransactionality(req_->pgsql_batch_size()); + + query->set_callback( + [self = shared_from_this(), peer, tablet_ptr, batch_arg_index, total_batch_args, + pgsql_read = &pgsql_read, isolation_level, leader_term](const Status& status) { + if (status.ok()) { + // Lock acquired successfully for this batch_argument. + self->first_locked_batch_arg_index_ = batch_arg_index; + self->retained_self_ = self; + peer->Enqueue(self.get()); + } else if (TransactionError(status).value() != TransactionErrorCode::kNone) { + // For SKIP LOCKED, any transaction-level failure to lock a row (kSkipLocking, + // kConflict, kAborted, kDeadlock, etc.) means we cannot acquire this row. + // Skip it and try the next candidate rather than aborting the entire batch. + self->TryLockBatchArg( + batch_arg_index + 1, total_batch_args, *pgsql_read, + isolation_level, leader_term, peer, tablet_ptr); + } else { + // Non-transaction error -- propagate it. 
+ self->RespondFailure(status); + } + }); + + peer->WriteAsync(std::move(query)); +} + } // namespace void PerformRead( diff --git a/src/yb/yql/pggate/pg_dml_read.cc b/src/yb/yql/pggate/pg_dml_read.cc index b13467034b34..7218409d8cb0 100644 --- a/src/yb/yql/pggate/pg_dml_read.cc +++ b/src/yb/yql/pggate/pg_dml_read.cc @@ -1039,4 +1039,24 @@ bool PgDmlRead::ActualValueForIsForSecondaryIndexArg(bool is_for_secondary_index return is_for_secondary_index && !IsPgSelectIndex(); } +void PgDmlRead::AddBatchYbctidArg(Slice ybctid) { + auto* batch_arg = read_req_->add_batch_arguments(); + batch_arg->mutable_ybctid()->mutable_value()->dup_binary_value(ybctid); + + // Also set ybctid_column_value to the smallest ybctid for backward compatibility. + if (!read_req_->has_ybctid_column_value() || + ybctid < read_req_->ybctid_column_value().value().binary_value()) { + read_req_->mutable_ybctid_column_value()->mutable_value()->ref_binary_value( + batch_arg->ybctid().value().binary_value()); + } +} + +Result PgDmlRead::GetFirstLockedBatchArgIndex() const { + if (!doc_op_) { + return STATUS(IllegalState, "No doc_op available"); + } + auto* read_op = down_cast(doc_op_.get()); + return read_op->GetFirstLockedBatchArgIndex(); +} + } // namespace yb::pggate diff --git a/src/yb/yql/pggate/pg_dml_read.h b/src/yb/yql/pggate/pg_dml_read.h index 99b2329bbc35..e7807c73208b 100644 --- a/src/yb/yql/pggate/pg_dml_read.h +++ b/src/yb/yql/pggate/pg_dml_read.h @@ -136,6 +136,14 @@ class PgDmlRead : public PgDml { [[nodiscard]] const LWPgsqlReadRequestPB* read_req() const { return read_req_.get(); } + // Add a ybctid as a batch_argument on the underlying read request. + // Used for batch SKIP LOCKED: sends multiple candidate ybctids in a single RPC. + void AddBatchYbctidArg(Slice ybctid); + + // Read first_locked_batch_arg_index from the response after execution. + // Returns -1 if the field is not present (i.e., not a batch SKIP LOCKED response). 
+ Result GetFirstLockedBatchArgIndex() const; + [[nodiscard]] bool IsReadFromYsqlCatalog() const; [[nodiscard]] bool IsIndexOrderedScan() const; diff --git a/src/yb/yql/pggate/pg_doc_op.cc b/src/yb/yql/pggate/pg_doc_op.cc index 52a62f187e17..541f409bdc4e 100644 --- a/src/yb/yql/pggate/pg_doc_op.cc +++ b/src/yb/yql/pggate/pg_doc_op.cc @@ -1396,6 +1396,17 @@ void PgDocReadOp::ResetInactivePgsqlOps() { } } +int32_t PgDocReadOp::GetFirstLockedBatchArgIndex() const { + if (pgsql_ops_.empty() || !pgsql_ops_[0]->response()) { + return -1; + } + const auto* resp = pgsql_ops_[0]->response(); + if (!resp->has_first_locked_batch_arg_index()) { + return -1; + } + return resp->first_locked_batch_arg_index(); +} + Status PgDocReadOp::ResetPgsqlOps() { SCHECK(!HasActiveOps(), IllegalState, "Can't reset operations when some of them are active"); // Discard outstanding results, if any diff --git a/src/yb/yql/pggate/pg_doc_op.h b/src/yb/yql/pggate/pg_doc_op.h index 56f76a90e84c..fb2e133a194b 100644 --- a/src/yb/yql/pggate/pg_doc_op.h +++ b/src/yb/yql/pggate/pg_doc_op.h @@ -492,6 +492,10 @@ class PgDocReadOp : public PgDocOp { Status ResetPgsqlOps(); + // Get first_locked_batch_arg_index from the first operation's response. + // Returns -1 if not present or no response available. 
+ int32_t GetFirstLockedBatchArgIndex() const; + protected: Status CompleteProcessResponse() override; diff --git a/src/yb/yql/pggate/pggate.cc b/src/yb/yql/pggate/pggate.cc index 06a14fa810df..2ff2220a720e 100644 --- a/src/yb/yql/pggate/pggate.cc +++ b/src/yb/yql/pggate/pggate.cc @@ -1539,6 +1539,15 @@ Status PgApiImpl::DmlFetch( natts, values, isnulls, syscols, has_data); } +Status PgApiImpl::DmlAddBatchYbctidArg(PgStatement* handle, Slice ybctid) { + VERIFY_RESULT_REF(GetStatementAs(handle)).AddBatchYbctidArg(ybctid); + return Status::OK(); +} + +Result PgApiImpl::DmlGetFirstLockedBatchArgIndex(PgStatement* handle) { + return VERIFY_RESULT_REF(GetStatementAs(handle)).GetFirstLockedBatchArgIndex(); +} + Result PgApiImpl::BuildTupleId(const YbcPgYBTupleIdDescriptor& descr) { return tuple_id_builder_.Build(pg_session_.get(), descr); } diff --git a/src/yb/yql/pggate/pggate.h b/src/yb/yql/pggate/pggate.h index 0cfd1e37cdd9..757b0d1ae118 100644 --- a/src/yb/yql/pggate/pggate.h +++ b/src/yb/yql/pggate/pggate.h @@ -532,6 +532,12 @@ class PgApiImpl { Status DmlFetch(PgStatement *handle, int32_t natts, uint64_t *values, bool *isnulls, YbcPgSysColumns *syscols, bool *has_data); + // Add a ybctid as a batch_argument on a read statement (for batch SKIP LOCKED). + Status DmlAddBatchYbctidArg(PgStatement *handle, Slice ybctid); + + // Read first_locked_batch_arg_index from a read statement's response. + Result DmlGetFirstLockedBatchArgIndex(PgStatement *handle); + // Utility method that checks stmt type and calls exec insert, update, or delete internally. 
Status DmlExecWriteOp(PgStatement *handle, int32_t *rows_affected_count); diff --git a/src/yb/yql/pggate/ybc_pggate.cc b/src/yb/yql/pggate/ybc_pggate.cc index b6dc7ab59bba..27dfb83fddbb 100644 --- a/src/yb/yql/pggate/ybc_pggate.cc +++ b/src/yb/yql/pggate/ybc_pggate.cc @@ -1549,6 +1549,19 @@ YbcStatus YBCPgAdjustOperationsBuffering(int multiple) { return ToYBCStatus(pgapi->AdjustOperationsBuffering(multiple)); } +YbcStatus YBCPgDmlAddBatchYbctidArg(YbcPgStatement handle, const char *ybctid, size_t ybctid_len) { + return ToYBCStatus(pgapi->DmlAddBatchYbctidArg(handle, Slice(ybctid, ybctid_len))); +} + +YbcStatus YBCPgDmlGetFirstLockedBatchArgIndex(YbcPgStatement handle, int32_t *index) { + auto result = pgapi->DmlGetFirstLockedBatchArgIndex(handle); + if (!result.ok()) { + return ToYBCStatus(result.status()); + } + *index = *result; + return YBCStatusOK(); +} + YbcStatus YBCPgDmlExecWriteOp(YbcPgStatement handle, int32_t *rows_affected_count) { return ToYBCStatus(pgapi->DmlExecWriteOp(handle, rows_affected_count)); } diff --git a/src/yb/yql/pggate/ybc_pggate.h b/src/yb/yql/pggate/ybc_pggate.h index 63f77e811dfb..8c34666ccb50 100644 --- a/src/yb/yql/pggate/ybc_pggate.h +++ b/src/yb/yql/pggate/ybc_pggate.h @@ -607,6 +607,13 @@ YbcStatus YBCPgDmlHnswSetReadOptions(YbcPgStatement handle, int ef_search); YbcStatus YBCPgDmlFetch(YbcPgStatement handle, int32_t natts, uint64_t *values, bool *isnulls, YbcPgSysColumns *syscols, bool *has_data); +// Add a ybctid as a batch_argument on a read statement (for batch SKIP LOCKED). +YbcStatus YBCPgDmlAddBatchYbctidArg(YbcPgStatement handle, const char *ybctid, size_t ybctid_len); + +// Read first_locked_batch_arg_index from response after execution. +// Returns -1 if not present. +YbcStatus YBCPgDmlGetFirstLockedBatchArgIndex(YbcPgStatement handle, int32_t *index); + // Utility method that checks stmt type and calls either exec insert, update, or delete internally. 
YbcStatus YBCPgDmlExecWriteOp(YbcPgStatement handle, int32_t *rows_affected_count);