Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 23 additions & 7 deletions src/yb/tserver/pg_response_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "yb/rpc/sidecars.h"

#include "yb/util/async_util.h"
#include "yb/util/enums.h"
#include "yb/util/flags.h"
#include "yb/util/flags/flag_tags.h"
#include "yb/util/logging.h"
Expand Down Expand Up @@ -172,6 +173,8 @@ class MetricUpdater {
DISALLOW_COPY_AND_ASSIGN(MetricUpdater);
};

YB_DEFINE_ENUM(DataState, (kNotReady)(kReadyOK)(kReadyFailure));

class Data {
public:
Data(uint64_t version,
Expand All @@ -191,23 +194,24 @@ class Data {
ReadHybridTime::SingleTime(HybridTime::FromMicros(
FLAGS_TEST_pg_response_cache_catalog_read_time_usec)));
}
auto failed = !IsOk(value);
auto new_state = DataState::kReadyFailure;
size_t sz = 0;
if (!failed) {
if (IsOk(value)) {
for (const auto& data : value.rows_data) {
sz += data.size();
}
new_state = DataState::kReadyOK;
}
decltype(waiters_) waiters;
// Since response_ is not changed after assignment, we could store pointer to it to make
// thread safety analysis happy, and then use it.
const PgResponseCache::Response* response;
const PgResponseCache::Response* response = nullptr;
{
std::lock_guard lock(mutex_);
response_ = std::move(value);
waiters.swap(waiters_);
response = &*response_;
failed_ = failed;
state_.store(new_state, std::memory_order_release);
}
for (auto waiter : waiters) {
waiter->Apply(*response);
Expand All @@ -220,8 +224,20 @@ class Data {
}
}

[[nodiscard]] bool IsValid(CoarseTimePoint now, uint64_t version) {
return version == version_ && now < readiness_deadline_ && !failed_;
[[nodiscard]] bool IsValid(CoarseTimePoint now, uint64_t version) const {
if (PREDICT_FALSE(version != version_)) {
return false;
}
const auto state = state_.load(std::memory_order_acquire);
switch (state) {
case DataState::kNotReady:
return now < readiness_deadline_;
case DataState::kReadyOK:
return true;
case DataState::kReadyFailure:
return false;
}
FATAL_INVALID_ENUM_VALUE(DataState, state);
}

[[nodiscard]] std::optional<const PgResponseCache::Response*> RegisterWaiter(
Expand All @@ -248,7 +264,7 @@ class Data {
const CoarseTimePoint creation_time_;
const CoarseTimePoint readiness_deadline_;
std::mutex mutex_;
std::atomic<bool> failed_{false};
std::atomic<DataState> state_{DataState::kNotReady};
bool running_ GUARDED_BY(mutex_) = false;
std::optional<PgResponseCache::Response> response_ GUARDED_BY(mutex_);
std::vector<PgResponseCacheWaiterPtr> waiters_ GUARDED_BY(mutex_);
Expand Down
36 changes: 36 additions & 0 deletions src/yb/yql/pgwrapper/pg_catalog_perf-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ DECLARE_uint32(pg_cache_response_renew_soft_lifetime_limit_ms);
DECLARE_uint64(pg_response_cache_size_bytes);
DECLARE_uint32(pg_response_cache_size_percentage);
DECLARE_int32(pgsql_proxy_webserver_port);
DECLARE_int32(ysql_client_read_write_timeout_ms);
DECLARE_int32(pg_client_extra_timeout_ms);

using namespace std::literals;

Expand Down Expand Up @@ -541,6 +543,40 @@ TEST_F_EX(PgCatalogPerfTest,
ASSERT_EQ(metrics.cache.hits, 5);
}

class PgCatalogShortRpcDeadlineTest : public PgCatalogWithUnlimitedCachePerfTest {
protected:
static constexpr int kRpcDeadlineMs = 2000;
static constexpr int kPgClientExtraTimeoutMs = 1000;

void SetUp() override {
ANNOTATE_UNPROTECTED_WRITE(FLAGS_ysql_client_read_write_timeout_ms) = kRpcDeadlineMs;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_pg_client_extra_timeout_ms) = kPgClientExtraTimeoutMs;
PgCatalogWithUnlimitedCachePerfTest::SetUp();
}
};

// Test that the response cache is used even after the RPC deadline has passed.
TEST_F_EX(PgCatalogPerfTest,
ResponseCacheValidPastReadinessDeadline,
PgCatalogShortRpcDeadlineTest) {
// Warm up catalog's response cache.
ASSERT_RESULT(Connect());

const auto total_deadline_ms =
PgCatalogShortRpcDeadlineTest::kRpcDeadlineMs +
PgCatalogShortRpcDeadlineTest::kPgClientExtraTimeoutMs;
std::this_thread::sleep_for(std::chrono::milliseconds(
total_deadline_ms + ReleaseVsDebugVsAsanVsTsan(4000, 8000, 10000, 16000)));

// Ensure new connections use the response cache.
auto metrics = ASSERT_RESULT(metrics_->Delta([this] {
RETURN_NOT_OK(Connect());
return static_cast<Status>(Status::OK());
}));
ASSERT_GT(metrics.cache.queries, 0);
ASSERT_EQ(metrics.cache.queries, metrics.cache.hits);
}

// The test checks response cache renewing process in case of 'Snapshot too old' error.
// This error is possible in the following situation:
// - several days ago at time T1 first connection was established to DB
Expand Down