Skip to content

Commit 7232d23

Browse files
vyavdoshenkoromange
authored andcommitted
server(fix) : SCAN command reduce latency (#5151)
Fixed: #5169
1 parent 350b28d commit 7232d23

2 files changed

Lines changed: 13 additions & 10 deletions

File tree

src/server/db_slice.cc

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1169,7 +1169,7 @@ DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterato
11691169
auto& db = db_arr_[cntx.db_index];
11701170
auto expire_it = db->expire.Find(it->first);
11711171

1172-
if (IsValid(expire_it)) {
1172+
if (IsValid(expire_it) || !shard_owner()->shard_lock()->Check(IntentLock::Mode::EXCLUSIVE)) {
11731173
// TODO: to employ multi-generation update of expire-base and the underlying values.
11741174
time_t expire_time = ExpireTime(expire_it);
11751175

@@ -1183,9 +1183,6 @@ DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterato
11831183
<< ", prime table size: " << db->prime.size() << util::fb2::GetStacktrace();
11841184
}
11851185

1186-
DCHECK(shard_owner()->shard_lock()->Check(IntentLock::Mode::EXCLUSIVE))
1187-
<< util::fb2::GetStacktrace();
1188-
11891186
string scratch;
11901187
string_view key = it->first.GetSlice(&scratch);
11911188

src/server/generic_family.cc

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ extern "C" {
1414
#include "redis/crc64.h"
1515
}
1616

17+
#include "base/cycle_clock.h"
1718
#include "base/flags.h"
1819
#include "base/logging.h"
1920
#include "redis/rdb.h"
@@ -33,6 +34,7 @@ extern "C" {
3334
#include "server/set_family.h"
3435
#include "server/tiered_storage.h"
3536
#include "server/transaction.h"
37+
#include "util/fibers/fibers.h"
3638
#include "util/fibers/future.h"
3739
#include "util/varz.h"
3840

@@ -647,14 +649,16 @@ void OpScan(const OpArgs& op_args, const ScanOpts& scan_opts, uint64_t* cursor,
647649
PrimeTable::Cursor cur = *cursor;
648650
auto [prime_table, expire_table] = db_slice.GetTables(op_args.db_cntx.db_index);
649651

650-
const auto start = absl::Now();
652+
const auto start_cycles = base::CycleClock::Now();
651653
// Don't allow it to monopolize cpu time.
652-
const absl::Duration timeout = absl::Milliseconds(10);
654+
// Approximately 15 microseconds.
655+
const uint64_t timeout_cycles = base::CycleClock::Frequency() >> 16;
653656

654657
do {
655658
cur = prime_table->Traverse(
656659
cur, [&](PrimeIterator it) { cnt += ScanCb(op_args, it, scan_opts, vec); });
657-
} while (cur && cnt < scan_opts.limit && (absl::Now() - start) < timeout);
660+
} while (cur && cnt < scan_opts.limit &&
661+
(base::CycleClock::Now() - start_cycles) < timeout_cycles);
658662

659663
VLOG(1) << "OpScan " << db_slice.shard_id() << " cursor: " << cur.value();
660664
*cursor = cur.value();
@@ -666,7 +670,7 @@ uint64_t ScanGeneric(uint64_t cursor, const ScanOpts& scan_opts, StringVec* keys
666670

667671
EngineShardSet* ess = shard_set;
668672
unsigned shard_count = ess->size();
669-
constexpr uint64_t kMaxScanTimeMs = 100;
673+
constexpr uint64_t kMaxScanTimeMs = 25;
670674

671675
// Dash table returns a cursor with its right byte empty. We will use it
672676
// for encoding shard index. For now scan has a limitation of 255 shards.
@@ -686,10 +690,12 @@ uint64_t ScanGeneric(uint64_t cursor, const ScanOpts& scan_opts, StringVec* keys
686690
};
687691

688692
// Avoid deadlocking, if called from shard queue script
689-
if (EngineShard::tlocal() && EngineShard::tlocal()->shard_id() == sid)
693+
if (EngineShard::tlocal() && EngineShard::tlocal()->shard_id() == sid) {
690694
cb();
691-
else
695+
util::ThisFiber::Yield();
696+
} else {
692697
ess->Await(sid, cb);
698+
}
693699

694700
if (cursor == 0) {
695701
++sid;

0 commit comments

Comments
 (0)