@@ -1546,7 +1546,7 @@ class ShardManager {
        for (auto it = ranges.begin(); it != ranges.end(); ++it) {
            if (it.value()) {
                if (it.value()->physicalShard->id == id) {
-                   TraceEvent(SevError, "ShardedRocksDBAddRange")
+                   TraceEvent(SevWarn, "ShardedRocksDBAddRange")
                        .detail("ErrorType", "RangeAlreadyExist")
                        .detail("IntersectingRange", it->range())
                        .detail("DataShardRange", it->value()->range)
@@ -1564,15 +1564,42 @@ class ShardManager {
            }
        }

-       auto currentCfOptions = active ? rState->getCFOptions() : rState->getCFOptionsForInactiveShard();
-       auto [it, inserted] = physicalShards.emplace(id, std::make_shared<PhysicalShard>(db, id, currentCfOptions));
-       std::shared_ptr<PhysicalShard>& shard = it->second;
+       auto it = physicalShards.find(id);
+       std::shared_ptr<PhysicalShard> physicalShard = nullptr;
+       if (it != physicalShards.end()) {
+           physicalShard = it->second;
+           // TODO: consider coalescing contiguous key ranges.
+           if (!SERVER_KNOBS->SHARDED_ROCKSDB_ALLOW_MULTIPLE_RANGES) {
+               bool continuous = physicalShard->dataShards.empty();
+               std::string rangeStr = "";
+               for (auto& [_, shard] : physicalShard->dataShards) {
+                   rangeStr += shard->range.toString() + ", ";
+                   if (shard->range.begin < range.begin && shard->range.end == range.begin) {
+                       continuous = true;
+                       break;
+                   }
+                   if (shard->range.begin > range.begin && range.end == shard->range.begin) {
+                       continuous = true;
+                       break;
+                   }
+               }
+               if (!continuous) {
+                   // When multiple shards are merged into a single shard, the storage server might already own a
+                   // piece of the resulting shard. Because intra-storage-server moves are disabled, the merge data
+                   // move could create multiple segments in a single physical shard.
+                   TraceEvent("AddMultipleRanges").detail("NewRange", range).detail("OtherRanges", rangeStr);
+               }
+           }
+       } else {
+           auto currentCfOptions = active ? rState->getCFOptions() : rState->getCFOptionsForInactiveShard();
+           auto [it, inserted] = physicalShards.emplace(id, std::make_shared<PhysicalShard>(db, id, currentCfOptions));
+           physicalShard = it->second;
+       }

        activePhysicalShardIds.emplace(id);
-
-       auto dataShard = std::make_unique<DataShard>(range, shard.get());
+       auto dataShard = std::make_unique<DataShard>(range, physicalShard.get());
        dataShardMap.insert(range, dataShard.get());
-       shard->dataShards[range.begin.toString()] = std::move(dataShard);
+       physicalShard->dataShards[range.begin.toString()] = std::move(dataShard);

        validate();

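Note: the continuity check above only accepts a new range that abuts one of the shard's existing ranges. A minimal standalone sketch of that predicate (illustrative only, not FDB code; assumes half-open [begin, end) ranges over strings):

    #include <string>

    struct Range {
        std::string begin, end; // half-open: [begin, end)
    };

    // True when `added` extends `existing` on either side without overlapping it.
    bool extendsExisting(const Range& existing, const Range& added) {
        // `existing` sits immediately to the left of `added`...
        if (existing.begin < added.begin && existing.end == added.begin)
            return true;
        // ...or immediately to the right of it.
        if (existing.begin > added.begin && added.end == existing.begin)
            return true;
        return false;
    }

If no existing range satisfies this predicate, the new code emits AddMultipleRanges rather than failing, since a merge data move can legitimately land a non-adjacent piece on a shard the server already owns.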
@@ -1581,7 +1608,7 @@ class ShardManager {
            .detail("ShardId", id)
            .detail("Active", active);

-       return shard.get();
+       return physicalShard.get();
    }

    std::vector<std::string> removeRange(KeyRange range) {
@@ -1636,6 +1663,7 @@ class ShardManager {

        // Range modification could result in more than one segment. Remove the original segment key here.
        existingShard->dataShards.erase(shardRange.begin.toString());
+       int count = 0;
        if (shardRange.begin < range.begin) {
            auto dataShard =
                std::make_unique<DataShard>(KeyRange(KeyRangeRef(shardRange.begin, range.begin)), existingShard);
@@ -1646,6 +1674,7 @@ class ShardManager {

            existingShard->dataShards[shardRange.begin.toString()] = std::move(dataShard);
            logShardEvent(existingShard->id, shardRange, ShardOp::MODIFY_RANGE, SevInfo, msg);
+           count++;
        }

        if (shardRange.end > range.end) {
@@ -1658,6 +1687,17 @@ class ShardManager {

            existingShard->dataShards[range.end.toString()] = std::move(dataShard);
            logShardEvent(existingShard->id, shardRange, ShardOp::MODIFY_RANGE, SevInfo, msg);
+           count++;
+       }
+
+       if (count > 1) {
+           // During a shard split, a shard can be split into multiple key ranges: one of them remains on this
+           // storage server, while the others are moved to new servers. Depending on the order in which the split
+           // data moves execute, a shard could be broken into multiple pieces. Eventually a single continuous key
+           // range will remain on the physical shard. Data consistency is guaranteed.
+           //
+           // For team-based shard placement, we expect multiple data shards to be located on the same physical
+           // shard.
+           TraceEvent("RangeSplit").detail("OriginalRange", shardRange).detail("RemovedRange", range);
        }
    }

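Note: `count` can only reach 2 when the removed range falls strictly inside the original data shard, leaving residue on both sides. A minimal sketch of that geometry (illustrative only, not FDB code; half-open string ranges):

    #include <string>
    #include <vector>

    struct Range {
        std::string begin, end; // half-open: [begin, end)
    };

    // Returns the pieces of `shard` left over after removing `removed` (assumed to intersect it).
    std::vector<Range> subtract(const Range& shard, const Range& removed) {
        std::vector<Range> remaining;
        if (shard.begin < removed.begin)
            remaining.push_back({ shard.begin, removed.begin }); // residue on the left
        if (shard.end > removed.end)
            remaining.push_back({ removed.end, shard.end }); // residue on the right
        return remaining;
    }

    // subtract({"a", "n"}, {"c", "k"}) yields {"a", "c"} and {"k", "n"}: two segments, so count == 2.

This mirrors the two `if` branches above; the RangeSplit event simply records that a single physical shard now carries more than one segment.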
@@ -1986,28 +2026,37 @@ class ShardManager {
        }

        TraceEvent(SevVerbose, "ShardedRocksValidateShardManager", this->logId);
+       int totalDataShards = 0;
+       int expectedDataShards = 0;
        for (auto s = dataShardMap.ranges().begin(); s != dataShardMap.ranges().end(); ++s) {
            TraceEvent e(SevVerbose, "ShardedRocksValidateDataShardMap", this->logId);
            e.detail("Range", s->range());
            const DataShard* shard = s->value();
            e.detail("ShardAddress", reinterpret_cast<std::uintptr_t>(shard));
-           if (shard != nullptr) {
-               e.detail("PhysicalShard", shard->physicalShard->id);
-           } else {
-               e.detail("Shard", "Empty");
+           if (shard == nullptr) {
+               e.detail("RangeUnassigned", "True");
+               continue;
            }
-           if (shard != nullptr) {
-               if (shard->range != static_cast<KeyRangeRef>(s->range())) {
-                   TraceEvent(SevWarnAlways, "ShardRangeMismatch").detail("Range", s->range());
-               }
-
-               ASSERT(shard->range == static_cast<KeyRangeRef>(s->range()));
-               ASSERT(shard->physicalShard != nullptr);
-               auto it = shard->physicalShard->dataShards.find(shard->range.begin.toString());
-               ASSERT(it != shard->physicalShard->dataShards.end());
-               ASSERT(it->second.get() == shard);
+           totalDataShards++;
+           if (shard->range != static_cast<KeyRangeRef>(s->range())) {
+               TraceEvent(SevWarnAlways, "ShardRangeMismatch")
+                   .detail("Range", s->range())
+                   .detail("DataShardRange", shard->range)
+                   .detail("PhysicalShardId", shard->physicalShard->id);
            }
+
+           ASSERT(shard->range == static_cast<KeyRangeRef>(s->range()));
+           ASSERT(shard->physicalShard != nullptr);
+           auto it = shard->physicalShard->dataShards.find(shard->range.begin.toString());
+           ASSERT(it != shard->physicalShard->dataShards.end());
+           ASSERT(it->second.get() == shard);
+       }
+
+       for (auto [shardId, physicalShard] : physicalShards) {
+           ASSERT(physicalShard);
+           expectedDataShards += physicalShard->dataShards.size();
        }
+       ASSERT_EQ(expectedDataShards, totalDataShards);
    }

private:
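Note: the reworked validate() now checks consistency in both directions. The per-entry ASSERTs verify that every assigned range in dataShardMap is registered in its physical shard's dataShards index, and the final ASSERT_EQ verifies that the total count of assigned ranges equals the sum of dataShards across all physical shards. The count comparison catches a DataShard that a physical shard still indexes after it was dropped from the range map, which the old per-entry checks alone could not detect.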
@@ -4403,6 +4452,81 @@ TEST_CASE("noSim/ShardedRocksDB/Metadata") {
    return Void();
}

+TEST_CASE("noSim/ShardedRocksDBRangeOps/RemoveSplitRange") {
+   state std::string rocksDBTestDir = "sharded-rocksdb-kvs-test-db";
+   platform::eraseDirectoryRecursive(rocksDBTestDir);
+
+   state ShardedRocksDBKeyValueStore* rocksdbStore =
+       new ShardedRocksDBKeyValueStore(rocksDBTestDir, deterministicRandom()->randomUniqueID());
+   state IKeyValueStore* kvStore = rocksdbStore;
+   wait(kvStore->init());
+
+   // Add two ranges to the same shard.
+   {
+       std::vector<Future<Void>> addRangeFutures;
+       addRangeFutures.push_back(kvStore->addRange(KeyRangeRef("a"_sr, "d"_sr), "shard-1"));
+       addRangeFutures.push_back(kvStore->addRange(KeyRangeRef("g"_sr, "n"_sr), "shard-1"));
+
+       wait(waitForAll(addRangeFutures));
+   }
+
+   state std::set<std::string> originalKeys = { "a", "b", "c", "g", "h", "m" };
+   state std::set<std::string> currentKeys = originalKeys;
+   for (auto key : originalKeys) {
+       kvStore->set(KeyValueRef(key, key));
+   }
+   wait(kvStore->commit());
+
+   state std::string key;
+   for (key : currentKeys) {
+       Optional<Value> val = wait(kvStore->readValue(key));
+       ASSERT(val.present());
+       ASSERT(val.get().toString() == key);
+   }
+
+   // Remove a single range.
+   std::vector<std::string> shardsToCleanUp;
+   auto shardIds = kvStore->removeRange(KeyRangeRef("b"_sr, "c"_sr));
+   // Removing the range didn't create an empty shard.
+   ASSERT_EQ(shardIds.size(), 0);
+
+   currentKeys.erase("b");
+   for (key : originalKeys) {
+       Optional<Value> val = wait(kvStore->readValue(key));
+       if (currentKeys.contains(key)) {
+           ASSERT(val.present());
+           ASSERT(val.get().toString() == key);
+       } else {
+           ASSERT(!val.present());
+       }
+   }
+
+   // Remove a range spanning multiple sub-ranges.
+   shardIds = kvStore->removeRange(KeyRangeRef("c"_sr, "k"_sr));
+   ASSERT(shardIds.empty());
+
+   currentKeys.erase("c");
+   currentKeys.erase("g");
+   currentKeys.erase("h");
+   for (key : originalKeys) {
+       Optional<Value> val = wait(kvStore->readValue(key));
+       if (currentKeys.contains(key)) {
+           ASSERT(val.present());
+           ASSERT(val.get().toString() == key);
+       } else {
+           ASSERT(!val.present());
+       }
+   }
+
+   {
+       Future<Void> closed = kvStore->onClosed();
+       kvStore->dispose();
+       wait(closed);
+   }
+   ASSERT(!directoryExists(rocksDBTestDir));
+   return Void();
+}
+
TEST_CASE("noSim/ShardedRocksDBCheckpoint/CheckpointBasic") {
    state std::string rocksDBTestDir = "sharded-rocks-checkpoint-restore";
    state std::map<Key, Value> kvs({ { "a"_sr, "TestValueA"_sr },
@@ -4777,7 +4901,7 @@ ACTOR Future<Void> testWrites(IKeyValueStore* kvStore, int writeCount) {
    state int i = 0;

    while (i < writeCount) {
-       state int endCount = deterministicRandom()->randomInt(i, i + 1000);
+       state int endCount = deterministicRandom()->randomInt(i + 1, i + 1000);
        state std::string beginKey = format("key-%6d", i);
        state std::string endKey = format("key-%6d", endCount);
        wait(kvStore->addRange(KeyRangeRef(beginKey, endKey), deterministicRandom()->randomUniqueID().toString()));
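Note: deterministicRandom()->randomInt(min, maxPlusOne) samples from the half-open interval [min, maxPlusOne), so the old lower bound of i could return endCount == i, making beginKey equal to endKey and the resulting KeyRangeRef empty. Starting at i + 1 guarantees every addRange call receives a non-empty range.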