Skip to content

Commit 06ae8dd

Browse files
authored
[Store] Add opt-in grouped object routing semantics (#2180)
1 parent 52e62c3 commit 06ae8dd

12 files changed

Lines changed: 1554 additions & 264 deletions

File tree

docs/source/python-api-reference/mooncake-store.md

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -586,8 +586,36 @@ config.preferred_segment = self.get_hostname()
586586
587587
```python
588588
config = ReplicateConfig()
589-
config.prefer_alloc_in_same_node = "True
589+
config.prefer_alloc_in_same_node = "True"
590590
```
591+
592+
#### group_ids
593+
**Type:** `List[str] | None`
594+
**Default:** `None`
595+
**Description:** Optionally assigns object metadata to routing groups during writes. When this field is unset, Mooncake Store preserves the default ungrouped behavior. When it is set, each group ID maps to the object at the same position in the write request. An empty string (`""`) explicitly stores that object as ungrouped.
596+
597+
For batch write APIs, the number of group IDs must match the number of keys:
598+
599+
```python
600+
config = ReplicateConfig()
601+
config.group_ids = ["session-a", "", "session-b"]
602+
603+
store.put_batch(
604+
["key-a", "key-b", "key-c"],
605+
[b"value-a", b"value-b", b"value-c"],
606+
config,
607+
)
608+
```
609+
610+
For a single-object write, provide one group ID:
611+
612+
```python
613+
config = ReplicateConfig()
614+
config.group_ids = ["session-a"]
615+
616+
store.put("key-a", b"value-a", config)
617+
```
618+
591619
---
592620
593621
## Unified Parallel Tensor IO API

mooncake-integration/store/store_py.cpp

Lines changed: 74 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -677,6 +677,56 @@ class MooncakeStorePyWrapper {
677677
ReplicateConfig{}); // Default config
678678
}
679679

680+
// Returns a copy of `config` whose group_ids follow `original_indices`:
// entry i of the result is config.group_ids[original_indices[i]].
//
// When `config` carries no group_ids, the copy is returned unchanged.
// Lookups go through at(), so an index that is out of range for
// config.group_ids is rejected by the container rather than read blindly.
ReplicateConfig MakeIndexedConfig(
    const ReplicateConfig &config,
    const std::vector<size_t> &original_indices) const {
    ReplicateConfig result = config;
    if (config.group_ids.has_value()) {
        const auto &source_ids = *config.group_ids;
        std::vector<std::string> selected;
        selected.reserve(original_indices.size());
        for (size_t pos = 0; pos < original_indices.size(); ++pos) {
            selected.push_back(source_ids.at(original_indices[pos]));
        }
        result.group_ids = std::move(selected);
    }
    return result;
}
696+
697+
// Returns a copy of `config` whose group_ids contain, for every index in
// `original_indices` (in order), `repeat_count` consecutive copies of
// config.group_ids[index].
//
// When `config` carries no group_ids, the copy is returned unchanged.
// A non-positive `repeat_count` produces an empty group_ids list.
// Lookups go through at(), so an index that is out of range for
// config.group_ids is rejected by the container rather than read blindly.
ReplicateConfig MakeRepeatedIndexedConfig(
    const ReplicateConfig &config,
    const std::vector<size_t> &original_indices, int repeat_count) const {
    if (!config.group_ids.has_value()) {
        return config;
    }

    // Clamp before converting: casting a negative int to size_t wraps to a
    // huge value, which would make reserve() throw instead of yielding the
    // empty list that the (never-executed) copy loop implies.
    const size_t repeats =
        repeat_count > 0 ? static_cast<size_t>(repeat_count) : 0;

    ReplicateConfig indexed_config = config;
    std::vector<std::string> group_ids;
    if (repeats != 0) {
        group_ids.reserve(original_indices.size() * repeats);
        for (size_t index : original_indices) {
            // Fill-insert appends `repeats` copies with a single lookup.
            group_ids.insert(group_ids.end(), repeats,
                             config.group_ids->at(index));
        }
    }
    indexed_config.group_ids = std::move(group_ids);
    return indexed_config;
}
716+
717+
std::vector<int> ValidateGroupIdsForBatchConfig(
718+
const ReplicateConfig &config, size_t key_count,
719+
const char *operation_name) const {
720+
if (config.group_ids.has_value() &&
721+
config.group_ids->size() != key_count) {
722+
LOG(ERROR) << operation_name
723+
<< ": group_ids size must match keys size";
724+
return std::vector<int>(key_count,
725+
to_py_ret(ErrorCode::INVALID_PARAMS));
726+
}
727+
return {};
728+
}
729+
680730
int put_tensor_with_tp_impl(
681731
const std::string &key, pybind11::object tensor,
682732
const ReplicateConfig &config = ReplicateConfig{}, int tp_rank = 0,
@@ -725,11 +775,12 @@ class MooncakeStorePyWrapper {
725775
const ReplicateConfig &config = ReplicateConfig{}) {
726776
return batch_write_tensor_impl(
727777
keys, infos, config, "put",
728-
[this, &config](const std::vector<std::string> &write_keys,
729-
const std::vector<void *> &buffer_ptrs,
730-
const std::vector<size_t> &buffer_sizes) {
778+
[this](const std::vector<std::string> &write_keys,
779+
const std::vector<void *> &buffer_ptrs,
780+
const std::vector<size_t> &buffer_sizes,
781+
const ReplicateConfig &write_config) {
731782
return store_->batch_put_from(write_keys, buffer_ptrs,
732-
buffer_sizes, config);
783+
buffer_sizes, write_config);
733784
});
734785
}
735786

@@ -769,6 +820,11 @@ class MooncakeStorePyWrapper {
769820
std::vector<size_t> processed_indices;
770821
std::vector<int> final_results(base_keys.size(),
771822
to_py_ret(ErrorCode::INVALID_PARAMS));
823+
auto group_ids_error = ValidateGroupIdsForBatchConfig(
824+
config, base_keys.size(), "batch_put_tensor_with_tp");
825+
if (!group_ids_error.empty()) {
826+
return group_ids_error;
827+
}
772828
try {
773829
// Chunking phase (GIL Held)
774830
for (size_t i = 0; i < base_keys.size(); ++i) {
@@ -799,8 +855,10 @@ class MooncakeStorePyWrapper {
799855
if (all_chunk_keys.empty()) return final_results;
800856

801857
// Reuse the standard batch_put implementation
802-
std::vector<int> chunk_results =
803-
batch_put_tensor_impl(all_chunk_keys, all_chunks_list, config);
858+
ReplicateConfig chunk_config =
859+
MakeRepeatedIndexedConfig(config, processed_indices, tp_size);
860+
std::vector<int> chunk_results = batch_put_tensor_impl(
861+
all_chunk_keys, all_chunks_list, chunk_config);
804862

805863
// Aggregate results
806864
for (size_t i = 0; i < processed_indices.size(); ++i) {
@@ -1234,6 +1292,12 @@ class MooncakeStorePyWrapper {
12341292
const std::vector<std::string> &keys,
12351293
const pybind11::list &tensors_list,
12361294
const ReplicateConfig &config = ReplicateConfig{}) {
1295+
auto group_ids_error = ValidateGroupIdsForBatchConfig(
1296+
config, keys.size(), "batch_upsert_tensor");
1297+
if (!group_ids_error.empty()) {
1298+
return group_ids_error;
1299+
}
1300+
12371301
std::vector<PyTensorInfo> infos(keys.size());
12381302
std::vector<int> results(keys.size(), 0);
12391303

@@ -1287,8 +1351,10 @@ class MooncakeStorePyWrapper {
12871351
}
12881352

12891353
if (!valid_keys.empty()) {
1354+
ReplicateConfig write_config =
1355+
MakeIndexedConfig(config, original_indices);
12901356
std::vector<int> op_results = store_->batch_upsert_from(
1291-
valid_keys, buffer_ptrs, buffer_sizes, config);
1357+
valid_keys, buffer_ptrs, buffer_sizes, write_config);
12921358
for (size_t i = 0; i < op_results.size(); ++i) {
12931359
results[original_indices[i]] = op_results[i];
12941360
}
@@ -1603,6 +1669,7 @@ PYBIND11_MODULE(store, m) {
16031669
.def_readwrite("prefer_alloc_in_same_node",
16041670
&ReplicateConfig::prefer_alloc_in_same_node)
16051671
.def_readwrite("data_type", &ReplicateConfig::data_type)
1672+
.def_readwrite("group_ids", &ReplicateConfig::group_ids)
16061673
.def("__str__", [](const ReplicateConfig &config) {
16071674
std::ostringstream oss;
16081675
oss << config;

mooncake-integration/store/store_py_internal.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -848,7 +848,7 @@ bool is_default_replicate_config(const ReplicateConfig &config) {
848848
return config.replica_num == 1 && !config.with_soft_pin &&
849849
!config.with_hard_pin && config.preferred_segments.empty() &&
850850
config.preferred_segment.empty() &&
851-
!config.prefer_alloc_in_same_node;
851+
!config.prefer_alloc_in_same_node && !config.group_ids.has_value();
852852
}
853853

854854
std::optional<ParallelAxisSpec> parse_parallel_axis_spec(

mooncake-integration/store/store_py_parallel_write.h

Lines changed: 50 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,12 @@ std::vector<int> batch_write_tensor_impl(const std::vector<std::string> &keys,
2020
const ReplicateConfig &config,
2121
const char *operation_name,
2222
BatchWriteFromFn &&batch_write_from) {
23+
auto group_ids_error =
24+
ValidateGroupIdsForBatchConfig(config, keys.size(), operation_name);
25+
if (!group_ids_error.empty()) {
26+
return group_ids_error;
27+
}
28+
2329
std::vector<int> results(keys.size(), 0);
2430

2531
{
@@ -65,8 +71,10 @@ std::vector<int> batch_write_tensor_impl(const std::vector<std::string> &keys,
6571
}
6672

6773
if (!valid_keys.empty()) {
68-
std::vector<int> op_results =
69-
batch_write_from(valid_keys, buffer_ptrs, buffer_sizes);
74+
ReplicateConfig write_config =
75+
MakeIndexedConfig(config, original_indices);
76+
std::vector<int> op_results = batch_write_from(
77+
valid_keys, buffer_ptrs, buffer_sizes, write_config);
7078
for (size_t i = 0; i < op_results.size(); ++i) {
7179
results[original_indices[i]] = op_results[i];
7280
}
@@ -905,6 +913,12 @@ std::vector<int> batch_put_tensor_with_parallelism(
905913
const py::object &parallelisms = py::none(),
906914
const ReplicateConfig &config = ReplicateConfig{},
907915
const py::object &writer_partitions = py::none()) {
916+
auto group_ids_error = ValidateGroupIdsForBatchConfig(
917+
config, keys.size(), "batch_put_tensor_with_parallelism");
918+
if (!group_ids_error.empty()) {
919+
return group_ids_error;
920+
}
921+
908922
return execute_batch_parallelism_write_requests(
909923
keys, tensors_list.size(), parallelisms, writer_partitions,
910924
"batch_put_tensor_with_parallelism",
@@ -921,14 +935,16 @@ std::vector<int> batch_put_tensor_with_parallelism(
921935
},
922936
[this, &keys, &tensors_list, &config](size_t i,
923937
const py::handle &parallelism) {
938+
ReplicateConfig key_config = config.ForSingleKey(i);
924939
return put_tensor_with_parallelism(
925940
keys[i], tensors_list[i],
926-
py::reinterpret_borrow<py::object>(parallelism), config);
941+
py::reinterpret_borrow<py::object>(parallelism), key_config);
927942
},
928943
[this, &keys, &tensors_list, &config](
929944
size_t i, const py::handle &writer_partition) {
945+
ReplicateConfig key_config = config.ForSingleKey(i);
930946
return put_tensor_with_parallelism(
931-
keys[i], tensors_list[i], py::none(), config,
947+
keys[i], tensors_list[i], py::none(), key_config,
932948
py::reinterpret_borrow<py::object>(writer_partition));
933949
});
934950
}
@@ -1029,6 +1045,12 @@ std::vector<int> batch_put_tensor_with_parallelism_from(
10291045
const py::object &parallelisms = py::none(),
10301046
const ReplicateConfig &config = ReplicateConfig{},
10311047
const py::object &writer_partitions = py::none()) {
1048+
auto group_ids_error = ValidateGroupIdsForBatchConfig(
1049+
config, keys.size(), "batch_put_tensor_with_parallelism_from");
1050+
if (!group_ids_error.empty()) {
1051+
return group_ids_error;
1052+
}
1053+
10321054
return execute_batch_parallelism_write_requests(
10331055
keys, buffer_ptrs.size(), parallelisms, writer_partitions,
10341056
"batch_put_tensor_with_parallelism_from",
@@ -1070,14 +1092,16 @@ std::vector<int> batch_put_tensor_with_parallelism_from(
10701092
},
10711093
[this, &keys, &buffer_ptrs, &sizes, &config](
10721094
size_t i, const py::handle &parallelism) {
1095+
ReplicateConfig key_config = config.ForSingleKey(i);
10731096
return put_tensor_with_parallelism_from(
10741097
keys[i], buffer_ptrs[i], sizes[i],
1075-
py::reinterpret_borrow<py::object>(parallelism), config);
1098+
py::reinterpret_borrow<py::object>(parallelism), key_config);
10761099
},
10771100
[this, &keys, &buffer_ptrs, &sizes, &config](
10781101
size_t i, const py::handle &writer_partition) {
1102+
ReplicateConfig key_config = config.ForSingleKey(i);
10791103
return put_tensor_with_parallelism_from(
1080-
keys[i], buffer_ptrs[i], sizes[i], py::none(), config,
1104+
keys[i], buffer_ptrs[i], sizes[i], py::none(), key_config,
10811105
py::reinterpret_borrow<py::object>(writer_partition));
10821106
});
10831107
}
@@ -1345,6 +1369,12 @@ std::vector<int> batch_upsert_tensor_with_parallelism(
13451369
const py::object &parallelisms = py::none(),
13461370
const ReplicateConfig &config = ReplicateConfig{},
13471371
const py::object &writer_partitions = py::none()) {
1372+
auto group_ids_error = ValidateGroupIdsForBatchConfig(
1373+
config, keys.size(), "batch_upsert_tensor_with_parallelism");
1374+
if (!group_ids_error.empty()) {
1375+
return group_ids_error;
1376+
}
1377+
13481378
return execute_batch_parallelism_write_requests(
13491379
keys, tensors_list.size(), parallelisms, writer_partitions,
13501380
"batch_upsert_tensor_with_parallelism",
@@ -1361,14 +1391,16 @@ std::vector<int> batch_upsert_tensor_with_parallelism(
13611391
},
13621392
[this, &keys, &tensors_list, &config](size_t i,
13631393
const py::handle &parallelism) {
1394+
ReplicateConfig key_config = config.ForSingleKey(i);
13641395
return upsert_tensor_with_parallelism(
13651396
keys[i], tensors_list[i],
1366-
py::reinterpret_borrow<py::object>(parallelism), config);
1397+
py::reinterpret_borrow<py::object>(parallelism), key_config);
13671398
},
13681399
[this, &keys, &tensors_list, &config](
13691400
size_t i, const py::handle &writer_partition) {
1401+
ReplicateConfig key_config = config.ForSingleKey(i);
13701402
return upsert_tensor_with_parallelism(
1371-
keys[i], tensors_list[i], py::none(), config,
1403+
keys[i], tensors_list[i], py::none(), key_config,
13721404
py::reinterpret_borrow<py::object>(writer_partition));
13731405
});
13741406
}
@@ -1379,6 +1411,12 @@ std::vector<int> batch_upsert_tensor_with_parallelism_from(
13791411
const py::object &parallelisms = py::none(),
13801412
const ReplicateConfig &config = ReplicateConfig{},
13811413
const py::object &writer_partitions = py::none()) {
1414+
auto group_ids_error = ValidateGroupIdsForBatchConfig(
1415+
config, keys.size(), "batch_upsert_tensor_with_parallelism_from");
1416+
if (!group_ids_error.empty()) {
1417+
return group_ids_error;
1418+
}
1419+
13821420
return execute_batch_parallelism_write_requests(
13831421
keys, buffer_ptrs.size(), parallelisms, writer_partitions,
13841422
"batch_upsert_tensor_with_parallelism_from",
@@ -1430,14 +1468,16 @@ std::vector<int> batch_upsert_tensor_with_parallelism_from(
14301468
},
14311469
[this, &keys, &buffer_ptrs, &sizes, &config](
14321470
size_t i, const py::handle &parallelism) {
1471+
ReplicateConfig key_config = config.ForSingleKey(i);
14331472
return upsert_tensor_with_parallelism_from(
14341473
keys[i], buffer_ptrs[i], sizes[i],
1435-
py::reinterpret_borrow<py::object>(parallelism), config);
1474+
py::reinterpret_borrow<py::object>(parallelism), key_config);
14361475
},
14371476
[this, &keys, &buffer_ptrs, &sizes, &config](
14381477
size_t i, const py::handle &writer_partition) {
1478+
ReplicateConfig key_config = config.ForSingleKey(i);
14391479
return upsert_tensor_with_parallelism_from(
1440-
keys[i], buffer_ptrs[i], sizes[i], py::none(), config,
1480+
keys[i], buffer_ptrs[i], sizes[i], py::none(), key_config,
14411481
py::reinterpret_borrow<py::object>(writer_partition));
14421482
});
14431483
}

0 commit comments

Comments
 (0)