Skip to content

Commit d8a834c

Browse files
address comments
1 parent 5dc043d commit d8a834c

4 files changed

Lines changed: 17 additions & 9 deletions

File tree

src/server/server.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2210,3 +2210,5 @@ AuthResult Server::AuthenticateUser(const std::string &user_password, std::strin
22102210
*ns = kDefaultNamespace;
22112211
return AuthResult::IS_ADMIN;
22122212
}
2213+
2214+
TaskRunner *Server::GetTaskRunner() { return &task_runner_; }

src/server/server.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,8 @@ class Server {
197197
void AdjustWorkerThreads();
198198

199199
Status AddMaster(const std::string &host, uint32_t port, bool force_reconnect);
200+
201+
TaskRunner *GetTaskRunner() { return &task_runner_; }
200202
Status RemoveMaster();
201203
Status AddSlave(redis::Connection *conn, rocksdb::SequenceNumber next_repl_seq);
202204
void DisconnectSlaves();
@@ -434,6 +436,9 @@ class Server {
434436
std::thread cron_thread_;
435437
std::thread compaction_checker_thread_;
436438
TaskRunner task_runner_;
439+
440+
public:
441+
TaskRunner *GetTaskRunner() { return &task_runner_; }
437442
std::vector<std::unique_ptr<WorkerThread>> worker_threads_;
438443
std::unique_ptr<ReplicationThread> replication_thread_;
439444
tbb::concurrent_queue<std::unique_ptr<WorkerThread>> recycle_worker_threads_;

src/types/redis_hash.cc

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include <thread>
3030
#include <utility>
3131

32+
#include "common/task_runner.h"
3233
#include "db_util.h"
3334
#include "logging.h"
3435
#include "parse_util.h"
@@ -680,6 +681,7 @@ rocksdb::Status Hash::PersistFields(engine::Context &ctx, const Slice &user_key,
680681
}
681682

682683
void Hash::asyncRepairHash(const std::string &ns_key, const Slice &field, const HashMetadata &metadata) const {
684+
// Use the server's task runner to schedule the repair task instead of spawning a thread.
683685
auto repair_task = [storage = storage_, ns_key, field_str = field.ToString(), version = metadata.version,
684686
size = metadata.size]() {
685687
if (size == 0) {
@@ -689,27 +691,20 @@ void Hash::asyncRepairHash(const std::string &ns_key, const Slice &field, const
689691
engine::Context ctx(storage);
690692
std::string sub_key = InternalKey(ns_key, field_str, version, storage->IsSlotIdEncoded()).Encode();
691693

692-
// Use a retry loop to implement optimistic locking. This is to atomically
693-
// decrement the hash size and prevent a race condition when multiple threads
694-
// are repairing the same hash.
695694
for (int i = 0; i < 10; ++i) {
696695
HashMetadata current_metadata(false);
697696
std::string metadata_bytes;
698697
auto s = storage->Get(ctx, ctx.GetReadOptions(), storage->GetCFHandle(ColumnFamilyID::Metadata), ns_key,
699698
&metadata_bytes);
700699
if (!s.ok()) {
701-
// If metadata is not found, another repair might have cleaned it up. Stop.
702700
if (s.IsNotFound()) return;
703-
// For other errors, log and stop.
704701
error("Failed to get metadata for async repair: {}", s.ToString());
705702
return;
706703
}
707704
if (!current_metadata.Decode(metadata_bytes).ok()) {
708705
error("Failed to decode metadata for async repair");
709706
return;
710707
}
711-
712-
// If size is already 0, no need to decrement.
713708
if (current_metadata.size == 0) return;
714709

715710
auto batch = storage->GetWriteBatchBase();
@@ -728,7 +723,10 @@ void Hash::asyncRepairHash(const std::string &ns_key, const Slice &field, const
728723
error("Failed to async repair hash field after multiple retries");
729724
};
730725

731-
std::thread(repair_task).detach();
726+
// Use the server's task runner and TryPublish
727+
if (server_ && server_->GetTaskRunner()) {
728+
server_->GetTaskRunner()->TryPublish(std::move(repair_task));
729+
}
732730
}
733731

734732
} // namespace redis

src/types/redis_hash.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
#include "common/range_spec.h"
3030
#include "encoding.h"
31+
#include "server/server.h"
3132
#include "storage/redis_db.h"
3233
#include "storage/redis_metadata.h"
3334
#include "time_util.h"
@@ -151,7 +152,8 @@ namespace redis {
151152

152153
class Hash : public SubKeyScanner {
153154
public:
154-
Hash(engine::Storage *storage, const std::string &ns) : SubKeyScanner(storage, ns) {}
155+
Hash(engine::Storage *storage, const std::string &ns, Server *server = nullptr)
156+
: SubKeyScanner(storage, ns), server_(server) {}
155157

156158
rocksdb::Status Size(engine::Context &ctx, const Slice &user_key, uint64_t *size);
157159
rocksdb::Status Get(engine::Context &ctx, const Slice &user_key, const Slice &field, std::string *value);
@@ -199,6 +201,7 @@ class Hash : public SubKeyScanner {
199201
private:
200202
rocksdb::Status GetMetadata(engine::Context &ctx, const Slice &ns_key, HashMetadata *metadata);
201203
void asyncRepairHash(const std::string &ns_key, const Slice &field, const HashMetadata &metadata) const;
204+
Server *server_ = nullptr;
202205

203206
friend struct FieldValueRetriever;
204207
};

0 commit comments

Comments
 (0)