@@ -62,6 +62,25 @@ bool shouldCreateCheckpoint(const UID& dataMoveId) {
6262	return  (type == DataMoveType::PHYSICAL || type == DataMoveType::PHYSICAL_EXP);
6363}
6464
65+ std::unordered_map<std::string, std::string> generateTeamIds (
66+     std::unordered_map<std::string, std::vector<std::string>>& dcServerIds) {
67+ 	std::unordered_map<std::string, std::string> dcTeamIds;
68+ 	for  (auto & [dc, serverIds] : dcServerIds) {
69+ 		std::sort (serverIds.begin (), serverIds.end ());
70+ 		std::string teamId;
71+ 		for  (const  auto & serverId : serverIds) {
72+ 			if  (teamId.size () == 0 ) {
73+ 				teamId = serverId;
74+ 			} else  {
75+ 				teamId += " ,"   + serverId;
76+ 			}
77+ 		}
78+ 		//  Use the concatenated server ids as the team id to avoid conflicts.
79+ 		dcTeamIds[dc] = teamId;
80+ 	}
81+ 	return  dcTeamIds;
82+ }
83+ 
6584//  Unassigns keyrange `range` from server `ssId`, except ranges in `shards`.
6685//  Note: krmSetRangeCoalescing() doesn't work in this case since each shard is assigned an ID.
6786ACTOR Future<Void> unassignServerKeys (Transaction* tr, UID ssId, KeyRange range, std::vector<Shard> shards, UID logId) {
@@ -1712,7 +1731,7 @@ ACTOR static Future<Void> startMoveShards(Database occ,
17121731					serverListEntries.push_back (tr.get (serverListKeyFor (servers[s])));
17131732				}
17141733				std::vector<Optional<Value>> serverListValues = wait (getAll (serverListEntries));
1715- 
1734+ 				state std::unordered_map<std::string, std::vector<std::string>> dcServers; 
17161735				for  (int  s = 0 ; s < serverListValues.size (); s++) {
17171736					if  (!serverListValues[s].present ()) {
17181737						//  Attempt to move onto a server that isn't in serverList (removed or never added to the
@@ -1721,6 +1740,13 @@ ACTOR static Future<Void> startMoveShards(Database occ,
17211740						//  TODO(psm): Mark the data move as 'deleting'.
17221741						throw  move_to_removed_server ();
17231742					}
1743+ 					auto  si = decodeServerListValue (serverListValues[s].get ());
1744+ 					ASSERT (si.id () == servers[s]);
1745+ 					auto  it = dcServers.find (si.locality .describeDcId ());
1746+ 					if  (it == dcServers.end ()) {
1747+ 						dcServers[si.locality .describeDcId ()] = std::vector<std::string>();
1748+ 					}
1749+ 					dcServers[si.locality .describeDcId ()].push_back (si.id ().shortString ());
17241750				}
17251751
17261752				currentKeys = KeyRangeRef (begin, keys.end );
@@ -1733,6 +1759,15 @@ ACTOR static Future<Void> startMoveShards(Database occ,
17331759					state Key endKey = old.back ().key ;
17341760					currentKeys = KeyRangeRef (currentKeys.begin , endKey);
17351761
1762+ 					if  (ranges.front () != currentKeys) {
1763+ 						TraceEvent (" MoveShardsPartialRange"  )
1764+ 						    .detail (" ExpectedRange"  , ranges.front ())
1765+ 						    .detail (" ActualRange"  , currentKeys)
1766+ 						    .detail (" DataMoveId"  , dataMoveId)
1767+ 						    .detail (" RowLimit"  , SERVER_KNOBS->MOVE_SHARD_KRM_ROW_LIMIT )
1768+ 						    .detail (" ByteLimit"  , SERVER_KNOBS->MOVE_SHARD_KRM_BYTE_LIMIT );
1769+ 					}
1770+ 
17361771					//  Check that enough servers for each shard are in the correct state
17371772					state RangeResult UIDtoTagMap = wait (tr.getRange (serverTagKeys, CLIENT_KNOBS->TOO_MANY ));
17381773					ASSERT (!UIDtoTagMap.more  && UIDtoTagMap.size () < CLIENT_KNOBS->TOO_MANY );
@@ -1806,6 +1841,7 @@ ACTOR static Future<Void> startMoveShards(Database occ,
18061841									TraceEvent (
18071842									    SevWarn, " StartMoveShardsCancelConflictingDataMove"  , relocationIntervalId)
18081843									    .detail (" Range"  , rangeIntersectKeys)
1844+ 									    .detail (" CurrentDataMoveRange"  , ranges[0 ])
18091845									    .detail (" DataMoveID"  , dataMoveId.toString ())
18101846									    .detail (" ExistingDataMoveID"  , destId.toString ());
18111847									wait (cleanUpDataMove (occ, destId, lock, startMoveKeysLock, keys, ddEnabledState));
@@ -1868,6 +1904,7 @@ ACTOR static Future<Void> startMoveShards(Database occ,
18681904					dataMove.ranges .clear ();
18691905					dataMove.ranges .push_back (KeyRangeRef (keys.begin , currentKeys.end ));
18701906					dataMove.dest .insert (servers.begin (), servers.end ());
1907+ 					dataMove.dcTeamIds  = generateTeamIds (dcServers);
18711908				}
18721909
18731910				if  (currentKeys.end  == keys.end ) {
@@ -3348,6 +3385,8 @@ void seedShardServers(Arena& arena, CommitTransactionRef& tr, std::vector<Storag
33483385	std::map<Optional<Value>, Tag> dcId_locality;
33493386	std::map<UID, Tag> server_tag;
33503387	int8_t  nextLocality = 0 ;
3388+ 	std::unordered_map<std::string, std::vector<std::string>> dcServerIds;
3389+ 
33513390	for  (auto & s : servers) {
33523391		if  (!dcId_locality.contains (s.locality .dcId ())) {
33533392			tr.set (arena, tagLocalityListKeyFor (s.locality .dcId ()), tagLocalityListValue (nextLocality));
@@ -3357,6 +3396,8 @@ void seedShardServers(Arena& arena, CommitTransactionRef& tr, std::vector<Storag
33573396		Tag& t = dcId_locality[s.locality .dcId ()];
33583397		server_tag[s.id ()] = Tag (t.locality , t.id );
33593398		t.id ++;
3399+ 
3400+ 		dcServerIds[s.locality .describeDcId ()].push_back (s.id ().shortString ());
33603401	}
33613402	std::sort (servers.begin (), servers.end ());
33623403
@@ -3403,11 +3444,16 @@ void seedShardServers(Arena& arena, CommitTransactionRef& tr, std::vector<Storag
34033444		                                  UnassignShard (false ));
34043445		ksValue = keyServersValue (serverSrcUID, /* dest=*/  std::vector<UID>(), shardId, UID ());
34053446		krmSetPreviouslyEmptyRange (tr, arena, keyServersPrefix, KeyRangeRef (KeyRef (), allKeys.end ), ksValue, Value ());
3406- 
34073447		for  (auto & s : servers) {
34083448			krmSetPreviouslyEmptyRange (
34093449			    tr, arena, serverKeysPrefixFor (s.id ()), allKeys, serverKeysValue (shardId), serverKeysFalse);
34103450		}
3451+ 
3452+ 		DataMoveMetaData metadata{ shardId, allKeys };
3453+ 		metadata.dcTeamIds  = generateTeamIds (dcServerIds);
3454+ 
3455+ 		//  Data move metadata will be clean up on DD restarts.
3456+ 		tr.set (arena, dataMoveKeyFor (shardId), dataMoveValue (metadata));
34113457	} else  {
34123458		krmSetPreviouslyEmptyRange (tr, arena, keyServersPrefix, KeyRangeRef (KeyRef (), allKeys.end ), ksValue, Value ());
34133459
0 commit comments