diff --git a/src/yb/tserver/pg_response_cache.cc b/src/yb/tserver/pg_response_cache.cc index a164be40176a..adcc2e1509aa 100644 --- a/src/yb/tserver/pg_response_cache.cc +++ b/src/yb/tserver/pg_response_cache.cc @@ -30,6 +30,7 @@ #include "yb/rpc/sidecars.h" #include "yb/util/async_util.h" +#include "yb/util/enums.h" #include "yb/util/flags.h" #include "yb/util/flags/flag_tags.h" #include "yb/util/logging.h" @@ -172,6 +173,8 @@ class MetricUpdater { DISALLOW_COPY_AND_ASSIGN(MetricUpdater); }; +YB_DEFINE_ENUM(DataState, (kNotReady)(kReadyOK)(kReadyFailure)); + class Data { public: Data(uint64_t version, @@ -191,23 +194,24 @@ class Data { ReadHybridTime::SingleTime(HybridTime::FromMicros( FLAGS_TEST_pg_response_cache_catalog_read_time_usec))); } - auto failed = !IsOk(value); + auto new_state = DataState::kReadyFailure; size_t sz = 0; - if (!failed) { + if (IsOk(value)) { for (const auto& data : value.rows_data) { sz += data.size(); } + new_state = DataState::kReadyOK; } decltype(waiters_) waiters; // Since response_ is not changed after assignment, we could store pointer to it to make // thread safety analysis happy, and then use it. 
- const PgResponseCache::Response* response; + const PgResponseCache::Response* response = nullptr; { std::lock_guard lock(mutex_); response_ = std::move(value); waiters.swap(waiters_); response = &*response_; - failed_ = failed; + state_.store(new_state, std::memory_order_release); } for (auto waiter : waiters) { waiter->Apply(*response); @@ -220,8 +224,20 @@ class Data { } } - [[nodiscard]] bool IsValid(CoarseTimePoint now, uint64_t version) { - return version == version_ && now < readiness_deadline_ && !failed_; + [[nodiscard]] bool IsValid(CoarseTimePoint now, uint64_t version) const { + if (PREDICT_FALSE(version != version_)) { + return false; + } + const auto state = state_.load(std::memory_order_acquire); + switch (state) { + case DataState::kNotReady: + return now < readiness_deadline_; + case DataState::kReadyOK: + return true; + case DataState::kReadyFailure: + return false; + } + FATAL_INVALID_ENUM_VALUE(DataState, state); } [[nodiscard]] std::optional RegisterWaiter( @@ -248,7 +264,7 @@ class Data { const CoarseTimePoint creation_time_; const CoarseTimePoint readiness_deadline_; std::mutex mutex_; - std::atomic<bool> failed_{false}; + std::atomic<DataState> state_{DataState::kNotReady}; bool running_ GUARDED_BY(mutex_) = false; std::optional<PgResponseCache::Response> response_ GUARDED_BY(mutex_); std::vector waiters_ GUARDED_BY(mutex_); diff --git a/src/yb/yql/pgwrapper/pg_catalog_perf-test.cc b/src/yb/yql/pgwrapper/pg_catalog_perf-test.cc index 4f187a38e5d8..2226ca6e64e2 100644 --- a/src/yb/yql/pgwrapper/pg_catalog_perf-test.cc +++ b/src/yb/yql/pgwrapper/pg_catalog_perf-test.cc @@ -61,6 +61,8 @@ DECLARE_uint32(pg_cache_response_renew_soft_lifetime_limit_ms); DECLARE_uint64(pg_response_cache_size_bytes); DECLARE_uint32(pg_response_cache_size_percentage); DECLARE_int32(pgsql_proxy_webserver_port); +DECLARE_int32(ysql_client_read_write_timeout_ms); +DECLARE_int32(pg_client_extra_timeout_ms); using namespace std::literals; @@ -541,6 +543,40 @@ TEST_F_EX(PgCatalogPerfTest, 
ASSERT_EQ(metrics.cache.hits, 5); } +class PgCatalogShortRpcDeadlineTest : public PgCatalogWithUnlimitedCachePerfTest { + protected: + static constexpr int kRpcDeadlineMs = 2000; + static constexpr int kPgClientExtraTimeoutMs = 1000; + + void SetUp() override { + ANNOTATE_UNPROTECTED_WRITE(FLAGS_ysql_client_read_write_timeout_ms) = kRpcDeadlineMs; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_pg_client_extra_timeout_ms) = kPgClientExtraTimeoutMs; + PgCatalogWithUnlimitedCachePerfTest::SetUp(); + } +}; + +// Test that the response cache is used even after the RPC deadline has passed. +TEST_F_EX(PgCatalogPerfTest, + ResponseCacheValidPastReadinessDeadline, + PgCatalogShortRpcDeadlineTest) { + // Warm up catalog's response cache. + ASSERT_RESULT(Connect()); + + const auto total_deadline_ms = + PgCatalogShortRpcDeadlineTest::kRpcDeadlineMs + + PgCatalogShortRpcDeadlineTest::kPgClientExtraTimeoutMs; + std::this_thread::sleep_for(std::chrono::milliseconds( + total_deadline_ms + ReleaseVsDebugVsAsanVsTsan(4000, 8000, 10000, 16000))); + + // Ensure new connections use the response cache. + auto metrics = ASSERT_RESULT(metrics_->Delta([this] { + RETURN_NOT_OK(Connect()); + return static_cast<Status>(Status::OK()); + })); + ASSERT_GT(metrics.cache.queries, 0); + ASSERT_EQ(metrics.cache.queries, metrics.cache.hits); +} + // The test checks response cache renewing process in case of 'Snapshot too old' error. // This error is possible in the following situation: // - several days ago at time T1 first connection was established to DB