98 changes: 98 additions & 0 deletions src/postgres/src/backend/access/yb_access/yb_scan.c
@@ -5242,6 +5242,104 @@ YBCLockTuple(Relation relation, Datum ybctid, RowMarkType mode,
return res;
}

TM_Result
YBCLockTupleBatch(Relation relation, Datum *ybctids, int count,
RowMarkType mode, EState *estate,
int *locked_index)
{
Assert(count > 0);

const Oid relfile_oid = YbGetRelfileNodeId(relation);

YbcPgStatement ybc_stmt = YbNewSelect(relation, NULL /* prepare_params */);

/*
* Bind the first ybctid as the regular column value (needed for partition
* routing). All ybctids are also added as batch_arguments for the tserver
* to iterate through.
*/
YbcPgExpr ybctid_expr = YBCNewConstant(ybc_stmt, BYTEAOID, InvalidOid,
ybctids[0], false);
HandleYBStatus(YBCPgDmlBindColumn(ybc_stmt, YBTupleIdAttributeNumber,
ybctid_expr));

/*
 * Each ybctid datum is a bytea varlena; VARDATA_ANY/VARSIZE_ANY_EXHDR
 * strip the header so only the raw key bytes are shipped as batch
 * arguments.
 */
for (int i = 0; i < count; i++)
{
Datum d = ybctids[i];
const char *data = VARDATA_ANY(DatumGetPointer(d));
size_t len = VARSIZE_ANY_EXHDR(DatumGetPointer(d));

HandleYBStatus(YBCPgDmlAddBatchYbctidArg(ybc_stmt, data, len));
}

YbcPgExecParameters exec_params = {0};
exec_params.limit_count = count;
exec_params.rowmark = mode;
exec_params.pg_wait_policy = LockWaitSkip;
exec_params.docdb_wait_policy = YBGetDocDBWaitPolicy(LockWaitSkip);
exec_params.stmt_in_txn_limit_ht_for_reads =
estate->yb_exec_params.stmt_in_txn_limit_ht_for_reads;

HandleYBStatus(YBCPgExecSelect(ybc_stmt, &exec_params));

/*
* We must fetch at least once to trigger the RPC and populate the
* response protobuf (which carries first_locked_batch_arg_index).
*
* Like YBCLockTuple, catch conflict/skip-locking errors from the fetch
* rather than raising them — for SKIP LOCKED, these mean "all candidates
* were contended" and should return TM_WouldBlock, not abort the statement.
*/
bool has_data = false;
YbcStatus status = YBCPgDmlFetch(ybc_stmt, 0, NULL, NULL, NULL, &has_data);

if (status)
{
const uint32_t err_code = YBCStatusPgsqlError(status);

switch (err_code)
{
case ERRCODE_YB_TXN_CONFLICT:
case ERRCODE_YB_TXN_SKIP_LOCKING:
/* Conflict or skip during batch lock: treat as all-skipped. */
YBCFreeStatus(status);
*locked_index = -1;
YBCPgDeleteStatement(ybc_stmt);
return TM_WouldBlock;
default:
/*
 * Any other error is fatal: HandleYBStatus ereports and does not
 * return, so nothing past the switch is reachable on this path.
 */
HandleYBStatus(status);
break;
}
}

/*
* Read the first_locked_batch_arg_index from the response.
* The tserver tries each batch_argument in order and returns the index
* of the first one it successfully locked.
*/
int32_t winner = -1;

HandleYBStatus(YBCPgDmlGetFirstLockedBatchArgIndex(ybc_stmt, &winner));

TM_Result res;

if (winner >= 0 && winner < count)
{
*locked_index = winner;
YBCPgAddIntoForeignKeyReferenceCache(relfile_oid, ybctids[winner]);
res = TM_Ok;
}
else
{
*locked_index = -1;
res = TM_WouldBlock;
}

YBCPgDeleteStatement(ybc_stmt);
return res;
}
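For orientation, a minimal caller-side sketch of the contract above; the names rel, candidates, and ncandidates are illustrative and not part of this change:

int locked_index;
TM_Result res = YBCLockTupleBatch(rel, candidates, ncandidates,
                                  ROW_MARK_EXCLUSIVE, estate,
                                  &locked_index);

if (res == TM_Ok)
{
    /* candidates[locked_index] is now locked by this transaction. */
}
else
{
    /*
     * res == TM_WouldBlock with locked_index == -1: every candidate was
     * contended. Under SKIP LOCKED the caller tries the next window of
     * candidates or reports zero rows, rather than blocking.
     */
}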

void
YBCFlushTupleLocks()
{
264 changes: 264 additions & 0 deletions src/postgres/src/backend/executor/nodeLockRows.c
@@ -31,8 +31,204 @@

/* YB includes */
#include "access/yb_scan.h"
#include "utils/datum.h"
#include "utils/guc.h"


/*
* ExecLockRowsBatchSkipLocked
*
* Optimized batch path for YB SKIP LOCKED. Pre-fetches up to batch_size
* candidate tuples from the scan, sends all their ybctids in a single RPC,
* and the tserver locks the first unlocked one.
*
* Returns the winning slot (with the locked tuple restored) or NULL if all
* candidates in this batch were skipped (caller should retry) or the scan
* is exhausted.
*
* Candidates after the winner are saved into node->yb_batch_leftover_* so
* subsequent ExecLockRows calls can try them via the per-row path without
* losing rows consumed from the scan.
*
* *scan_exhausted is set to true when ExecProcNode returned NULL, meaning
* there are no more rows to try.
*/
static TupleTableSlot *
ExecLockRowsBatchSkipLocked(LockRowsState *node,
TupleTableSlot *first_slot,
ExecAuxRowMark *aerm,
ExecRowMark *erm,
EState *estate,
PlanState *outerPlan,
bool *scan_exhausted)
{
int batch_size = yb_skip_locked_batch_size;
Datum *batch_ybctids;
HeapTuple *batch_tuples;
int batch_count = 0;
TupleTableSlot *slot = first_slot;
int locked_index;
TM_Result result;

*scan_exhausted = false;

batch_ybctids = (Datum *) palloc(sizeof(Datum) * batch_size);
batch_tuples = (HeapTuple *) palloc(sizeof(HeapTuple) * batch_size);

/*
* Collect candidate ybctids. The first slot was already fetched by the
* caller; fetch up to batch_size - 1 more.
*/
for (;;)
{
Datum datum;
bool isNull;

datum = ExecGetJunkAttribute(slot, aerm->ctidAttNo, &isNull);
if (isNull)
elog(ERROR, "ctid/ybctid is NULL");

batch_ybctids[batch_count] = datumCopy(datum, false, -1);
batch_tuples[batch_count] = ExecCopySlotHeapTuple(slot);
batch_count++;

if (batch_count >= batch_size)
break;

slot = ExecProcNode(outerPlan);
if (TupIsNull(slot))
{
*scan_exhausted = true;
break;
}
}

if (batch_count == 0)
{
pfree(batch_ybctids);
pfree(batch_tuples);
return NULL;
}

/* Single batch RPC to lock the first available candidate. */
result = YBCLockTupleBatch(erm->relation, batch_ybctids, batch_count,
erm->markType, estate, &locked_index);

slot = NULL;

if (result == TM_Ok && locked_index >= 0 && locked_index < batch_count)
{
/*
* Winner found. Restore the winning tuple into the outer plan's
* result slot so the caller can return it.
*/
TupleTableSlot *result_slot = first_slot;

ExecForceStoreHeapTuple(batch_tuples[locked_index], result_slot, true);
slot = result_slot;

/* Free candidates before the winner (they were skipped by tserver). */
for (int i = 0; i < locked_index; i++)
{
pfree(DatumGetPointer(batch_ybctids[i]));
heap_freetuple(batch_tuples[i]);
}
/* Winner ybctid datum no longer needed (tuple is in the slot). */
pfree(DatumGetPointer(batch_ybctids[locked_index]));

/*
* Save candidates after the winner as leftovers. These were NOT tried
* by the tserver (it stops at the first success), so subsequent
* ExecLockRows calls will try them via the per-row path.
*/
int leftover_count = batch_count - locked_index - 1;
if (leftover_count > 0)
{
node->yb_batch_leftover_tuples = (HeapTuple *)
palloc(sizeof(HeapTuple) * leftover_count);
node->yb_batch_leftover_ybctids = (Datum *)
palloc(sizeof(Datum) * leftover_count);
for (int i = 0; i < leftover_count; i++)
{
int src = locked_index + 1 + i;
node->yb_batch_leftover_tuples[i] = batch_tuples[src];
node->yb_batch_leftover_ybctids[i] = batch_ybctids[src];
}
node->yb_batch_leftover_count = leftover_count;
node->yb_batch_leftover_idx = 0;
}
}
else
{
/* All candidates were skipped. Free everything. */
for (int i = 0; i < batch_count; i++)
{
pfree(DatumGetPointer(batch_ybctids[i]));
heap_freetuple(batch_tuples[i]);
}
}

pfree(batch_ybctids);
pfree(batch_tuples);
return slot;
}
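
A worked example of the bookkeeping above, assuming batch_count = 4 and the tserver reports winner = 1:

/*
 * index 0: contended        -> ybctid and tuple freed here
 * index 1: locked (winner)  -> tuple restored into first_slot, returned
 * index 2: not tried        -> saved as leftover 0
 * index 3: not tried        -> saved as leftover 1
 *
 * On return: yb_batch_leftover_count = 2, yb_batch_leftover_idx = 0, so
 * the next ExecLockRows call tries leftover 0 through the per-row path
 * before pulling anything new from the scan.
 */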

/*
* ExecLockRowsTryLeftover
*
* Try to lock the next leftover candidate from a previous batch RPC using
* the per-row YBCLockTuple path. Returns a slot if a row was locked, or
* NULL if the leftover was skipped (caller should try the next one or
* fall through to the scan).
*/
static TupleTableSlot *
ExecLockRowsTryLeftover(LockRowsState *node,
ExecRowMark *erm,
EState *estate,
TupleTableSlot *result_slot)
{
while (node->yb_batch_leftover_idx < node->yb_batch_leftover_count)
{
int idx = node->yb_batch_leftover_idx++;
Datum ybctid = node->yb_batch_leftover_ybctids[idx];
HeapTuple tup = node->yb_batch_leftover_tuples[idx];
TM_Result res;

res = YBCLockTuple(erm->relation, ybctid, erm->markType,
LockWaitSkip, estate);

pfree(DatumGetPointer(ybctid));

if (res == TM_Ok)
{
ExecForceStoreHeapTuple(tup, result_slot, true);

/*
 * Remaining leftovers stay in node->yb_batch_leftover_*; they are
 * consumed or freed by later calls, or by ExecEndLockRows.
 */
return result_slot;
}

/* Skipped — free this tuple and try the next leftover. */
heap_freetuple(tup);
}

/* All leftovers exhausted. Clean up. */
if (node->yb_batch_leftover_tuples)
{
pfree(node->yb_batch_leftover_tuples);
node->yb_batch_leftover_tuples = NULL;
}
if (node->yb_batch_leftover_ybctids)
{
pfree(node->yb_batch_leftover_ybctids);
node->yb_batch_leftover_ybctids = NULL;
}
node->yb_batch_leftover_count = 0;
node->yb_batch_leftover_idx = 0;

return NULL;
}
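
Continuing that example for a query with LIMIT 3: the second ExecLockRows call reaches this function and tries leftover 0 (original index 2). A sketch of the two possible outcomes:

/*
 * - YBCLockTuple returns TM_Ok: the tuple is restored into result_slot
 *   and returned; leftover 1 stays queued for the third call.
 * - Any other result (skipped): the tuple is freed and leftover 1 is
 *   tried on the next loop iteration; if it is skipped too, both arrays
 *   are released and ExecLockRows falls through to ExecProcNode for
 *   fresh rows.
 */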

/* ----------------------------------------------------------------
* ExecLockRows
* ----------------------------------------------------------------
@@ -59,6 +255,26 @@ ExecLockRows(PlanState *pstate)
* Get next tuple from subplan, if any.
*/
lnext:
/*
* If we have leftover candidates from a previous batch SKIP LOCKED RPC,
* try to lock them one-by-one before pulling new rows from the scan.
* This is needed for LIMIT > 1: the batch path only returns the first
* winner, and leftover candidates may still be lockable.
*/
if (node->yb_batch_leftover_count > node->yb_batch_leftover_idx)
{
ExecAuxRowMark *aerm = (ExecAuxRowMark *) linitial(node->lr_arowMarks);
ExecRowMark *erm = aerm->rowmark;
TupleTableSlot *leftover_result;

/* Use the outer plan's result slot — same slot the batch path uses. */
leftover_result = ExecLockRowsTryLeftover(node, erm, estate,
outerPlan->ps_ResultTupleSlot);
if (leftover_result != NULL)
return leftover_result;
/* All leftovers exhausted, fall through to scan for more rows. */
}

slot = ExecProcNode(outerPlan);

if (node->yb_are_row_marks_for_yb_rels &&
@@ -81,6 +297,39 @@
/* We don't need EvalPlanQual unless we get updated tuple version(s) */
epq_needed = false;

/*
* YB batch SKIP LOCKED optimization: when we have exactly one YB row mark
* with SKIP LOCKED policy and batch size > 1, pre-fetch multiple candidates
* and lock the first available one in a single RPC.
*/
if (node->yb_are_row_marks_for_yb_rels &&
yb_skip_locked_batch_size > 1 &&
list_length(node->lr_arowMarks) == 1)
{
ExecAuxRowMark *aerm = (ExecAuxRowMark *) linitial(node->lr_arowMarks);
ExecRowMark *erm = aerm->rowmark;

if (erm->waitPolicy == LockWaitSkip)
{
bool scan_exhausted;
TupleTableSlot *result;

result = ExecLockRowsBatchSkipLocked(node, slot, aerm, erm,
estate, outerPlan,
&scan_exhausted);
if (result != NULL)
return result;

/* All candidates were skipped. Try more if scan has rows left. */
if (!scan_exhausted)
goto lnext;

/* Scan exhausted, no rows locked. */
EvalPlanQualEnd(&node->lr_epqstate);
return NULL;
}
}
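/*
 * Example of a query shape that takes this batch path (hypothetical
 * table; assumes the yb_skip_locked_batch_size GUC is set above 1):
 *
 *     SELECT * FROM jobs ORDER BY priority
 *     FOR UPDATE SKIP LOCKED LIMIT 1;
 *
 * i.e. exactly one locking row mark on a YB relation with wait policy
 * LockWaitSkip.
 */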

/*
* Attempt to lock the source tuple(s). (Note we only have locking
* rowmarks in lr_arowMarks.)
@@ -452,6 +701,21 @@ ExecInitLockRows(LockRows *node, EState *estate, int eflags)
void
ExecEndLockRows(LockRowsState *node)
{
/*
 * Free any leftover batch SKIP LOCKED candidates that were never tried.
 * Entries before yb_batch_leftover_idx were already freed when
 * ExecLockRowsTryLeftover consumed them.
 */
if (node->yb_batch_leftover_tuples)
{
for (int i = node->yb_batch_leftover_idx;
i < node->yb_batch_leftover_count; i++)
{
pfree(DatumGetPointer(node->yb_batch_leftover_ybctids[i]));
heap_freetuple(node->yb_batch_leftover_tuples[i]);
}
pfree(node->yb_batch_leftover_tuples);
pfree(node->yb_batch_leftover_ybctids);
node->yb_batch_leftover_tuples = NULL;
node->yb_batch_leftover_ybctids = NULL;
}

/* We may have shut down EPQ already, but no harm in another call */
EvalPlanQualEnd(&node->lr_epqstate);
ExecEndNode(outerPlanState(node));