@@ -1546,7 +1546,7 @@ class ShardManager {
     for (auto it = ranges.begin(); it != ranges.end(); ++it) {
         if (it.value()) {
             if (it.value()->physicalShard->id == id) {
-                TraceEvent(SevError, "ShardedRocksDBAddRange")
+                TraceEvent(SevWarn, "ShardedRocksDBAddRange")
                     .detail("ErrorType", "RangeAlreadyExist")
                     .detail("IntersectingRange", it->range())
                     .detail("DataShardRange", it->value()->range)
@@ -1564,15 +1564,34 @@ class ShardManager {
         }
     }

-    auto currentCfOptions = active ? rState->getCFOptions() : rState->getCFOptionsForInactiveShard();
-    auto [it, inserted] = physicalShards.emplace(id, std::make_shared<PhysicalShard>(db, id, currentCfOptions));
-    std::shared_ptr<PhysicalShard>& shard = it->second;
+    auto it = physicalShards.find(id);
+    std::shared_ptr<PhysicalShard> physicalShard = nullptr;
+    if (it != physicalShards.end()) {
+        physicalShard = it->second;
+        if (SERVER_KNOBS->SHARDED_ROCKSDB_ALLOW_MULTIPLE_RANGES) {
+            bool continuous = false;
+            for (auto& [_, shard] : physicalShard->dataShards) {
+                if (shard->range.begin < range.begin && shard->range.end == range.begin) {
+                    continuous = true;
+                    break;
+                }
+                if (shard->range.begin > range.begin && range.end == shard->range.begin) {
+                    continuous = true;
+                    break;
+                }
+            }
+            ASSERT_WE_THINK(continuous);
+        }
+    } else {
+        auto currentCfOptions = active ? rState->getCFOptions() : rState->getCFOptionsForInactiveShard();
+        auto [it, inserted] = physicalShards.emplace(id, std::make_shared<PhysicalShard>(db, id, currentCfOptions));
+        physicalShard = it->second;
+    }

     activePhysicalShardIds.emplace(id);
-
-    auto dataShard = std::make_unique<DataShard>(range, shard.get());
+    auto dataShard = std::make_unique<DataShard>(range, physicalShard.get());
     dataShardMap.insert(range, dataShard.get());
-    shard->dataShards[range.begin.toString()] = std::move(dataShard);
+    physicalShard->dataShards[range.begin.toString()] = std::move(dataShard);

     validate();

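The new `addRange` path only constructs a `PhysicalShard` when the id is unseen. When the id already exists and `SERVER_KNOBS->SHARDED_ROCKSDB_ALLOW_MULTIPLE_RANGES` is set, the incoming range is checked for contiguity with a range already on the shard; note that `ASSERT_WE_THINK` only fails in simulation, which is why the noSim test added below can still assign two disjoint ranges to `shard-1`. Below is a minimal standalone sketch of that adjacency check, with a simplified `Range` struct standing in for FDB's `KeyRangeRef` (illustrative names only, not the actual API):

```cpp
#include <cassert>
#include <map>
#include <string>

// Simplified stand-ins, for illustration only; the real code uses
// KeyRangeRef and PhysicalShard::dataShards keyed by begin key.
struct Range {
    std::string begin, end; // half-open [begin, end)
};

// Mirrors the continuity check in the diff: the new range must share an
// endpoint with some data shard already on the physical shard.
bool extendsExistingRange(const std::map<std::string, Range>& dataShards, const Range& range) {
    for (const auto& [beginKey, existing] : dataShards) {
        if (existing.begin < range.begin && existing.end == range.begin) {
            return true; // new range starts exactly where an existing one ends
        }
        if (existing.begin > range.begin && range.end == existing.begin) {
            return true; // new range ends exactly where an existing one starts
        }
    }
    return false;
}

int main() {
    std::map<std::string, Range> dataShards = { { "a", { "a", "d" } } };
    assert(extendsExistingRange(dataShards, { "d", "g" }));  // adjacent on the right
    assert(extendsExistingRange(dataShards, { "0", "a" }));  // adjacent on the left
    assert(!extendsExistingRange(dataShards, { "g", "n" })); // leaves a gap
    return 0;
}
```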
@@ -1581,7 +1600,7 @@ class ShardManager {
         .detail("ShardId", id)
         .detail("Active", active);

-    return shard.get();
+    return physicalShard.get();
 }

 std::vector<std::string> removeRange(KeyRange range) {
@@ -1636,6 +1655,7 @@ class ShardManager {

     // Range modification could result in more than one segment. Remove the original segment key here.
     existingShard->dataShards.erase(shardRange.begin.toString());
+    int count = 0;
     if (shardRange.begin < range.begin) {
         auto dataShard =
             std::make_unique<DataShard>(KeyRange(KeyRangeRef(shardRange.begin, range.begin)), existingShard);
@@ -1646,6 +1666,7 @@ class ShardManager {

         existingShard->dataShards[shardRange.begin.toString()] = std::move(dataShard);
         logShardEvent(existingShard->id, shardRange, ShardOp::MODIFY_RANGE, SevInfo, msg);
+        count++;
     }

     if (shardRange.end > range.end) {
@@ -1658,6 +1679,17 @@ class ShardManager {

         existingShard->dataShards[range.end.toString()] = std::move(dataShard);
         logShardEvent(existingShard->id, shardRange, ShardOp::MODIFY_RANGE, SevInfo, msg);
+        count++;
+    }
+
+    if (count > 1) {
+        // During a shard split, a shard could be split into multiple key ranges. One of the key ranges will
+        // remain on the storage server, while the other key ranges are moved to new servers. Depending on the
+        // order in which the split data moves execute, a shard could be broken into multiple pieces. Eventually
+        // a single continuous key range will remain on the physical shard. Data consistency is guaranteed.
+        //
+        // For team-based shard placement, we expect multiple data shards to be located on the same physical shard.
+        TraceEvent("RangeSplit").detail("OriginalRange", shardRange).detail("RemovedRange", range);
     }
 }

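The `count > 1` case above occurs when the removed range falls strictly inside an existing data shard, leaving one remainder on each side. A self-contained sketch of that split arithmetic, again with a hypothetical `Range` struct rather than FDB's `KeyRangeRef`:

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for KeyRangeRef, for illustration only.
struct Range {
    std::string begin, end; // half-open [begin, end)
};

// Removing `removed` from `shard` keeps the pieces outside the removed
// span; a strict middle cut yields two pieces, mirroring count > 1 above.
std::vector<Range> remainingPieces(const Range& shard, const Range& removed) {
    std::vector<Range> pieces;
    if (shard.begin < removed.begin) {
        pieces.push_back({ shard.begin, removed.begin }); // left remainder
    }
    if (shard.end > removed.end) {
        pieces.push_back({ removed.end, shard.end }); // right remainder
    }
    return pieces;
}

int main() {
    // Cutting ["c", "f") out of ["a", "m") leaves ["a", "c") and ["f", "m").
    auto pieces = remainingPieces({ "a", "m" }, { "c", "f" });
    assert(pieces.size() == 2);
    assert(pieces[0].begin == "a" && pieces[0].end == "c");
    assert(pieces[1].begin == "f" && pieces[1].end == "m");
    return 0;
}
```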
@@ -1986,28 +2018,37 @@ class ShardManager {
     }

     TraceEvent(SevVerbose, "ShardedRocksValidateShardManager", this->logId);
+    int totalDataShards = 0;
+    int expectedDataShards = 0;
     for (auto s = dataShardMap.ranges().begin(); s != dataShardMap.ranges().end(); ++s) {
         TraceEvent e(SevVerbose, "ShardedRocksValidateDataShardMap", this->logId);
         e.detail("Range", s->range());
         const DataShard* shard = s->value();
         e.detail("ShardAddress", reinterpret_cast<std::uintptr_t>(shard));
-        if (shard != nullptr) {
-            e.detail("PhysicalShard", shard->physicalShard->id);
-        } else {
-            e.detail("Shard", "Empty");
+        if (shard == nullptr) {
+            e.detail("RangeUnassigned", "True");
+            continue;
         }
-        if (shard != nullptr) {
-            if (shard->range != static_cast<KeyRangeRef>(s->range())) {
-                TraceEvent(SevWarnAlways, "ShardRangeMismatch").detail("Range", s->range());
-            }
-
-            ASSERT(shard->range == static_cast<KeyRangeRef>(s->range()));
-            ASSERT(shard->physicalShard != nullptr);
-            auto it = shard->physicalShard->dataShards.find(shard->range.begin.toString());
-            ASSERT(it != shard->physicalShard->dataShards.end());
-            ASSERT(it->second.get() == shard);
+        totalDataShards++;
+        if (shard->range != static_cast<KeyRangeRef>(s->range())) {
+            TraceEvent(SevWarnAlways, "ShardRangeMismatch")
+                .detail("Range", s->range())
+                .detail("DataShardRange", shard->range)
+                .detail("PhysicalShardId", shard->physicalShard->id);
         }
+
+        ASSERT(shard->range == static_cast<KeyRangeRef>(s->range()));
+        ASSERT(shard->physicalShard != nullptr);
+        auto it = shard->physicalShard->dataShards.find(shard->range.begin.toString());
+        ASSERT(it != shard->physicalShard->dataShards.end());
+        ASSERT(it->second.get() == shard);
     }
+
+    for (auto [shardId, physicalShard] : physicalShards) {
+        ASSERT(physicalShard);
+        expectedDataShards += physicalShard->dataShards.size();
+    }
+    ASSERT_EQ(expectedDataShards, totalDataShards);
 }

 private:
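`validate()` now cross-checks totals on top of the per-range assertions: the number of assigned ranges seen while walking `dataShardMap` must equal the number of data shards registered across all physical shards, so an entry leaked in one structure but not the other fails fast. A toy model of that counting invariant, with simplified container shapes (not the real `ShardManager` types):

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

int main() {
    // Hypothetical shapes standing in for ShardManager state: each physical
    // shard owns data shards keyed by their begin key, and the dataShardMap
    // assigns ranges (represented here by their begin keys) to shards.
    std::map<std::string, std::vector<std::string>> physicalShards = {
        { "shard-1", { "a", "g" } }, // two data shards on one physical shard
        { "shard-2", { "p" } },
    };
    std::vector<std::string> assignedRangesInDataShardMap = { "a", "g", "p" };

    int expectedDataShards = 0;
    for (const auto& [shardId, dataShards] : physicalShards) {
        expectedDataShards += static_cast<int>(dataShards.size());
    }
    // The new cross-check: every data shard registered on a physical shard
    // corresponds to exactly one assigned range, and vice versa.
    assert(expectedDataShards == static_cast<int>(assignedRangesInDataShardMap.size()));
    return 0;
}
```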
@@ -4403,6 +4444,81 @@ TEST_CASE("noSim/ShardedRocksDB/Metadata") {
     return Void();
 }

+TEST_CASE("noSim/ShardedRocksDBRangeOps/RemoveSplitRange") {
+    state std::string rocksDBTestDir = "sharded-rocksdb-kvs-test-db";
+    platform::eraseDirectoryRecursive(rocksDBTestDir);
+
+    state ShardedRocksDBKeyValueStore* rocksdbStore =
+        new ShardedRocksDBKeyValueStore(rocksDBTestDir, deterministicRandom()->randomUniqueID());
+    state IKeyValueStore* kvStore = rocksdbStore;
+    wait(kvStore->init());
+
+    // Add two ranges to the same shard.
+    {
+        std::vector<Future<Void>> addRangeFutures;
+        addRangeFutures.push_back(kvStore->addRange(KeyRangeRef("a"_sr, "d"_sr), "shard-1"));
+        addRangeFutures.push_back(kvStore->addRange(KeyRangeRef("g"_sr, "n"_sr), "shard-1"));
+
+        wait(waitForAll(addRangeFutures));
+    }
+
+    state std::set<std::string> originalKeys = { "a", "b", "c", "g", "h", "m" };
+    state std::set<std::string> currentKeys = originalKeys;
+    for (auto key : originalKeys) {
+        kvStore->set(KeyValueRef(key, key));
+    }
+    wait(kvStore->commit());
+
+    state std::string key;
+    for (key : currentKeys) {
+        Optional<Value> val = wait(kvStore->readValue(key));
+        ASSERT(val.present());
+        ASSERT(val.get().toString() == key);
+    }
+
+    // Remove a single range.
+    std::vector<std::string> shardsToCleanUp;
+    auto shardIds = kvStore->removeRange(KeyRangeRef("b"_sr, "c"_sr));
+    // removeRange didn't create an empty shard.
+    ASSERT_EQ(shardIds.size(), 0);
+
+    currentKeys.erase("b");
+    for (key : originalKeys) {
+        Optional<Value> val = wait(kvStore->readValue(key));
+        if (currentKeys.contains(key)) {
+            ASSERT(val.present());
+            ASSERT(val.get().toString() == key);
+        } else {
+            ASSERT(!val.present());
+        }
+    }
+
+    // Remove a range spanning multiple sub-ranges.
+    shardIds = kvStore->removeRange(KeyRangeRef("c"_sr, "k"_sr));
+    ASSERT(shardIds.empty());
+
+    currentKeys.erase("c");
+    currentKeys.erase("g");
+    currentKeys.erase("h");
+    for (key : originalKeys) {
+        Optional<Value> val = wait(kvStore->readValue(key));
+        if (currentKeys.contains(key)) {
+            ASSERT(val.present());
+            ASSERT(val.get().toString() == key);
+        } else {
+            ASSERT(!val.present());
+        }
+    }
+
+    {
+        Future<Void> closed = kvStore->onClosed();
+        kvStore->dispose();
+        wait(closed);
+    }
+    ASSERT(!directoryExists(rocksDBTestDir));
+    return Void();
+}
+
 TEST_CASE("noSim/ShardedRocksDBCheckpoint/CheckpointBasic") {
     state std::string rocksDBTestDir = "sharded-rocks-checkpoint-restore";
     state std::map<Key, Value> kvs({ { "a"_sr, "TestValueA"_sr },
@@ -4777,7 +4893,7 @@ ACTOR Future<Void> testWrites(IKeyValueStore* kvStore, int writeCount) {
     state int i = 0;

     while (i < writeCount) {
-        state int endCount = deterministicRandom()->randomInt(i, i + 1000);
+        state int endCount = deterministicRandom()->randomInt(i + 1, i + 1000);
         state std::string beginKey = format("key-%6d", i);
         state std::string endKey = format("key-%6d", endCount);
         wait(kvStore->addRange(KeyRangeRef(beginKey, endKey), deterministicRandom()->randomUniqueID().toString()));
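The `testWrites` fix closes an off-by-one: Flow's `deterministicRandom()->randomInt(min, maxPlusOne)` excludes the upper bound but includes the lower one, so `randomInt(i, i + 1000)` could return `i` itself and produce an empty range with `beginKey == endKey`. A tiny worst-case model of the change (`randomIntModel` is hypothetical, returning the smallest legal draw):

```cpp
#include <cassert>

// Hypothetical worst-case model of Flow's randomInt(min, maxPlusOne): the
// real generator may return any value in [min, maxPlusOne), so the smallest
// legal draw is min itself.
int randomIntModel(int min, int maxPlusOne) {
    (void)maxPlusOne; // upper bound is irrelevant in the worst case
    return min;
}

int main() {
    int i = 42;
    // Old call: randomInt(i, i + 1000) can yield endCount == i, making
    // KeyRangeRef(beginKey, endKey) an empty range.
    assert(randomIntModel(i, i + 1000) == i);
    // Fixed call: randomInt(i + 1, i + 1000) guarantees endCount > i.
    assert(randomIntModel(i + 1, i + 1000) > i);
    return 0;
}
```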