Skip to content

Commit 8dd98d1

Browse files
authored
[perf] Reduce directory lock conflicts during batch dumps in PosixStore (#711)
…ref (#707) Reduce directory lock conflicts during batch dumps in PosixStore to improve its dump performance. - Split the unified concurrency configuration item (`stream_number`) into two independent concurrency configuration items (`posix_lookup_concurrency` and `posix_data_trans_concurrency`), so that the `Lookup` concurrency can be tuned separately from the data-transfer concurrency. - Move directory creation from the `Dump` stage to the `Setup` stage, reducing metadata operations during `Dump`. - During `Dump`, write files into different shard directories to reduce directory conflicts between concurrent file writes.
1 parent 7444a35 commit 8dd98d1

11 files changed

Lines changed: 106 additions & 43 deletions

ucm/store/posix/cc/global_config.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,10 @@ struct Config {
3636
size_t shardSize{0};
3737
size_t blockSize{0};
3838
bool ioDirect{false};
39-
size_t streamNumber{8};
39+
size_t dataTransConcurrency{8};
40+
size_t lookupConcurrency{8};
4041
size_t timeoutMs{30000};
41-
size_t dataDirShardBytes{4};
42+
size_t dataDirShardBytes{3};
4243
};
4344

4445
} // namespace UC::PosixStore

ucm/store/posix/cc/posix_store.cc

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,12 @@ class PosixStoreImpl {
6363
if (config.deviceId < -1) {
6464
return Status::InvalidParam("invalid device({})", config.deviceId);
6565
}
66-
if (config.streamNumber == 0) {
67-
return Status::InvalidParam("invalid stream number({})", config.streamNumber);
66+
if (config.dataTransConcurrency == 0 || config.lookupConcurrency == 0) {
67+
return Status::InvalidParam("invalid concurrency({},{})", config.dataTransConcurrency,
68+
config.lookupConcurrency);
69+
}
70+
if (config.dataDirShardBytes > 5) {
71+
return Status::InvalidParam("invalid shard bytes({})", config.dataDirShardBytes);
6872
}
6973
if (config.deviceId == -1) { return Status::OK(); }
7074
if (config.tensorSize == 0 || config.shardSize < config.tensorSize ||
@@ -87,7 +91,8 @@ class PosixStoreImpl {
8791
UC_INFO("Set {}::ShardSize to {}.", ns, config.shardSize);
8892
UC_INFO("Set {}::BlockSize to {}.", ns, config.blockSize);
8993
UC_INFO("Set {}::IoDirect to {}.", ns, config.ioDirect);
90-
UC_INFO("Set {}::StreamNumber to {}.", ns, config.streamNumber);
94+
UC_INFO("Set {}::DataTransConcurrency to {}.", ns, config.dataTransConcurrency);
95+
UC_INFO("Set {}::LookupConcurrency to {}.", ns, config.lookupConcurrency);
9196
UC_INFO("Set {}::TimeoutMs to {}.", ns, config.timeoutMs);
9297
UC_INFO("Set {}::DataDirShardBytes to {}.", ns, config.dataDirShardBytes);
9398
}

ucm/store/posix/cc/space_layout.cc

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,32 @@
3030
namespace UC::PosixStore {
3131

3232
static const std::string DATA_ROOT = "data";
33-
static const std::string TEMP_ROOT = ".temp";
33+
static const std::string ACTIVATED_FILE_EXTENSION = ".tmp";
3434

3535
// Builds the data file name for a block: every element of `blockId` is
// formatted as lowercase hex, zero-padded to at least two digits, and the
// pieces are concatenated with no separator.
inline std::string DataFileName(const Detail::BlockId& blockId)
3636
{
3737
return fmt::format("{:02x}", fmt::join(blockId, ""));
3838
}
3939

40+
/**
 * Enumerates every lowercase hexadecimal string that is exactly `n` characters long.
 *
 * The strings are produced in ascending numeric order and are zero-padded on
 * the left, e.g. n == 2 yields "00", "01", ..., "fe", "ff" (16^n entries).
 *
 * @param n Number of hex characters in each generated string.
 * @return All 16^n strings, or an empty vector when `n` is 0 or when 16^n
 *         cannot be represented in a size_t.
 */
std::vector<std::string> GenerateHexStrings(const size_t n)
{
    // A size_t holds two hex digits per byte; at or beyond that count the
    // shift below would be undefined behaviour, so reject such requests.
    constexpr size_t maxHexChars = sizeof(size_t) * 2;
    if (n == 0 || n >= maxHexChars) [[unlikely]] { return {}; }
    const size_t nCombinations = size_t{1} << (n * 4);
    std::vector<std::string> result;
    result.reserve(nCombinations);
    constexpr char hexChars[] = "0123456789abcdef";
    for (size_t i = 0; i < nCombinations; ++i) {
        // Construct the string directly inside the vector to avoid a copy.
        auto& s = result.emplace_back(n, '0');
        auto value = i;
        // Fill from the least significant digit (rightmost char) backwards;
        // the unsigned countdown avoids the size_t -> int narrowing.
        for (size_t pos = n; pos-- > 0;) {
            s[pos] = hexChars[value & 0xF];
            value >>= 4;
        }
    }
    return result;
}
58+
4059
Status SpaceLayout::Setup(const Config& config)
4160
{
4261
dataDirShardBytes_ = config.dataDirShardBytes;
@@ -52,33 +71,27 @@ std::string SpaceLayout::DataFilePath(const Detail::BlockId& blockId, bool activ
5271
{
5372
const auto& backend = StorageBackend(blockId);
5473
const auto& file = DataFileName(blockId);
55-
const auto& shard = activated ? TEMP_ROOT : (dataDirShard_ ? FileShardName(file) : DATA_ROOT);
56-
return fmt::format("{}{}/{}", backend, shard, file);
74+
const auto& shard = dataDirShard_ ? FileShardName(file) : DATA_ROOT;
75+
if (!activated) { return fmt::format("{}{}/{}", backend, shard, file); }
76+
return fmt::format("{}{}/{}{}", backend, shard, file, ACTIVATED_FILE_EXTENSION);
5777
}
5878

5979
Status SpaceLayout::CommitFile(const Detail::BlockId& blockId, bool success) const
6080
{
61-
const auto& backend = StorageBackend(blockId);
62-
const auto& file = DataFileName(blockId);
63-
const auto& activated = fmt::format("{}{}/{}", backend, TEMP_ROOT, file);
81+
const auto& activated = DataFilePath(blockId, true);
6482
auto s = Status::OK();
6583
if (success) {
66-
const auto& shard = dataDirShard_ ? FileShardName(file) : DATA_ROOT;
67-
const auto& archived = fmt::format("{}{}/{}", backend, shard, file);
68-
if (dataDirShard_) { s = PosixFile{backend + shard}.MkDir(); }
69-
if (s == Status::OK() || s == Status::DuplicateKey()) {
70-
s = PosixFile{activated}.Rename(archived);
71-
}
84+
const auto& archived = DataFilePath(blockId, false);
85+
s = PosixFile{activated}.Rename(archived);
7286
}
7387
if (!success || s.Failure()) { PosixFile{activated}.Remove(); }
7488
return s;
7589
}
7690

7791
std::vector<std::string> SpaceLayout::RelativeRoots() const
7892
{
79-
std::vector<std::string> roots{TEMP_ROOT};
80-
if (!dataDirShard_) { roots.push_back(DATA_ROOT); }
81-
return roots;
93+
if (dataDirShard_) { return GenerateHexStrings(dataDirShardBytes_); }
94+
return {DATA_ROOT};
8295
}
8396

8497
Status SpaceLayout::AddStorageBackend(const std::string& path)

ucm/store/posix/cc/space_manager.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ Status SpaceManager::Setup(const Config& config)
3535
lookupSrv_.SetWorkerFn([this](LookupContext& ctx, auto&) { OnLookup(ctx); })
3636
.SetWorkerTimeoutFn([this](LookupContext& ctx, auto) { OnLookupTimeout(ctx); },
3737
config.timeoutMs)
38-
.SetNWorker(config.streamNumber)
38+
.SetNWorker(config.lookupConcurrency)
3939
.Run();
4040
if (!success) [[unlikely]] { return Status::Error("failed to run lookup service thread pool"); }
4141
return Status::OK();

ucm/store/posix/cc/trans_queue.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,11 @@ Status TransQueue::Setup(const Config& config, TaskIdSet* failureSet, const Spac
3535
shardSize_ = config.shardSize;
3636
nShardPerBlock_ = config.blockSize / config.shardSize;
3737
ioDirect_ = config.ioDirect;
38-
auto success = pool_.SetNWorker(config.streamNumber)
38+
auto success = pool_.SetNWorker(config.dataTransConcurrency)
3939
.SetWorkerFn([this](auto& ios, auto&) { Worker(ios); })
4040
.Run();
4141
if (!success) [[unlikely]] {
42-
return Status::Error(fmt::format("workers({}) start failed", config.streamNumber));
42+
return Status::Error(fmt::format("workers({}) start failed", config.dataTransConcurrency));
4343
}
4444
return Status::OK();
4545
}

ucm/store/posix/connector.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ def __init__(self, config: Dict[str, object]) -> None:
4848
"shard_size": "shardSize",
4949
"block_size": "blockSize",
5050
"io_direct": "ioDirect",
51-
"stream_number": "streamNumber",
51+
"posix_data_trans_concurrency": "dataTransConcurrency",
52+
"posix_lookup_concurrency": "lookupConcurrency",
5253
"timeout_ms": "timeoutMs",
5354
"data_dir_shard_bytes": "dataDirShardBytes",
5455
}
@@ -154,7 +155,7 @@ def check(self, task: Task) -> bool:
154155
config["shard_size"] = block_size
155156
config["block_size"] = block_size
156157
config["io_direct"] = True
157-
config["stream_number"] = 16
158+
config["posix_data_trans_concurrency"] = 16
158159
store = UcmPosixStore(config)
159160
block_num = 1024
160161
block_ids = [secrets.token_bytes(16) for _ in range(block_num)]

ucm/store/posix/cpy/posix_store.py.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ PYBIND11_MODULE(ucmposixstore, module)
4242
config.def_readwrite("shardSize", &Config::shardSize);
4343
config.def_readwrite("blockSize", &Config::blockSize);
4444
config.def_readwrite("ioDirect", &Config::ioDirect);
45-
config.def_readwrite("streamNumber", &Config::streamNumber);
45+
config.def_readwrite("dataTransConcurrency", &Config::dataTransConcurrency);
46+
config.def_readwrite("lookupConcurrency", &Config::lookupConcurrency);
4647
config.def_readwrite("timeoutMs", &Config::timeoutMs);
4748
config.def_readwrite("dataDirShardBytes", &Config::dataDirShardBytes);
4849
store.def(py::init<>());

ucm/store/test/case/posix/posix_space_manager_test.cc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ TEST_F(UCPosixSpaceManagerTest, DataFilePath)
7272
ASSERT_EQ(s, UC::Status::OK());
7373
auto blockId = UC::Test::Detail::TypesHelper::MakeBlockId("a1b2c3d4e5f6789012345678901234ab");
7474
auto activated = spaceMgr.GetLayout()->DataFilePath(blockId, true);
75-
ASSERT_EQ(activated, fmt::format("{}.temp/{:02x}", this->Path(), fmt::join(blockId, "")));
75+
ASSERT_EQ(activated, fmt::format("{}data/{:02x}.tmp", this->Path(), fmt::join(blockId, "")));
7676
ASSERT_EQ(PosixFile{activated}.Access(PosixFile::AccessMode::EXIST), UC::Status::NotFound());
7777
ASSERT_EQ(PosixFile{activated}.Open(PosixFile::OpenFlag::CREATE), UC::Status::OK());
7878
ASSERT_EQ(PosixFile{activated}.Access(PosixFile::AccessMode::EXIST), UC::Status::OK());
@@ -95,8 +95,10 @@ TEST_F(UCPosixSpaceManagerTest, ShardFilePath)
9595
auto s = spaceMgr.Setup(config);
9696
ASSERT_EQ(s, UC::Status::OK());
9797
auto blockId = UC::Test::Detail::TypesHelper::MakeBlockIdRandomly();
98+
const auto& file = fmt::format("{:02x}", fmt::join(blockId, ""));
99+
const auto& shard = file.substr(0, config.dataDirShardBytes);
98100
auto activated = spaceMgr.GetLayout()->DataFilePath(blockId, true);
99-
ASSERT_EQ(activated, fmt::format("{}.temp/{:02x}", this->Path(), fmt::join(blockId, "")));
101+
ASSERT_EQ(activated, fmt::format("{}{}/{}.tmp", this->Path(), shard, file));
100102
ASSERT_EQ(PosixFile{activated}.Access(PosixFile::AccessMode::EXIST), UC::Status::NotFound());
101103
ASSERT_EQ(PosixFile{activated}.Open(PosixFile::OpenFlag::CREATE), UC::Status::OK());
102104
ASSERT_EQ(PosixFile{activated}.Access(PosixFile::AccessMode::EXIST), UC::Status::OK());
@@ -105,8 +107,6 @@ TEST_F(UCPosixSpaceManagerTest, ShardFilePath)
105107
ASSERT_EQ(spaceMgr.Lookup(&blockId, 1).Value(), std::vector<uint8_t>{true});
106108
ASSERT_EQ(PosixFile{activated}.Access(PosixFile::AccessMode::EXIST), UC::Status::NotFound());
107109
auto archived = spaceMgr.GetLayout()->DataFilePath(blockId, false);
108-
const auto& file = fmt::format("{:02x}", fmt::join(blockId, ""));
109-
const auto& shard = file.substr(0, config.dataDirShardBytes);
110110
ASSERT_EQ(archived, fmt::format("{}{}/{}", this->Path(), shard, file));
111111
ASSERT_EQ(PosixFile{archived}.Access(PosixFile::AccessMode::EXIST), UC::Status::OK());
112112
}

ucm/store/test/case/posix/posix_store_test.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ TEST_F(UCPosixStoreTest, SetupWithInvalidParam)
5050
config.tensorSize = 4096;
5151
config.shardSize = config.tensorSize;
5252
config.blockSize = config.shardSize;
53-
config.streamNumber = 0;
53+
config.dataTransConcurrency = 0;
5454
PosixStore store;
5555
ASSERT_EQ(store.Setup(config), UC::Status::InvalidParam());
5656
}

ucm/store/test/e2e/cache_on_posix_test.py

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,19 @@ def e2e_test(
5353
request_size: int,
5454
device_id: int,
5555
):
56+
# make block id randomly
5657
chunk_block_ids = [secrets.token_bytes(16) for _ in range(request_size)]
58+
# fully lookup at 0% hit
59+
tp = time.perf_counter()
5760
founds = scheduler.lookup(chunk_block_ids)
61+
cost_fully_lookup1 = time.perf_counter() - tp
5862
assert not any(founds)
59-
assert scheduler.lookup_on_prefix(chunk_block_ids) == -1
63+
# prefix lookup at 0% hit
64+
tp = time.perf_counter()
65+
found_idx = scheduler.lookup_on_prefix(chunk_block_ids)
66+
cost_prefix_lookup1 = time.perf_counter() - tp
67+
assert found_idx == -1
68+
# make tensor randomly
6069
shard_indexes = [0 for _ in range(request_size)]
6170
src_tensors = [
6271
[
@@ -69,15 +78,43 @@ def e2e_test(
6978
]
7079
for _ in range(request_size)
7180
]
81+
# dump data to store
82+
tp = time.perf_counter()
7283
task = worker.dump(chunk_block_ids, shard_indexes, src_tensors)
7384
worker.wait(task)
85+
cost_dump = time.perf_counter() - tp
86+
# fully lookup at 100% hit
87+
tp = time.perf_counter()
7488
founds = scheduler.lookup(chunk_block_ids)
89+
cost_fully_lookup2 = time.perf_counter() - tp
7590
assert all(founds)
76-
assert scheduler.lookup_on_prefix(chunk_block_ids) + 1 == request_size
91+
# prefix lookup at 100% hit
92+
tp = time.perf_counter()
93+
found_idx = scheduler.lookup_on_prefix(chunk_block_ids)
94+
cost_prefix_lookup2 = time.perf_counter() - tp
95+
assert found_idx + 1 == request_size
96+
# make tensor buffer for fetching
7797
dst_tensors = [[torch.empty_like(t) for t in row] for row in src_tensors]
98+
# fetch data from store
99+
tp = time.perf_counter()
78100
task = worker.load(chunk_block_ids, shard_indexes, dst_tensors)
79101
worker.wait(task)
102+
cost_load = time.perf_counter() - tp
103+
# compare data
80104
cmp_and_print_diff(src_tensors, dst_tensors)
105+
# show cost
106+
data_size = tensor_size * layer_size * chunk_size * request_size
107+
bw_dump = data_size / cost_dump
108+
bw_load = data_size / cost_load
109+
print(
110+
f"[{tensor_size}-{layer_size}-{chunk_size}-{request_size}] "
111+
f"fully_lookup1={cost_fully_lookup1 * 1e3:.3f}ms, "
112+
f"prefix_lookup1={cost_prefix_lookup1 * 1e3:.3f}ms, "
113+
f"fully_lookup2={cost_fully_lookup2 * 1e3:.3f}ms, "
114+
f"prefix_lookup2={cost_prefix_lookup2 * 1e3:.3f}ms, "
115+
f"dump={cost_dump * 1e3:.3f}ms, load={cost_load * 1e3:.3f}ms, "
116+
f"bw_dump={bw_dump / 1e9:.3f}GB/s, bw_load={bw_load / 1e9:.3f}GB/s."
117+
)
81118

82119

83120
def main():
@@ -100,7 +137,8 @@ def main():
100137
config["waiting_queue_depth"] = 16
101138
config["running_queue_depth"] = 1024
102139
config["io_direct"] = True
103-
config["stream_number"] = 16
140+
config["posix_data_trans_concurrency"] = 32
141+
config["posix_lookup_concurrency"] = 32
104142
worker = UcmPipelineStore(config | {"device_id": device_id})
105143
scheduler = UcmPipelineStore(config)
106144
test_batch_number = 512
@@ -118,5 +156,5 @@ def main():
118156

119157

120158
if __name__ == "__main__":
121-
os.environ["UC_LOGGER_LEVEL"] = "debug"
159+
os.environ["UC_LOGGER_LEVEL"] = "info"
122160
main()

0 commit comments

Comments
 (0)