Skip to content

Commit da70ac9

Browse files
authored
[Store] L2->L1 promotion-on-hit (#2071)
1 parent 684e5f0 commit da70ac9

23 files changed

Lines changed: 3895 additions & 33 deletions

mooncake-integration/store/store_py.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1945,6 +1945,13 @@ PYBIND11_MODULE(store, m) {
19451945
})
19461946
.def("get", &mooncake::MooncakeStorePyWrapper::get)
19471947
.def("get_batch", &mooncake::MooncakeStorePyWrapper::get_batch)
1948+
.def("get_offload_rpc_read_count",
1949+
[](MooncakeStorePyWrapper &self) -> int64_t {
1950+
auto real_client =
1951+
std::dynamic_pointer_cast<RealClient>(self.store_);
1952+
return real_client ? real_client->get_offload_rpc_read_count()
1953+
: 0;
1954+
})
19481955
.def(
19491956
"get_buffer",
19501957
[](MooncakeStorePyWrapper &self, const std::string &key) {

mooncake-store/include/client_service.h

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ class QueryResult {
6262
*/
6363
class Client {
6464
public:
65-
~Client();
65+
virtual ~Client();
6666

6767
const UUID& getClientId() const { return client_id_; }
6868

@@ -410,6 +410,49 @@ class Client {
410410
tl::expected<void, ErrorCode> ReportSsdCapacity(
411411
int64_t ssd_total_capacity_bytes);
412412

413+
/**
414+
* @brief Heartbeat-driven pull of pending L2->L1 promotion work for this
415+
* client. Mirror of OffloadObjectHeartbeat. Returns key->size pairs the
416+
* caller (FileStorage) must read from local SSD and stage as MEMORY
417+
* replicas via PromotionAllocStart + NotifyPromotionSuccess.
418+
*/
419+
// Virtual to enable subclassing in unit tests.
420+
virtual tl::expected<void, ErrorCode> PromotionObjectHeartbeat(
421+
std::unordered_map<std::string, int64_t>& promotion_objects);
422+
423+
/**
424+
* @brief Stage a PROCESSING MEMORY replica for an existing key during
425+
* L2->L1 promotion. Returns the new replica's descriptor that the caller
426+
* writes via Transfer Engine before calling NotifyPromotionSuccess.
427+
*/
428+
virtual tl::expected<PromotionAllocStartResponse, ErrorCode>
429+
PromotionAllocStart(const std::string& key, uint64_t size,
430+
const std::vector<std::string>& preferred_segments);
431+
432+
/**
433+
* @brief Commit a staged MEMORY replica to COMPLETE; called after the
434+
* client has written the bytes via Transfer Engine.
435+
*/
436+
virtual tl::expected<void, ErrorCode> NotifyPromotionSuccess(
437+
const std::string& key);
438+
439+
/**
440+
* @brief Release master-side promotion task after a client-side failure
441+
* between PromotionAllocStart and the transfer's completion. Idempotent.
442+
*/
443+
virtual tl::expected<void, ErrorCode> NotifyPromotionFailure(
444+
const std::string& key);
445+
446+
/**
447+
* @brief Write `slices` into the memory replica described by
448+
* `memory_descriptor` via Transfer Engine. Used by FileStorage to fill a
449+
* PROCESSING memory replica staged by PromotionAllocStart before calling
450+
* NotifyPromotionSuccess.
451+
*/
452+
virtual ErrorCode PromotionWrite(
453+
const Replica::Descriptor& memory_descriptor,
454+
std::vector<Slice>& slices);
455+
413456
/**
414457
* @brief Performs a batched read of multiple objects using a
415458
* high-throughput Transfer Engine.
@@ -577,14 +620,16 @@ class Client {
577620

578621
bool IsReplicaOnLocalMemory(const Replica::Descriptor& replica);
579622

580-
private:
623+
protected:
581624
/**
582-
* @brief Private constructor to enforce creation through Create() method
625+
* @brief Constructor exposed to subclasses for testing only; production
626+
* code must go through Create().
583627
*/
584628
Client(const std::string& local_hostname,
585629
const std::string& metadata_connstring, const std::string& protocol,
586630
const std::map<std::string, std::string>& labels = {});
587631

632+
private:
588633
/**
589634
* @brief Internal helper functions for initialization and data transfer
590635
*/

mooncake-store/include/file_storage.h

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ class FileStorage {
5151

5252
private:
5353
friend class FileStorageTest;
54+
friend class FileStoragePromotionTest;
5455
struct AllocatedBatch {
5556
uint64_t batch_id;
5657
std::vector<BufferHandle> handles;
@@ -82,10 +83,24 @@ class FileStorage {
8283
* client.
8384
* 2. Receives feedback on which objects should be offloaded.
8485
* 3. Triggers asynchronous offloading of pending objects.
86+
* 4. Pulls and processes any pending L2->L1 promotion tasks queued by the
87+
* master (mirror of step 1+2 in the reverse direction).
8588
* @return tl::expected<void, ErrorCode> indicating operation status.
8689
*/
8790
tl::expected<void, ErrorCode> Heartbeat();
8891

92+
/**
93+
* @brief Drives the L2->L1 promotion pipeline for one heartbeat tick.
94+
* Pulls promotion work from the master, stages a MEMORY replica for each
95+
* key, copies the bytes from local SSD into that replica, and notifies the
96+
* master on success. A failure on any single key is logged and skipped;
97+
* the master-side reaper decrements the source replica's refcnt and
98+
* erases the task entry on TTL expiry, and any orphaned PROCESSING
99+
* MEMORY replica is reaped via the standard discarded-replicas path.
100+
* @return tl::expected<void, ErrorCode> indicating operation status.
101+
*/
102+
tl::expected<void, ErrorCode> ProcessPromotionTasks();
103+
89104
tl::expected<bool, ErrorCode> IsEnableOffloading();
90105

91106
tl::expected<void, ErrorCode> BatchLoad(
@@ -123,4 +138,4 @@ class FileStorage {
123138
std::thread client_buffer_gc_thread_;
124139
};
125140

126-
} // namespace mooncake
141+
} // namespace mooncake

mooncake-store/include/master_client.h

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,41 @@ class MasterClient {
419419
const UUID& client_id, const std::vector<std::string>& keys,
420420
const std::vector<StorageObjectMetadata>& metadatas);
421421

422+
/**
423+
* @brief Heartbeat-driven pull of pending L2->L1 promotion work for a
424+
* client. Returns key->size pairs the caller should read from local
425+
* SSD and stage as MEMORY replicas via PromotionAllocStart +
426+
* NotifyPromotionSuccess.
427+
*/
428+
[[nodiscard]] tl::expected<std::unordered_map<std::string, int64_t>,
429+
ErrorCode>
430+
PromotionObjectHeartbeat(const UUID& client_id);
431+
432+
/**
433+
* @brief Stage a PROCESSING MEMORY replica for an existing key during
434+
* promotion. Returns the new replica's descriptor that the caller writes
435+
* via Transfer Engine.
436+
*/
437+
[[nodiscard]] tl::expected<PromotionAllocStartResponse, ErrorCode>
438+
PromotionAllocStart(const UUID& client_id, const std::string& key,
439+
uint64_t size,
440+
const std::vector<std::string>& preferred_segments);
441+
442+
/**
443+
* @brief Release master-side promotion task state after a client-side
444+
* failure that prevents the holder from calling NotifyPromotionSuccess.
445+
* Idempotent; returns OK if the task was already swept by the reaper.
446+
*/
447+
[[nodiscard]] tl::expected<void, ErrorCode> NotifyPromotionFailure(
448+
const UUID& client_id, const std::string& key);
449+
450+
/**
451+
* @brief Commit a staged MEMORY replica to COMPLETE; called after the
452+
* client has written the bytes via Transfer Engine.
453+
*/
454+
[[nodiscard]] tl::expected<void, ErrorCode> NotifyPromotionSuccess(
455+
const UUID& client_id, const std::string& key);
456+
422457
/**
423458
* @brief Start a copy operation
424459
* @param key Object key

mooncake-store/include/master_config.h

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,12 @@ struct MasterConfig {
103103
// Offload-on-evict: defer LOCAL_DISK offload to eviction time
104104
bool offload_on_evict = false;
105105
bool offload_force_evict = false;
106+
107+
// Promotion-on-hit: when Get observes a LOCAL_DISK-only key, queue an
108+
// async copy back to MEMORY so the next Get is fast.
109+
bool promotion_on_hit = false;
110+
uint32_t promotion_admission_threshold = 2;
111+
uint32_t promotion_queue_limit = 50000;
106112
};
107113

108114
class MasterServiceSupervisorConfig {
@@ -173,6 +179,9 @@ class MasterServiceSupervisorConfig {
173179
bool enable_cxl = false;
174180
bool offload_on_evict = false;
175181
bool offload_force_evict = false;
182+
bool promotion_on_hit = false;
183+
uint32_t promotion_admission_threshold = 2;
184+
uint32_t promotion_queue_limit = 50000;
176185
MasterServiceSupervisorConfig() = default;
177186

178187
// From MasterConfig
@@ -197,6 +206,9 @@ class MasterServiceSupervisorConfig {
197206
enable_offload = config.enable_offload;
198207
offload_on_evict = config.offload_on_evict;
199208
offload_force_evict = config.offload_force_evict;
209+
promotion_on_hit = config.promotion_on_hit;
210+
promotion_admission_threshold = config.promotion_admission_threshold;
211+
promotion_queue_limit = config.promotion_queue_limit;
200212
rpc_port = static_cast<int>(config.rpc_port);
201213
rpc_thread_num = static_cast<size_t>(config.rpc_thread_num);
202214

@@ -335,6 +347,9 @@ class WrappedMasterServiceConfig {
335347
bool enable_offload = false;
336348
bool offload_on_evict = false;
337349
bool offload_force_evict = false;
350+
bool promotion_on_hit = false;
351+
uint32_t promotion_admission_threshold = 2;
352+
uint32_t promotion_queue_limit = 50000;
338353
std::string ha_backend_type = "etcd";
339354
std::string ha_backend_connstring;
340355
std::string cluster_id = DEFAULT_CLUSTER_ID;
@@ -399,6 +414,9 @@ class WrappedMasterServiceConfig {
399414
enable_offload = config.enable_offload;
400415
offload_on_evict = config.offload_on_evict;
401416
offload_force_evict = config.offload_force_evict;
417+
promotion_on_hit = config.promotion_on_hit;
418+
promotion_admission_threshold = config.promotion_admission_threshold;
419+
promotion_queue_limit = config.promotion_queue_limit;
402420
ha_backend_type = config.ha_backend_type;
403421
ha_backend_connstring = ResolveConfiguredHABackendConnstring(
404422
ha_backend_type, config.ha_backend_connstring,
@@ -481,6 +499,9 @@ class WrappedMasterServiceConfig {
481499
enable_offload = config.enable_offload;
482500
offload_on_evict = config.offload_on_evict;
483501
offload_force_evict = config.offload_force_evict;
502+
promotion_on_hit = config.promotion_on_hit;
503+
promotion_admission_threshold = config.promotion_admission_threshold;
504+
promotion_queue_limit = config.promotion_queue_limit;
484505
ha_backend_type = config.ha_backend_type;
485506
ha_backend_connstring = ResolveConfiguredHABackendConnstring(
486507
ha_backend_type, config.ha_backend_connstring,
@@ -862,6 +883,9 @@ class MasterServiceConfig {
862883
bool enable_offload = false;
863884
bool offload_on_evict = false;
864885
bool offload_force_evict = false;
886+
bool promotion_on_hit = false;
887+
uint32_t promotion_admission_threshold = 2;
888+
uint32_t promotion_queue_limit = 50000;
865889
std::string ha_backend_type = "etcd";
866890
std::string ha_backend_connstring;
867891
std::string cluster_id = DEFAULT_CLUSTER_ID;
@@ -922,6 +946,9 @@ class MasterServiceConfig {
922946
enable_offload = config.enable_offload;
923947
offload_on_evict = config.offload_on_evict;
924948
offload_force_evict = config.offload_force_evict;
949+
promotion_on_hit = config.promotion_on_hit;
950+
promotion_admission_threshold = config.promotion_admission_threshold;
951+
promotion_queue_limit = config.promotion_queue_limit;
925952
ha_backend_type = config.ha_backend_type;
926953
ha_backend_connstring = config.ha_backend_connstring;
927954
cluster_id = config.cluster_id;

0 commit comments

Comments
 (0)