
Commit 76ea931

Support multiple ranges
1 parent f4cf27e commit 76ea931

5 files changed: +157 -27 lines changed

fdbclient/ServerKnobs.cpp

Lines changed: 1 addition & 0 deletions
@@ -623,6 +623,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
 	// Block cache key-value checksum. Checksum is validated during read, so has non-trivial impact on read performance.
 	init( ROCKSDB_BLOCK_PROTECTION_BYTES_PER_KEY, 0 ); if ( randomize && BUGGIFY ) ROCKSDB_BLOCK_PROTECTION_BYTES_PER_KEY = 8; // Default: 0 (disabled). Supported values: 0, 1, 2, 4, 8.
 	init( ROCKSDB_ENABLE_NONDETERMINISM, false );
+	init( SHARDED_ROCKSDB_ALLOW_MULTIPLE_RANGES, false );
 	init( SHARDED_ROCKSDB_ALLOW_WRITE_STALL_ON_FLUSH, false );
 	init( SHARDED_ROCKSDB_VALIDATE_MAPPING_RATIO, 0.01 ); if (isSimulated) SHARDED_ROCKSDB_VALIDATE_MAPPING_RATIO = deterministicRandom()->random01();
 	init( SHARD_METADATA_SCAN_BYTES_LIMIT, 10485760 ); // 10MB

fdbclient/include/fdbclient/ServerKnobs.h

Lines changed: 1 addition & 0 deletions
@@ -607,6 +607,7 @@ class SWIFT_CXX_IMMORTAL_SINGLETON_TYPE ServerKnobs : public KnobsImpl<ServerKno
 	// Note that turning this on in simulation could lead to non-deterministic runs
 	// since we rely on rocksdb metadata. This knob also applies to sharded rocks
 	// storage engine.
+	bool SHARDED_ROCKSDB_ALLOW_MULTIPLE_RANGES;
 	bool SHARDED_ROCKSDB_ALLOW_WRITE_STALL_ON_FLUSH;
 	int SHARDED_ROCKSDB_MEMTABLE_MAX_RANGE_DELETIONS;
 	double SHARDED_ROCKSDB_VALIDATE_MAPPING_RATIO;

fdbserver/KeyValueStoreShardedRocksDB.actor.cpp

Lines changed: 147 additions & 23 deletions
@@ -1546,7 +1546,7 @@ class ShardManager {
 	for (auto it = ranges.begin(); it != ranges.end(); ++it) {
 		if (it.value()) {
 			if (it.value()->physicalShard->id == id) {
-				TraceEvent(SevError, "ShardedRocksDBAddRange")
+				TraceEvent(SevWarn, "ShardedRocksDBAddRange")
 				    .detail("ErrorType", "RangeAlreadyExist")
 				    .detail("IntersectingRange", it->range())
 				    .detail("DataShardRange", it->value()->range)
@@ -1564,15 +1564,42 @@
 		}
 	}
 
-	auto currentCfOptions = active ? rState->getCFOptions() : rState->getCFOptionsForInactiveShard();
-	auto [it, inserted] = physicalShards.emplace(id, std::make_shared<PhysicalShard>(db, id, currentCfOptions));
-	std::shared_ptr<PhysicalShard>& shard = it->second;
+	auto it = physicalShards.find(id);
+	std::shared_ptr<PhysicalShard> physicalShard = nullptr;
+	if (it != physicalShards.end()) {
+		physicalShard = it->second;
+		// TODO: consider coalescing continous key ranges.
+		if (!SERVER_KNOBS->SHARDED_ROCKSDB_ALLOW_MULTIPLE_RANGES) {
+			bool continous = physicalShard->dataShards.empty();
+			std::string rangeStr = "";
+			for (auto& [_, shard] : physicalShard->dataShards) {
+				rangeStr += shard->range.toString() + ", ";
+				if (shard->range.begin < range.begin && shard->range.end == range.begin) {
+					continous = true;
+					break;
+				}
+				if (shard->range.begin > range.begin && range.end == shard->range.begin) {
+					continous = true;
+					break;
+				}
+			}
+			if (!continous) {
+				// When multiple shards are merged into a single shard, the storage server might already own a piece
+				// of the resulting shard. Because intra storage server move is disabled, the merge data move could
+				// create multiple segments in a single physcial shard.
+				TraceEvent("AddMultipleRanges").detail("NewRange", range).detail("OtherRanges", rangeStr).setMaxFieldLength(1000);
+			}
+		}
+	} else {
+		auto currentCfOptions = active ? rState->getCFOptions() : rState->getCFOptionsForInactiveShard();
+		auto [it, inserted] = physicalShards.emplace(id, std::make_shared<PhysicalShard>(db, id, currentCfOptions));
+		physicalShard = it->second;
+	}
 
 	activePhysicalShardIds.emplace(id);
-
-	auto dataShard = std::make_unique<DataShard>(range, shard.get());
+	auto dataShard = std::make_unique<DataShard>(range, physicalShard.get());
 	dataShardMap.insert(range, dataShard.get());
-	shard->dataShards[range.begin.toString()] = std::move(dataShard);
+	physicalShard->dataShards[range.begin.toString()] = std::move(dataShard);
 
 	validate();
 
@@ -1581,7 +1608,7 @@
 	    .detail("ShardId", id)
 	    .detail("Active", active);
 
-	return shard.get();
+	return physicalShard.get();
 }
 
 std::vector<std::string> removeRange(KeyRange range) {
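The check above is advisory: when SHARDED_ROCKSDB_ALLOW_MULTIPLE_RANGES is off, a non-contiguous addition is only traced as AddMultipleRanges, not rejected. A minimal standalone sketch of the contiguity rule, using plain std::string bounds instead of FDB's KeyRangeRef (names and the main() harness are illustrative, not from the commit):

// Standalone sketch (not from the commit) of the contiguity rule addRange applies:
// a new range is acceptable if the physical shard is empty or the new range
// touches an existing range at exactly one endpoint.
#include <iostream>
#include <string>
#include <utility>
#include <vector>

using Range = std::pair<std::string, std::string>; // half-open [begin, end)

bool isContiguous(const std::vector<Range>& existing, const Range& added) {
    if (existing.empty())
        return true;
    for (const auto& r : existing) {
        // Existing range ends exactly where the new one begins.
        if (r.first < added.first && r.second == added.first)
            return true;
        // New range ends exactly where the existing one begins.
        if (r.first > added.first && added.second == r.first)
            return true;
    }
    return false;
}

int main() {
    std::vector<Range> shard = { { "a", "d" } };
    std::cout << isContiguous(shard, { "d", "g" }) << "\n"; // 1: extends [a, d)
    std::cout << isContiguous(shard, { "g", "n" }) << "\n"; // 0: leaves a gap, would be traced as AddMultipleRanges
    return 0;
}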
@@ -1636,6 +1663,7 @@
 
 	// Range modification could result in more than one segments. Remove the original segment key here.
 	existingShard->dataShards.erase(shardRange.begin.toString());
+	int count = 0;
 	if (shardRange.begin < range.begin) {
 		auto dataShard =
 		    std::make_unique<DataShard>(KeyRange(KeyRangeRef(shardRange.begin, range.begin)), existingShard);
@@ -1646,6 +1674,7 @@
 
 		existingShard->dataShards[shardRange.begin.toString()] = std::move(dataShard);
 		logShardEvent(existingShard->id, shardRange, ShardOp::MODIFY_RANGE, SevInfo, msg);
+		count++;
 	}
 
 	if (shardRange.end > range.end) {
@@ -1658,6 +1687,17 @@
 
 		existingShard->dataShards[range.end.toString()] = std::move(dataShard);
 		logShardEvent(existingShard->id, shardRange, ShardOp::MODIFY_RANGE, SevInfo, msg);
+		count++;
+	}
+
+	if (count > 1) {
+		// During shard split, a shard could be split into multiple key ranges. One of the key ranges will
+		// remain on the storage server, other key ranges will be moved to new server. Depending on the order of
+		// executing the split data moves, a shard could be break into multiple pieces. Eventually a single
+		// continous key range will remain on the physical shard. Data consistency is guaranteed.
+		//
+		// For team based shard placement, we expect multiple data shards to be located on the same physical shard.
+		TraceEvent("RangeSplit").detail("OriginalRange", shardRange).detail("RemovedRange", range);
 	}
 }
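The count > 1 case reported by RangeSplit corresponds to removing a range that lies strictly inside an existing data shard, which leaves two remainder segments. A small standalone sketch of that arithmetic, assuming half-open [begin, end) ranges over plain strings (illustrative code, not from the commit):

// Standalone sketch (not from the commit): pieces of a data shard that survive
// after clearing [removed.begin, removed.end) out of [shard.begin, shard.end).
#include <algorithm>
#include <iostream>
#include <string>
#include <vector>

struct Range {
    std::string begin, end; // half-open [begin, end)
};

std::vector<Range> remainders(const Range& shard, const Range& removed) {
    std::vector<Range> out;
    if (shard.begin < removed.begin)
        out.push_back({ shard.begin, std::min(shard.end, removed.begin) }); // left remainder
    if (shard.end > removed.end)
        out.push_back({ std::max(shard.begin, removed.end), shard.end }); // right remainder
    return out;
}

int main() {
    auto pieces = remainders({ "a", "z" }, { "c", "k" });
    for (const auto& p : pieces)
        std::cout << "[" << p.begin << ", " << p.end << ")\n"; // prints [a, c) and [k, z)
    // Two surviving pieces is exactly the count > 1 situation traced as RangeSplit above.
    return 0;
}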

@@ -1986,28 +2026,37 @@
 	}
 
 	TraceEvent(SevVerbose, "ShardedRocksValidateShardManager", this->logId);
+	int totalDataShards = 0;
+	int expectedDataShards = 0;
 	for (auto s = dataShardMap.ranges().begin(); s != dataShardMap.ranges().end(); ++s) {
 		TraceEvent e(SevVerbose, "ShardedRocksValidateDataShardMap", this->logId);
 		e.detail("Range", s->range());
 		const DataShard* shard = s->value();
 		e.detail("ShardAddress", reinterpret_cast<std::uintptr_t>(shard));
-		if (shard != nullptr) {
-			e.detail("PhysicalShard", shard->physicalShard->id);
-		} else {
-			e.detail("Shard", "Empty");
+		if (shard == nullptr) {
+			e.detail("RangeUnassigned", "True");
+			continue;
 		}
-		if (shard != nullptr) {
-			if (shard->range != static_cast<KeyRangeRef>(s->range())) {
-				TraceEvent(SevWarnAlways, "ShardRangeMismatch").detail("Range", s->range());
-			}
-
-			ASSERT(shard->range == static_cast<KeyRangeRef>(s->range()));
-			ASSERT(shard->physicalShard != nullptr);
-			auto it = shard->physicalShard->dataShards.find(shard->range.begin.toString());
-			ASSERT(it != shard->physicalShard->dataShards.end());
-			ASSERT(it->second.get() == shard);
+		totalDataShards++;
+		if (shard->range != static_cast<KeyRangeRef>(s->range())) {
+			TraceEvent(SevWarnAlways, "ShardRangeMismatch")
+			    .detail("Range", s->range())
+			    .detail("DataShardRange", shard->range)
+			    .detail("PhysicalShardId", shard->physicalShard->id);
 		}
+
+		ASSERT(shard->range == static_cast<KeyRangeRef>(s->range()));
+		ASSERT(shard->physicalShard != nullptr);
+		auto it = shard->physicalShard->dataShards.find(shard->range.begin.toString());
+		ASSERT(it != shard->physicalShard->dataShards.end());
+		ASSERT(it->second.get() == shard);
+	}
+
+	for (auto [shardId, physicalShard] : physicalShards) {
+		ASSERT(physicalShard);
+		expectedDataShards += physicalShard->dataShards.size();
 	}
+	ASSERT_EQ(expectedDataShards, totalDataShards);
 }
 
 private:
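The new counters enforce a simple invariant: every assigned range in dataShardMap must be registered in exactly one physical shard's dataShards map, so the two totals have to agree. A standalone sketch of the same bookkeeping over ordinary standard-library containers (illustrative, not FDB's types):

// Standalone sketch (not from the commit) of the invariant checked above:
// the number of assigned ranges equals the sum of per-physical-shard data shards.
#include <cassert>
#include <map>
#include <string>
#include <vector>

int main() {
    // physical shard id -> begin keys of the data shards it owns
    std::map<std::string, std::vector<std::string>> physicalShards = {
        { "shard-1", { "a", "g" } }, // two non-contiguous ranges on one physical shard
        { "shard-2", { "p" } },
    };
    // range map: begin key -> owning physical shard ("" means unassigned)
    std::map<std::string, std::string> dataShardMap = {
        { "a", "shard-1" }, { "g", "shard-1" }, { "p", "shard-2" }, { "z", "" },
    };

    int totalDataShards = 0;
    for (const auto& [begin, owner] : dataShardMap)
        if (!owner.empty())
            totalDataShards++;

    int expectedDataShards = 0;
    for (const auto& [id, shards] : physicalShards)
        expectedDataShards += shards.size();

    assert(expectedDataShards == totalDataShards); // mirrors ASSERT_EQ in validate()
    return 0;
}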
@@ -4403,6 +4452,81 @@ TEST_CASE("noSim/ShardedRocksDB/Metadata") {
 	return Void();
 }
 
+TEST_CASE("noSim/ShardedRocksDBRangeOps/RemoveSplitRange") {
+	state std::string rocksDBTestDir = "sharded-rocksdb-kvs-test-db";
+	platform::eraseDirectoryRecursive(rocksDBTestDir);
+
+	state ShardedRocksDBKeyValueStore* rocksdbStore =
+	    new ShardedRocksDBKeyValueStore(rocksDBTestDir, deterministicRandom()->randomUniqueID());
+	state IKeyValueStore* kvStore = rocksdbStore;
+	wait(kvStore->init());
+
+	// Add two ranges to the same shard.
+	{
+		std::vector<Future<Void>> addRangeFutures;
+		addRangeFutures.push_back(kvStore->addRange(KeyRangeRef("a"_sr, "d"_sr), "shard-1"));
+		addRangeFutures.push_back(kvStore->addRange(KeyRangeRef("g"_sr, "n"_sr), "shard-1"));
+
+		wait(waitForAll(addRangeFutures));
+	}
+
+	state std::set<std::string> originalKeys = { "a", "b", "c", "g", "h", "m" };
+	state std::set<std::string> currentKeys = originalKeys;
+	for (auto key : originalKeys) {
+		kvStore->set(KeyValueRef(key, key));
+	}
+	wait(kvStore->commit());
+
+	state std::string key;
+	for (key : currentKeys) {
+		Optional<Value> val = wait(kvStore->readValue(key));
+		ASSERT(val.present());
+		ASSERT(val.get().toString() == key);
+	}
+
+	// Remove single range.
+	std::vector<std::string> shardsToCleanUp;
+	auto shardIds = kvStore->removeRange(KeyRangeRef("b"_sr, "c"_sr));
+	// Remove range didn't create empty shard.
+	ASSERT_EQ(shardIds.size(), 0);
+
+	currentKeys.erase("b");
+	for (key : originalKeys) {
+		Optional<Value> val = wait(kvStore->readValue(key));
+		if (currentKeys.contains(key)) {
+			ASSERT(val.present());
+			ASSERT(val.get().toString() == key);
+		} else {
+			ASSERT(!val.present());
+		}
+	}
+
+	// Remove range spanning on multple sub range.
+	auto shardIds = kvStore->removeRange(KeyRangeRef("c"_sr, "k"_sr));
+	ASSERT(shardIds.empty());
+
+	currentKeys.erase("c");
+	currentKeys.erase("g");
+	currentKeys.erase("h");
+	for (key : originalKeys) {
+		Optional<Value> val = wait(kvStore->readValue(key));
+		if (currentKeys.contains(key)) {
+			ASSERT(val.present());
+			ASSERT(val.get().toString() == key);
+		} else {
+			ASSERT(!val.present());
+		}
+	}
+
+	{
+		Future<Void> closed = kvStore->onClosed();
+		kvStore->dispose();
+		wait(closed);
+	}
+	ASSERT(!directoryExists(rocksDBTestDir));
+	return Void();
+}
+
 TEST_CASE("noSim/ShardedRocksDBCheckpoint/CheckpointBasic") {
 	state std::string rocksDBTestDir = "sharded-rocks-checkpoint-restore";
 	state std::map<Key, Value> kvs({ { "a"_sr, "TestValueA"_sr },
@@ -4777,7 +4901,7 @@ ACTOR Future<Void> testWrites(IKeyValueStore* kvStore, int writeCount) {
 	state int i = 0;
 
 	while (i < writeCount) {
-		state int endCount = deterministicRandom()->randomInt(i, i + 1000);
+		state int endCount = deterministicRandom()->randomInt(i+1, i + 1000);
 		state std::string beginKey = format("key-%6d", i);
 		state std::string endKey = format("key-%6d", endCount);
 		wait(kvStore->addRange(KeyRangeRef(beginKey, endKey), deterministicRandom()->randomUniqueID().toString()));

fdbserver/storageserver.actor.cpp

Lines changed: 5 additions & 3 deletions
@@ -11146,7 +11146,8 @@ void changeServerKeysWithPhysicalShards(StorageServer* data,
 			    .detail("Range", keys)
 			    .detail("NowAssigned", nowAssigned)
 			    .detail("Version", cVer)
-			    .detail("ResultingShard", newShard.toString());
+			    .detail("ResultingShard", newShard.toString())
+			    .detail("ShardIdDifferent", newShard.id != newShard.desiredId);
 		} else if (currentShard->adding) {
 			if (nowAssigned) {
 				TraceEvent(sev, "PhysicalShardStateError")
@@ -11246,9 +11247,10 @@
 			    .detail("NewShard", updatedShards.back().toString());
 		} else if (!dataAvailable) {
 			if (version == data->initialClusterVersion - 1) {
-				TraceEvent(sevDm, "CSKWithPhysicalShardsSeedRange", data->thisServerID)
+				TraceEvent(SevInfo, "CSKWithPhysicalShardsSeedRange", data->thisServerID)
 				    .detail("ShardID", desiredId)
-				    .detail("Range", range);
+				    .detail("Range", range)
+				    .detail("Version", cVer);
 				changeNewestAvailable.emplace_back(range, latestVersion);
 				updatedShards.push_back(
 				    StorageServerShard(range, version, desiredId, desiredId, StorageServerShard::ReadWrite));

tests/noSim/ShardedRocksDBTest.toml

Lines changed: 3 additions & 1 deletion
@@ -2,6 +2,8 @@
 rocksdb_disable_auto_compactions = true
 rocksdb_suggest_compact_clear_range = false
 rocksdb_empty_range_check = false
+sharded_rocksdb_validate_mapping_ratio=1.0
+sharded_rocksdb_allow_multiple_ranges=true
 
 [[test]]
 testTitle = 'UnitTests'
@@ -11,4 +13,4 @@ rocksdb_empty_range_check = false
 [[test.workload]]
 testName = 'UnitTests'
 maxTestCases = 10
-testsMatching = 'noSim/ShardedRocksDB/'
+testsMatching = 'noSim/ShardedRocksDB/'
