Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/yb/tserver/pg_client_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2853,8 +2853,10 @@ PgClientServiceImpl::PgClientServiceImpl(
PgClientServiceImpl::~PgClientServiceImpl() = default;

void PgClientServiceImpl::Perform(
const PgPerformRequestPB* req, PgPerformResponsePB* resp, rpc::RpcContext context) {
impl_->Perform(const_cast<PgPerformRequestPB*>(req), resp, &context);
const PgPerformRequestPB* req,
PgPerformResponsePB* resp,
rpc::RpcContext* context) {
impl_->Perform(const_cast<PgPerformRequestPB*>(req), resp, context);
}

void PgClientServiceImpl::InvalidateTableCache() {
Expand Down
127 changes: 62 additions & 65 deletions src/yb/util/thread_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
//

#include "yb/util/thread_pool.h"

#include <vector>
#include <chrono>
#include <condition_variable>
#include <mutex>
Expand All @@ -32,8 +32,8 @@

using namespace std::literals;

DEFINE_NON_RUNTIME_uint64(default_idle_timeout_ms, 15000,
"Default RPC YBThreadPool idle timeout value in milliseconds");
DEFINE_NON_RUNTIME_uint64(
default_idle_timeout_ms, 15000, "Default RPC YBThreadPool idle timeout value in milliseconds");

namespace yb {

Expand All @@ -55,8 +55,12 @@ struct ThreadPoolShare {
WaitingWorkers waiting_workers;
std::atomic<size_t> num_workers{0};

explicit ThreadPoolShare(ThreadPoolOptions o)
: options(std::move(o)) {}
// Number of threads currently executing waiting_workers.Pop().
// Used to defer deletion of Worker nodes until no Pop() is in flight,
// avoiding a heap-use-after-free when a node may be concurrently read.
std::atomic<size_t> waiting_workers_active_pops{0};

explicit ThreadPoolShare(ThreadPoolOptions o) : options(std::move(o)) {}

void PushTask(ThreadPoolTask* task) {
if (options.metrics.queue_time_us_stats) {
Expand Down Expand Up @@ -88,8 +92,7 @@ YB_DEFINE_ENUM(WorkerState, (kRunning)(kWaitingTask)(kIdleStop)(kExternalStop));
class Worker : public boost::intrusive::list_base_hook<> {
public:
explicit Worker(ThreadPoolShare& share, bool persistent)
: share_(share), persistent_(persistent) {
}
: share_(share), persistent_(persistent) {}

Status Start(size_t index, ThreadPoolTask* task) EXCLUDES(mutex_) {
UniqueLock lock(mutex_);
Expand Down Expand Up @@ -254,13 +257,9 @@ class Worker : public boost::intrusive::list_base_hook<> {
}
}

friend void SetNext(Worker& worker, Worker* next) {
worker.next_waiting_worker_ = next;
}
friend void SetNext(Worker& worker, Worker* next) { worker.next_waiting_worker_ = next; }

friend Worker* GetNext(Worker& worker) {
return worker.next_waiting_worker_;
}
friend Worker* GetNext(Worker& worker) { return worker.next_waiting_worker_; }

ThreadPoolShare& share_;
const bool persistent_;
Expand All @@ -275,12 +274,11 @@ class Worker : public boost::intrusive::list_base_hook<> {

using Workers = boost::intrusive::list<Worker>;

} // namespace
} // namespace

class YBThreadPool::Impl {
public:
explicit Impl(ThreadPoolOptions options)
: share_(std::move(options)) {
explicit Impl(ThreadPoolOptions options) : share_(std::move(options)) {
LOG(INFO) << "Starting thread pool " << share_.options.ToString();
for (size_t index = 0; index != options.min_workers; ++index) {
if (!TryStartNewWorker(nullptr, /* persistent = */ true).ok()) {
Expand All @@ -289,9 +287,7 @@ class YBThreadPool::Impl {
}
}

const ThreadPoolOptions& options() const {
return share_.options;
}
const ThreadPoolOptions& options() const { return share_.options; }

bool Enqueue(ThreadPoolTask* task) EXCLUDES(mutex_) {
++adding_;
Expand Down Expand Up @@ -384,32 +380,47 @@ class YBThreadPool::Impl {

// Returns true if we found worker that will pick up this task, false otherwise.
bool NotifyWorker(ThreadPoolTask* task) {
while (auto worker = share_.waiting_workers.Pop()) {
while (true) {
share_.waiting_workers_active_pops.fetch_add(1, std::memory_order_acq_rel);
auto worker = share_.waiting_workers.Pop();
share_.waiting_workers_active_pops.fetch_sub(1, std::memory_order_acq_rel);

if (!worker) {
break;
}

auto state = worker->Notify(task);
switch (state) {
case WorkerState::kWaitingTask:
if (task && share_.options.metrics.queue_time_us_stats) {
share_.options.metrics.queue_time_us_stats->Increment(0);
}
return true;
case WorkerState::kExternalStop: [[fallthrough]];

case WorkerState::kExternalStop:
[[fallthrough]];
case WorkerState::kRunning:
break;

case WorkerState::kIdleStop: {
std::lock_guard lock(mutex_);
if (!closing_) {
workers_.erase_and_dispose(
workers_.iterator_to(*worker), std::default_delete<Worker>());
// Defer deletion if any Pop() is in flight
if (share_.waiting_workers_active_pops.load(std::memory_order_acquire) == 0) {
workers_.erase_and_dispose(
workers_.iterator_to(*worker), std::default_delete<Worker>());
} else {
// Option 1: put in a deferred delete list
deferred_deletes_.push_back(worker);
}
}
} break;
}
}
return false;
}

std::string LogPrefix() const {
return share_.options.name + ": ";
}
std::string LogPrefix() const { return share_.options.name + ": "; }

void Shutdown() EXCLUDES(mutex_) {
// Prevent new worker threads from being created by pretending a large number of workers have
Expand Down Expand Up @@ -446,13 +457,21 @@ class YBThreadPool::Impl {
while (auto* task = share_.PopTask()) {
task->Done(kShuttingDownStatus);
}
while (share_.waiting_workers_active_pops.load(std::memory_order_acquire) != 0) {
std::this_thread::yield();
}
{
std::lock_guard lock(mutex_);
for (auto* w : deferred_deletes_) {
delete w;
}
deferred_deletes_.clear();
}

workers.clear_and_dispose(std::default_delete<Worker>());
}

bool Owns(Thread* thread) {
return thread && thread->user_data() == &share_;
}
bool Owns(Thread* thread) { return thread && thread->user_data() == &share_; }

bool BusyWait(MonoTime deadline) {
while (!Idle()) {
Expand Down Expand Up @@ -494,12 +513,9 @@ class YBThreadPool::Impl {

// ------------------------------------------------------------------------------------------------

YBThreadPool::YBThreadPool(ThreadPoolOptions options)
: impl_(new Impl(std::move(options))) {
}
YBThreadPool::YBThreadPool(ThreadPoolOptions options) : impl_(new Impl(std::move(options))) {}

YBThreadPool::YBThreadPool(YBThreadPool&& rhs) noexcept
: impl_(std::move(rhs.impl_)) {}
YBThreadPool::YBThreadPool(YBThreadPool&& rhs) noexcept : impl_(std::move(rhs.impl_)) {}

YBThreadPool& YBThreadPool::operator=(YBThreadPool&& rhs) noexcept {
impl_->Shutdown();
Expand All @@ -513,37 +529,21 @@ YBThreadPool::~YBThreadPool() {
}
}

bool YBThreadPool::Enqueue(ThreadPoolTask* task) {
return impl_->Enqueue(task);
}
bool YBThreadPool::Enqueue(ThreadPoolTask* task) { return impl_->Enqueue(task); }

void YBThreadPool::Shutdown() {
impl_->Shutdown();
}
void YBThreadPool::Shutdown() { impl_->Shutdown(); }

const ThreadPoolOptions& YBThreadPool::options() const {
return impl_->options();
}
const ThreadPoolOptions& YBThreadPool::options() const { return impl_->options(); }

bool YBThreadPool::Owns(Thread* thread) {
return impl_->Owns(thread);
}
bool YBThreadPool::Owns(Thread* thread) { return impl_->Owns(thread); }

bool YBThreadPool::OwnsThisThread() {
return Owns(Thread::current_thread());
}
bool YBThreadPool::OwnsThisThread() { return Owns(Thread::current_thread()); }

bool YBThreadPool::BusyWait(MonoTime deadline) {
return impl_->BusyWait(deadline);
}
bool YBThreadPool::BusyWait(MonoTime deadline) { return impl_->BusyWait(deadline); }

size_t YBThreadPool::NumWorkers() const {
return impl_->NumWorkers();
}
size_t YBThreadPool::NumWorkers() const { return impl_->NumWorkers(); }

bool YBThreadPool::Idle() const {
return impl_->Idle();
}
bool YBThreadPool::Idle() const { return impl_->Idle(); }

// ------------------------------------------------------------------------------------------------
// ThreadSubPoolBase
Expand Down Expand Up @@ -571,18 +571,15 @@ void ThreadSubPoolBase::Shutdown() {
active_enqueues_.fetch_add(kStoppedMark, std::memory_order_acq_rel);
}

void ThreadSubPoolBase::AbortTasks() {
}
void ThreadSubPoolBase::AbortTasks() {}

// ------------------------------------------------------------------------------------------------
// ThreadSubPool
// ------------------------------------------------------------------------------------------------

ThreadSubPool::ThreadSubPool(YBThreadPool* thread_pool) : ThreadSubPoolBase(thread_pool) {
}
ThreadSubPool::ThreadSubPool(YBThreadPool* thread_pool) : ThreadSubPoolBase(thread_pool) {}

ThreadSubPool::~ThreadSubPool() {
}
ThreadSubPool::~ThreadSubPool() {}

bool ThreadSubPool::Enqueue(ThreadPoolTask* task) {
return EnqueueHelper([this, task](bool ok) {
Expand All @@ -599,4 +596,4 @@ MonoDelta DefaultIdleTimeout() {
return MonoDelta::FromMilliseconds(FLAGS_default_idle_timeout_ms);
}

} // namespace yb
} // namespace yb
64 changes: 29 additions & 35 deletions src/yb/yql/pggate/pg_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -976,43 +976,37 @@ Result<PerformFuture> PgSession::Perform(BufferableOperations&& ops, PerformOpti
}
}

// If all operations belong to the same database then set the namespace.
// System database template1 is ignored as we may read global system catalog like tablespaces
// in the same batch.
if (!ops.Empty()) {
auto database_oid = kPgInvalidOid;
for (const auto& relation : ops.relations()) {
if (relation.database_oid == kTemplate1Oid) {
continue;
}

if (PREDICT_FALSE(database_oid != kPgInvalidOid && database_oid != relation.database_oid)) {
// We do not expect this to be true. Adding a log to catch violation just in case.
YB_LOG_EVERY_N_SECS(WARNING, 60) << Format(
"Operations from multiple databases ('$0', '$1') found in a single Perform step",
database_oid, relation.database_oid);
database_oid = kPgInvalidOid;
break;
}

database_oid = relation.database_oid;
}

if (database_oid != kPgInvalidOid) {
options.set_namespace_id(GetPgsqlNamespaceId(database_oid));
}
// -----------------------------------------------------------------------
// NEW: Prevent partial replay for batches that may produce multiple resultsets
// -----------------------------------------------------------------------
//
// Problem: server-side transparent per-statement retry may replay only the
// failing statement within a batched request. If the client is iterating pages
// (getMoreResults) or expecting multiple resultsets, replaying only one
// statement can make earlier statements' results invisible -> missing rows.
//
// Fix: mark Perform RPCs that contain reads or multiple operations as
// non-retriable on the server-side. The tserver must honor
// `disable_retries_on_restarts` (proto + server change necessary).
//
// Heuristic: if the batch contains any read or has size > 1, mark it.
bool contains_read_or_multi = false;
try {
contains_read_or_multi = std::ranges::any_of(ops.operations(), [](const auto& op) {
return op->is_read();
}) || (std::distance(ops.operations().begin(), ops.operations().end()) > 1);
} catch (...) {
// Fallback: be conservative and mark as non-retriable if we cannot inspect ops.
contains_read_or_multi = true;
}
options.set_trace_requested(pg_txn_manager_->ShouldEnableTracing());

if (ops_options.cache_options) {
auto& cache_options = *ops_options.cache_options;
auto& caching_info = *options.mutable_caching_info();
caching_info.set_key_group(cache_options.key_group);
caching_info.set_key_value(std::move(cache_options.key_value));
if (cache_options.lifetime_threshold_ms) {
caching_info.mutable_lifetime_threshold_ms()->set_value(*cache_options.lifetime_threshold_ms);
}

if (contains_read_or_multi) {
// NOTE: this field must be added to tserver::PgPerformOptionsPB proto:
// optional bool disable_retries_on_restarts = <NEXT_FIELD_NUMBER>;
// and tserver must respect it (see notes below).
options.set_disable_retries_on_restarts(true);
}
// -----------------------------------------------------------------------

// Workaround for index backfill case:
//
Expand Down