@@ -1546,7 +1546,7 @@ class ShardManager {
         for (auto it = ranges.begin(); it != ranges.end(); ++it) {
             if (it.value()) {
                 if (it.value()->physicalShard->id == id) {
-                    TraceEvent(SevError, "ShardedRocksDBAddRange")
+                    TraceEvent(SevWarn, "ShardedRocksDBAddRange")
                         .detail("ErrorType", "RangeAlreadyExist")
                         .detail("IntersectingRange", it->range())
                         .detail("DataShardRange", it->value()->range)
@@ -1564,15 +1564,45 @@ class ShardManager {
             }
         }
 
-        auto currentCfOptions = active ? rState->getCFOptions() : rState->getCFOptionsForInactiveShard();
-        auto [it, inserted] = physicalShards.emplace(id, std::make_shared<PhysicalShard>(db, id, currentCfOptions));
-        std::shared_ptr<PhysicalShard>& shard = it->second;
+        auto it = physicalShards.find(id);
+        std::shared_ptr<PhysicalShard> physicalShard = nullptr;
+        if (it != physicalShards.end()) {
+            physicalShard = it->second;
+            // TODO: consider coalescing continuous key ranges.
+            if (!SERVER_KNOBS->SHARDED_ROCKSDB_ALLOW_MULTIPLE_RANGES) {
+                bool continuous = physicalShard->dataShards.empty();
+                std::string rangeStr = "";
+                for (auto& [_, shard] : physicalShard->dataShards) {
+                    rangeStr += shard->range.toString() + ", ";
+                    if (shard->range.begin < range.begin && shard->range.end == range.begin) {
+                        continuous = true;
+                        break;
+                    }
+                    if (shard->range.begin > range.begin && range.end == shard->range.begin) {
+                        continuous = true;
+                        break;
+                    }
+                }
+                if (!continuous) {
+                    // When multiple shards are merged into a single shard, the storage server might already own a
+                    // piece of the resulting shard. Because intra-storage-server moves are disabled, the merge data
+                    // move could create multiple segments in a single physical shard.
+                    TraceEvent("AddMultipleRanges")
+                        .detail("NewRange", range)
+                        .detail("OtherRanges", rangeStr)
+                        .setMaxFieldLength(1000);
+                }
+            }
+        } else {
+            auto currentCfOptions = active ? rState->getCFOptions() : rState->getCFOptionsForInactiveShard();
+            auto [it, inserted] = physicalShards.emplace(id, std::make_shared<PhysicalShard>(db, id, currentCfOptions));
+            physicalShard = it->second;
+        }
 
         activePhysicalShardIds.emplace(id);
-
-        auto dataShard = std::make_unique<DataShard>(range, shard.get());
+        auto dataShard = std::make_unique<DataShard>(range, physicalShard.get());
         dataShardMap.insert(range, dataShard.get());
-        shard->dataShards[range.begin.toString()] = std::move(dataShard);
+        physicalShard->dataShards[range.begin.toString()] = std::move(dataShard);
 
         validate();
@@ -1581,7 +1611,7 @@ class ShardManager {
             .detail("ShardId", id)
             .detail("Active", active);
 
-        return shard.get();
+        return physicalShard.get();
     }
 
     std::vector<std::string> removeRange(KeyRange range) {
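For intuition, the continuity test that the new addRange() branch performs can be sketched standalone. Below is a minimal model in plain C++, with std::string bounds standing in for FDB's key types; Range and isContinuous are illustrative names, not part of the patch. A new range counts as continuous only when the physical shard is empty or the range directly abuts one of the shard's existing ranges; ranges that fail the test are still added, and the AddMultipleRanges trace merely makes merges that arrive as disjoint segments observable.

#include <cassert>
#include <map>
#include <string>

// Hypothetical stand-in for FDB's key range: half-open [begin, end).
struct Range {
    std::string begin, end;
};

// Mirrors the check in addRange(): an empty shard is trivially continuous;
// otherwise the new range must start exactly where an existing range ends,
// or end exactly where an existing range begins.
bool isContinuous(const std::map<std::string, Range>& dataShards, const Range& range) {
    bool continuous = dataShards.empty();
    for (const auto& [_, existing] : dataShards) {
        if (existing.begin < range.begin && existing.end == range.begin) {
            continuous = true; // existing range ends where the new one begins
            break;
        }
        if (existing.begin > range.begin && range.end == existing.begin) {
            continuous = true; // new range ends where an existing one begins
            break;
        }
    }
    return continuous;
}

int main() {
    std::map<std::string, Range> dataShards = { { "a", { "a", "d" } } };
    assert(isContinuous(dataShards, { "d", "g" })); // abuts [a, d) on the right
    assert(!isContinuous(dataShards, { "g", "n" })); // leaves a gap [d, g)
}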
@@ -1636,6 +1666,7 @@ class ShardManager {
 
         // Range modification could result in more than one segment. Remove the original segment key here.
         existingShard->dataShards.erase(shardRange.begin.toString());
+        int count = 0;
         if (shardRange.begin < range.begin) {
             auto dataShard =
                 std::make_unique<DataShard>(KeyRange(KeyRangeRef(shardRange.begin, range.begin)), existingShard);
@@ -1646,6 +1677,7 @@ class ShardManager {
 
             existingShard->dataShards[shardRange.begin.toString()] = std::move(dataShard);
             logShardEvent(existingShard->id, shardRange, ShardOp::MODIFY_RANGE, SevInfo, msg);
+            count++;
         }
 
         if (shardRange.end > range.end) {
@@ -1658,6 +1690,17 @@ class ShardManager {
 
             existingShard->dataShards[range.end.toString()] = std::move(dataShard);
             logShardEvent(existingShard->id, shardRange, ShardOp::MODIFY_RANGE, SevInfo, msg);
+            count++;
+        }
+
+        if (count > 1) {
+            // During a shard split, a shard can be split into multiple key ranges, one of which remains on this
+            // storage server while the others are moved to new servers. Depending on the order in which the split
+            // data moves execute, a shard can be broken into multiple pieces. Eventually a single continuous key
+            // range will remain on the physical shard, and data consistency is guaranteed.
+            //
+            // For team-based shard placement, we expect multiple data shards to be located on the same physical
+            // shard.
+            TraceEvent("RangeSplit").detail("OriginalRange", shardRange).detail("RemovedRange", range);
         }
     }
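The count bookkeeping above tracks how many remainders survive the removal: subtracting one half-open range from another leaves at most two pieces, and count > 1 is exactly the two-remainder case that emits the RangeSplit trace. A minimal sketch under the same half-open convention (Range and subtractRange are illustrative names, not from the patch):

#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for FDB's key range: half-open [begin, end).
struct Range {
    std::string begin, end;
};

// Removing `removed` from `shard` leaves zero, one, or two remainders.
// Getting two back corresponds to the count > 1 "RangeSplit" case above.
std::vector<Range> subtractRange(const Range& shard, const Range& removed) {
    std::vector<Range> remainders;
    if (shard.begin < removed.begin)
        remainders.push_back({ shard.begin, removed.begin }); // left piece survives
    if (shard.end > removed.end)
        remainders.push_back({ removed.end, shard.end }); // right piece survives
    return remainders;
}

int main() {
    // Removing [c, k) from [a, n) splits it into [a, c) and [k, n).
    auto pieces = subtractRange({ "a", "n" }, { "c", "k" });
    assert(pieces.size() == 2);
    assert(pieces[0].begin == "a" && pieces[0].end == "c");
    assert(pieces[1].begin == "k" && pieces[1].end == "n");
}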
@@ -1986,28 +2029,37 @@ class ShardManager {
         }
 
         TraceEvent(SevVerbose, "ShardedRocksValidateShardManager", this->logId);
+        int totalDataShards = 0;
+        int expectedDataShards = 0;
         for (auto s = dataShardMap.ranges().begin(); s != dataShardMap.ranges().end(); ++s) {
             TraceEvent e(SevVerbose, "ShardedRocksValidateDataShardMap", this->logId);
             e.detail("Range", s->range());
             const DataShard* shard = s->value();
             e.detail("ShardAddress", reinterpret_cast<std::uintptr_t>(shard));
-            if (shard != nullptr) {
-                e.detail("PhysicalShard", shard->physicalShard->id);
-            } else {
-                e.detail("Shard", "Empty");
+            if (shard == nullptr) {
+                e.detail("RangeUnassigned", "True");
+                continue;
             }
-            if (shard != nullptr) {
-                if (shard->range != static_cast<KeyRangeRef>(s->range())) {
-                    TraceEvent(SevWarnAlways, "ShardRangeMismatch").detail("Range", s->range());
-                }
-
-                ASSERT(shard->range == static_cast<KeyRangeRef>(s->range()));
-                ASSERT(shard->physicalShard != nullptr);
-                auto it = shard->physicalShard->dataShards.find(shard->range.begin.toString());
-                ASSERT(it != shard->physicalShard->dataShards.end());
-                ASSERT(it->second.get() == shard);
+            totalDataShards++;
+            if (shard->range != static_cast<KeyRangeRef>(s->range())) {
+                TraceEvent(SevWarnAlways, "ShardRangeMismatch")
+                    .detail("Range", s->range())
+                    .detail("DataShardRange", shard->range)
+                    .detail("PhysicalShardId", shard->physicalShard->id);
             }
+
+            ASSERT(shard->range == static_cast<KeyRangeRef>(s->range()));
+            ASSERT(shard->physicalShard != nullptr);
+            auto it = shard->physicalShard->dataShards.find(shard->range.begin.toString());
+            ASSERT(it != shard->physicalShard->dataShards.end());
+            ASSERT(it->second.get() == shard);
+        }
+
+        for (auto [shardId, physicalShard] : physicalShards) {
+            ASSERT(physicalShard);
+            expectedDataShards += physicalShard->dataShards.size();
        }
+        ASSERT_EQ(expectedDataShards, totalDataShards);
     }
 
 private:
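The new counters give validate() a global cross-check: the number of assigned ranges in the range map must equal the total number of data shards owned across all physical shards. A compilable miniature of that invariant follows; the struct shapes and validateCounts are simplified, hypothetical stand-ins for the real classes, not the FDB API.

#include <cassert>
#include <map>
#include <memory>
#include <string>
#include <vector>

struct DataShard {};
struct PhysicalShard {
    std::map<std::string, std::unique_ptr<DataShard>> dataShards;
};

// Every assigned entry in the range map must be backed by a DataShard owned
// by some PhysicalShard, and the two counts must agree.
void validateCounts(const std::vector<const DataShard*>& rangeMapValues,
                    const std::map<std::string, std::shared_ptr<PhysicalShard>>& physicalShards) {
    int totalDataShards = 0;
    for (const DataShard* shard : rangeMapValues) {
        if (shard == nullptr)
            continue; // unassigned range, skipped just as in validate()
        totalDataShards++;
    }
    int expectedDataShards = 0;
    for (const auto& [id, physicalShard] : physicalShards) {
        assert(physicalShard);
        expectedDataShards += static_cast<int>(physicalShard->dataShards.size());
    }
    assert(expectedDataShards == totalDataShards);
}

int main() {
    auto ps = std::make_shared<PhysicalShard>();
    ps->dataShards["a"] = std::make_unique<DataShard>();
    std::map<std::string, std::shared_ptr<PhysicalShard>> physicalShards = { { "shard-1", ps } };
    std::vector<const DataShard*> rangeMapValues = { ps->dataShards["a"].get(), nullptr };
    validateCounts(rangeMapValues, physicalShards); // 1 assigned range, 1 owned DataShard
}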
@@ -4403,6 +4455,81 @@ TEST_CASE("noSim/ShardedRocksDB/Metadata") {
     return Void();
 }
 
+TEST_CASE("noSim/ShardedRocksDBRangeOps/RemoveSplitRange") {
+    state std::string rocksDBTestDir = "sharded-rocksdb-kvs-test-db";
+    platform::eraseDirectoryRecursive(rocksDBTestDir);
+
+    state ShardedRocksDBKeyValueStore* rocksdbStore =
+        new ShardedRocksDBKeyValueStore(rocksDBTestDir, deterministicRandom()->randomUniqueID());
+    state IKeyValueStore* kvStore = rocksdbStore;
+    wait(kvStore->init());
+
+    // Add two ranges to the same shard.
+    {
+        std::vector<Future<Void>> addRangeFutures;
+        addRangeFutures.push_back(kvStore->addRange(KeyRangeRef("a"_sr, "d"_sr), "shard-1"));
+        addRangeFutures.push_back(kvStore->addRange(KeyRangeRef("g"_sr, "n"_sr), "shard-1"));
+
+        wait(waitForAll(addRangeFutures));
+    }
+
+    state std::set<std::string> originalKeys = { "a", "b", "c", "g", "h", "m" };
+    state std::set<std::string> currentKeys = originalKeys;
+    for (auto key : originalKeys) {
+        kvStore->set(KeyValueRef(key, key));
+    }
+    wait(kvStore->commit());
+
+    state std::string key;
+    for (key : currentKeys) {
+        Optional<Value> val = wait(kvStore->readValue(key));
+        ASSERT(val.present());
+        ASSERT(val.get().toString() == key);
+    }
+
+    // Remove a single range.
+    std::vector<std::string> shardsToCleanUp;
+    auto shardIds = kvStore->removeRange(KeyRangeRef("b"_sr, "c"_sr));
+    // Removing the range didn't leave an empty shard behind.
+    ASSERT_EQ(shardIds.size(), 0);
+
+    currentKeys.erase("b");
+    for (key : originalKeys) {
+        Optional<Value> val = wait(kvStore->readValue(key));
+        if (currentKeys.contains(key)) {
+            ASSERT(val.present());
+            ASSERT(val.get().toString() == key);
+        } else {
+            ASSERT(!val.present());
+        }
+    }
+
+    // Remove a range spanning multiple sub-ranges.
+    auto shardIds = kvStore->removeRange(KeyRangeRef("c"_sr, "k"_sr));
+    ASSERT(shardIds.empty());
+
+    currentKeys.erase("c");
+    currentKeys.erase("g");
+    currentKeys.erase("h");
+    for (key : originalKeys) {
+        Optional<Value> val = wait(kvStore->readValue(key));
+        if (currentKeys.contains(key)) {
+            ASSERT(val.present());
+            ASSERT(val.get().toString() == key);
+        } else {
+            ASSERT(!val.present());
+        }
+    }
+
+    {
+        Future<Void> closed = kvStore->onClosed();
+        kvStore->dispose();
+        wait(closed);
+    }
+    ASSERT(!directoryExists(rocksDBTestDir));
+    return Void();
+}
+
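To make the expected survivor set easy to audit, the test's key bookkeeping can be modeled standalone. In the following sketch, expectSurvivors and removeCovered are illustrative names, not part of the test; it simply drops every written key covered by the two removed half-open ranges.

#include <cassert>
#include <set>
#include <string>

// Model of the test: start with the written keys, then drop every key
// covered by the removed half-open ranges [b, c) and [c, k).
std::set<std::string> expectSurvivors() {
    std::set<std::string> keys = { "a", "b", "c", "g", "h", "m" };
    auto removeCovered = [&keys](const std::string& begin, const std::string& end) {
        for (auto it = keys.begin(); it != keys.end();) {
            if (*it >= begin && *it < end)
                it = keys.erase(it);
            else
                ++it;
        }
    };
    removeCovered("b", "c"); // drops "b"
    removeCovered("c", "k"); // drops "c", "g", "h"
    return keys;
}

int main() {
    assert((expectSurvivors() == std::set<std::string>{ "a", "m" }));
}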
 TEST_CASE("noSim/ShardedRocksDBCheckpoint/CheckpointBasic") {
     state std::string rocksDBTestDir = "sharded-rocks-checkpoint-restore";
     state std::map<Key, Value> kvs({ { "a"_sr, "TestValueA"_sr },
@@ -4777,7 +4904,7 @@ ACTOR Future<Void> testWrites(IKeyValueStore* kvStore, int writeCount) {
     state int i = 0;
 
     while (i < writeCount) {
-        state int endCount = deterministicRandom()->randomInt(i, i + 1000);
+        state int endCount = deterministicRandom()->randomInt(i + 1, i + 1000);
         state std::string beginKey = format("key-%6d", i);
         state std::string endKey = format("key-%6d", endCount);
         wait(kvStore->addRange(KeyRangeRef(beginKey, endKey), deterministicRandom()->randomUniqueID().toString()));
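This one-character fix matters because, as I understand flow's DeterministicRandom, randomInt(min, maxPlusOne) samples from the half-open interval [min, maxPlusOne), so randomInt(i, i + 1000) could return i itself and hand addRange an empty [beginKey, endKey) range. A minimal sketch of the restored invariant, modeling randomInt with the standard library under that half-open assumption:

#include <cassert>
#include <random>

// Assumed model of deterministicRandom()->randomInt(min, maxPlusOne):
// uniform over the half-open interval [min, maxPlusOne).
static int randomInt(std::mt19937& rng, int min, int maxPlusOne) {
    return std::uniform_int_distribution<int>(min, maxPlusOne - 1)(rng);
}

int main() {
    std::mt19937 rng(42);
    for (int i = 0; i < 1000; ++i) {
        int endCount = randomInt(rng, i + 1, i + 1000);
        assert(endCount > i); // beginKey < endKey, so addRange never sees an empty range
    }
}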