@@ -1546,7 +1546,7 @@ class ShardManager {
1546
1546
for (auto it = ranges.begin (); it != ranges.end (); ++it) {
1547
1547
if (it.value ()) {
1548
1548
if (it.value ()->physicalShard ->id == id) {
1549
- TraceEvent (SevError , " ShardedRocksDBAddRange" )
1549
+ TraceEvent (SevWarn , " ShardedRocksDBAddRange" )
1550
1550
.detail (" ErrorType" , " RangeAlreadyExist" )
1551
1551
.detail (" IntersectingRange" , it->range ())
1552
1552
.detail (" DataShardRange" , it->value ()->range )
@@ -1564,15 +1564,21 @@ class ShardManager {
1564
1564
}
1565
1565
}
1566
1566
1567
- auto currentCfOptions = active ? rState->getCFOptions () : rState->getCFOptionsForInactiveShard ();
1568
- auto [it, inserted] = physicalShards.emplace (id, std::make_shared<PhysicalShard>(db, id, currentCfOptions));
1569
- std::shared_ptr<PhysicalShard>& shard = it->second ;
1570
-
1571
- activePhysicalShardIds.emplace (id);
1567
+ auto it = physicalShards.find (id);
1568
+ std::shared_ptr<PhysicalShard> physicalShard = nullptr ;
1569
+ if (it != physicalShards.end ()) {
1570
+ physicalShard = it->second ;
1571
+ ASSERT (SERVER_KNOBS->SHARDED_ROCKSDB_ALLOW_MULTIPLE_RANGES );
1572
+ } else {
1573
+ auto currentCfOptions = active ? rState->getCFOptions () : rState->getCFOptionsForInactiveShard ();
1574
+ auto [it, inserted] = physicalShards.emplace (id, std::make_shared<PhysicalShard>(db, id, currentCfOptions));
1575
+ physicalShard = it->second ;
1576
+ activePhysicalShardIds.emplace (id);
1577
+ }
1572
1578
1573
- auto dataShard = std::make_unique<DataShard>(range, shard .get ());
1579
+ auto dataShard = std::make_unique<DataShard>(range, physicalShard .get ());
1574
1580
dataShardMap.insert (range, dataShard.get ());
1575
- shard ->dataShards [range.begin .toString ()] = std::move (dataShard);
1581
+ physicalShard ->dataShards [range.begin .toString ()] = std::move (dataShard);
1576
1582
1577
1583
validate ();
1578
1584
@@ -1581,7 +1587,7 @@ class ShardManager {
1581
1587
.detail (" ShardId" , id)
1582
1588
.detail (" Active" , active);
1583
1589
1584
- return shard .get ();
1590
+ return physicalShard .get ();
1585
1591
}
1586
1592
1587
1593
std::vector<std::string> removeRange (KeyRange range) {
@@ -1636,6 +1642,7 @@ class ShardManager {
1636
1642
1637
1643
// Range modification could result in more than one segments. Remove the original segment key here.
1638
1644
existingShard->dataShards .erase (shardRange.begin .toString ());
1645
+ int count = 0 ;
1639
1646
if (shardRange.begin < range.begin ) {
1640
1647
auto dataShard =
1641
1648
std::make_unique<DataShard>(KeyRange (KeyRangeRef (shardRange.begin , range.begin )), existingShard);
@@ -1646,6 +1653,7 @@ class ShardManager {
1646
1653
1647
1654
existingShard->dataShards [shardRange.begin .toString ()] = std::move (dataShard);
1648
1655
logShardEvent (existingShard->id , shardRange, ShardOp::MODIFY_RANGE, SevInfo, msg);
1656
+ count++;
1649
1657
}
1650
1658
1651
1659
if (shardRange.end > range.end ) {
@@ -1658,6 +1666,12 @@ class ShardManager {
1658
1666
1659
1667
existingShard->dataShards [range.end .toString ()] = std::move (dataShard);
1660
1668
logShardEvent (existingShard->id , shardRange, ShardOp::MODIFY_RANGE, SevInfo, msg);
1669
+ count++;
1670
+ }
1671
+
1672
+ if (count > 1 ) {
1673
+ TraceEvent (" RangeSplit" ).detail (" OriginalRange" , shardRange).detail (" RemovedRange" , range);
1674
+ ASSERT (SERVER_KNOBS->SHARDED_ROCKSDB_ALLOW_MULTIPLE_RANGES );
1661
1675
}
1662
1676
}
1663
1677
@@ -1986,28 +2000,37 @@ class ShardManager {
1986
2000
}
1987
2001
1988
2002
TraceEvent (SevVerbose, " ShardedRocksValidateShardManager" , this ->logId );
2003
+ int totalDataShards = 0 ;
2004
+ int expectedDataShards = 0 ;
1989
2005
for (auto s = dataShardMap.ranges ().begin (); s != dataShardMap.ranges ().end (); ++s) {
1990
2006
TraceEvent e (SevVerbose, " ShardedRocksValidateDataShardMap" , this ->logId );
1991
2007
e.detail (" Range" , s->range ());
1992
2008
const DataShard* shard = s->value ();
1993
2009
e.detail (" ShardAddress" , reinterpret_cast <std::uintptr_t >(shard));
1994
- if (shard != nullptr ) {
1995
- e.detail (" PhysicalShard" , shard->physicalShard ->id );
1996
- } else {
1997
- e.detail (" Shard" , " Empty" );
2010
+ if (shard == nullptr ) {
2011
+ e.detail (" RangeUnassigned" , " True" );
2012
+ continue ;
1998
2013
}
1999
- if (shard != nullptr ) {
2000
- if (shard->range != static_cast <KeyRangeRef>(s->range ())) {
2001
- TraceEvent (SevWarnAlways, " ShardRangeMismatch" ).detail (" Range" , s->range ());
2002
- }
2003
-
2004
- ASSERT (shard->range == static_cast <KeyRangeRef>(s->range ()));
2005
- ASSERT (shard->physicalShard != nullptr );
2006
- auto it = shard->physicalShard ->dataShards .find (shard->range .begin .toString ());
2007
- ASSERT (it != shard->physicalShard ->dataShards .end ());
2008
- ASSERT (it->second .get () == shard);
2014
+ totalDataShards++;
2015
+ if (shard->range != static_cast <KeyRangeRef>(s->range ())) {
2016
+ TraceEvent (SevWarnAlways, " ShardRangeMismatch" )
2017
+ .detail (" Range" , s->range ())
2018
+ .detail (" DataShardRange" , shard->range )
2019
+ .detail (" PhysicalShardId" , shard->physicalShard ->id );
2009
2020
}
2021
+
2022
+ ASSERT (shard->range == static_cast <KeyRangeRef>(s->range ()));
2023
+ ASSERT (shard->physicalShard != nullptr );
2024
+ auto it = shard->physicalShard ->dataShards .find (shard->range .begin .toString ());
2025
+ ASSERT (it != shard->physicalShard ->dataShards .end ());
2026
+ ASSERT (it->second .get () == shard);
2027
+ }
2028
+
2029
+ for (auto [shardId, physicalShard] : physicalShards) {
2030
+ ASSERT (physicalShard);
2031
+ expectedDataShards += physicalShard->dataShards .size ();
2010
2032
}
2033
+ ASSERT_EQ (expectedDataShards, totalDataShards);
2011
2034
}
2012
2035
2013
2036
private:
@@ -4403,6 +4426,81 @@ TEST_CASE("noSim/ShardedRocksDB/Metadata") {
4403
4426
return Void ();
4404
4427
}
4405
4428
4429
+ TEST_CASE (" noSim/ShardedRocksDBRangeOps/RemoveSplitRange" ) {
4430
+ state std::string rocksDBTestDir = " sharded-rocksdb-kvs-test-db" ;
4431
+ platform::eraseDirectoryRecursive (rocksDBTestDir);
4432
+
4433
+ state ShardedRocksDBKeyValueStore* rocksdbStore =
4434
+ new ShardedRocksDBKeyValueStore (rocksDBTestDir, deterministicRandom ()->randomUniqueID ());
4435
+ state IKeyValueStore* kvStore = rocksdbStore;
4436
+ wait (kvStore->init ());
4437
+
4438
+ // Add two ranges to the same shard.
4439
+ {
4440
+ std::vector<Future<Void>> addRangeFutures;
4441
+ addRangeFutures.push_back (kvStore->addRange (KeyRangeRef (" a" _sr, " d" _sr), " shard-1" ));
4442
+ addRangeFutures.push_back (kvStore->addRange (KeyRangeRef (" g" _sr, " n" _sr), " shard-1" ));
4443
+
4444
+ wait (waitForAll (addRangeFutures));
4445
+ }
4446
+
4447
+ state std::set<std::string> originalKeys = { " a" , " b" , " c" , " g" , " h" , " m" };
4448
+ state std::set<std::string> currentKeys = originalKeys;
4449
+ for (auto key : originalKeys) {
4450
+ kvStore->set (KeyValueRef (key, key));
4451
+ }
4452
+ wait (kvStore->commit ());
4453
+
4454
+ state std::string key;
4455
+ for (key : currentKeys) {
4456
+ Optional<Value> val = wait (kvStore->readValue (key));
4457
+ ASSERT (val.present ());
4458
+ ASSERT (val.get ().toString () == key);
4459
+ }
4460
+
4461
+ // Remove single range.
4462
+ std::vector<std::string> shardsToCleanUp;
4463
+ auto shardIds = kvStore->removeRange (KeyRangeRef (" b" _sr, " c" _sr));
4464
+ // Remove range didn't create empty shard.
4465
+ ASSERT_EQ (shardIds.size (), 0 );
4466
+
4467
+ currentKeys.erase (" b" );
4468
+ for (key : originalKeys) {
4469
+ Optional<Value> val = wait (kvStore->readValue (key));
4470
+ if (currentKeys.contains (key)) {
4471
+ ASSERT (val.present ());
4472
+ ASSERT (val.get ().toString () == key);
4473
+ } else {
4474
+ ASSERT (!val.present ());
4475
+ }
4476
+ }
4477
+
4478
+ // Remove range spanning on multple sub range.
4479
+ auto shardIds = kvStore->removeRange (KeyRangeRef (" c" _sr, " k" _sr));
4480
+ ASSERT (shardIds.empty ());
4481
+
4482
+ currentKeys.erase (" c" );
4483
+ currentKeys.erase (" g" );
4484
+ currentKeys.erase (" h" );
4485
+ for (key : originalKeys) {
4486
+ Optional<Value> val = wait (kvStore->readValue (key));
4487
+ if (currentKeys.contains (key)) {
4488
+ ASSERT (val.present ());
4489
+ ASSERT (val.get ().toString () == key);
4490
+ } else {
4491
+ ASSERT (!val.present ());
4492
+ }
4493
+ }
4494
+
4495
+ {
4496
+ Future<Void> closed = kvStore->onClosed ();
4497
+ kvStore->dispose ();
4498
+ wait (closed);
4499
+ }
4500
+ ASSERT (!directoryExists (rocksDBTestDir));
4501
+ return Void ();
4502
+ }
4503
+
4406
4504
TEST_CASE (" noSim/ShardedRocksDBCheckpoint/CheckpointBasic" ) {
4407
4505
state std::string rocksDBTestDir = " sharded-rocks-checkpoint-restore" ;
4408
4506
state std::map<Key, Value> kvs ({ { " a" _sr, " TestValueA" _sr },
0 commit comments