Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class CloudReplica extends Replica implements GsonPostProcessable {
@SerializedName(value = "bes")
private ConcurrentHashMap<String, List<Long>> primaryClusterToBackends = null;
@SerializedName(value = "be")
private ConcurrentHashMap<String, Long> primaryClusterToBackend = new ConcurrentHashMap<>();
private volatile ConcurrentHashMap<String, Long> primaryClusterToBackend;
@SerializedName(value = "dbId")
private long dbId = -1;
@SerializedName(value = "tableId")
Expand All @@ -73,11 +73,22 @@ public class CloudReplica extends Replica implements GsonPostProcessable {

private static final Random rand = new Random();

// Intern pool for cluster ID strings to avoid millions of duplicate String instances.
// Typically only a handful of distinct cluster IDs exist in the system.
private static final ConcurrentHashMap<String, String> CLUSTER_ID_POOL = new ConcurrentHashMap<>();

/**
 * Returns a canonical shared instance of the given cluster ID string.
 *
 * Cluster IDs are repeated as map keys across very many CloudReplica objects;
 * pooling them means each distinct ID is held by exactly one String instance.
 * Entries are never evicted from the pool, which assumes the set of distinct
 * cluster IDs in the system stays small.
 *
 * @param clusterId cluster ID to intern; may be null
 * @return the pooled instance equal to {@code clusterId}, or null for null input
 */
private static String internClusterId(String clusterId) {
    if (clusterId == null) {
        return null;
    }
    // computeIfAbsent with the identity mapping both inserts a new ID and
    // returns the canonical instance in a single map operation.
    return CLUSTER_ID_POOL.computeIfAbsent(clusterId, key -> key);
}
Comment on lines +76 to +86
Copy link

Copilot AI Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The static CLUSTER_ID_POOL is unbounded and will retain all distinct cluster IDs for the lifetime of the JVM. If cluster IDs can be unbounded (e.g., come from external inputs or can churn over time), this becomes a memory leak and can negate the intended savings. Consider using a bounded cache (e.g., max size + eviction), or weak-value/weak-key interning (if available in the codebase) so unused IDs can be reclaimed; at minimum, document/enforce that cluster IDs are from a small, fixed set.

Copilot uses AI. Check for mistakes.

private Map<String, List<Long>> memClusterToBackends = null;

// clusterId, secondaryBe, changeTimestamp
private Map<String, Pair<Long, Long>> secondaryClusterToBackends
= new ConcurrentHashMap<String, Pair<Long, Long>>();
private volatile Map<String, Pair<Long, Long>> secondaryClusterToBackends;
Copy link

Copilot AI Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

secondaryClusterToBackends is typed as Map, so after Gson deserialization it may be a non-concurrent implementation (e.g., LinkedTreeMap). In that case it will be non-null and getOrCreateSecondaryMap() will not replace it, and subsequent concurrent reads/writes can become unsafe. A concrete fix is to (1) declare the field as ConcurrentHashMap<String, Pair<Long, Long>> (and have getOrCreateSecondaryMap() return ConcurrentHashMap), and/or (2) in gsonPostProcess() normalize any deserialized map into a ConcurrentHashMap (similar to what’s done for primaryClusterToBackend).

Suggested change
private volatile Map<String, Pair<Long, Long>> secondaryClusterToBackends;
private volatile ConcurrentHashMap<String, Pair<Long, Long>> secondaryClusterToBackends;

Copilot uses AI. Check for mistakes.

public CloudReplica() {
}
Expand All @@ -99,6 +110,34 @@ public CloudReplica(long replicaId, Long backendId, ReplicaState state, long ver
this.idx = idx;
}

/**
 * Returns the primary cluster-to-backend map, allocating it on first use.
 *
 * Uses double-checked locking on the volatile {@code primaryClusterToBackend}
 * field so replicas that never record a primary backend pay no allocation cost,
 * while concurrent callers still observe a single shared map instance.
 *
 * @return the (possibly freshly created) primary map; never null
 */
private ConcurrentHashMap<String, Long> getOrCreatePrimaryMap() {
    ConcurrentHashMap<String, Long> existing = primaryClusterToBackend;
    if (existing != null) {
        return existing;
    }
    synchronized (this) {
        // Re-read under the lock: another thread may have installed the map
        // between our unsynchronized check and acquiring the monitor.
        if (primaryClusterToBackend == null) {
            // Tiny initial capacity: only a handful of clusters per replica.
            primaryClusterToBackend = new ConcurrentHashMap<>(2);
        }
        return primaryClusterToBackend;
    }
}

/**
 * Returns the secondary cluster-to-backend map, allocating it on first use.
 *
 * Double-checked locking on the volatile {@code secondaryClusterToBackends}
 * field avoids allocating the map for replicas that never use secondary
 * backends.
 *
 * NOTE(review): the field is declared as plain Map, so if Gson deserializes a
 * non-concurrent implementation here, the non-null value is returned as-is and
 * never replaced — confirm gsonPostProcess() normalizes deserialized maps to
 * ConcurrentHashMap, as is done for primaryClusterToBackend.
 */
private Map<String, Pair<Long, Long>> getOrCreateSecondaryMap() {
    Map<String, Pair<Long, Long>> map = secondaryClusterToBackends;
    if (map == null) {
        synchronized (this) {
            // Re-read under the lock: another thread may have installed it.
            map = secondaryClusterToBackends;
            if (map == null) {
                // Small initial capacity: few clusters per replica.
                map = new ConcurrentHashMap<>(2);
                secondaryClusterToBackends = map;
            }
        }
    }
    return map;
Comment on lines +127 to +138
Copy link

Copilot AI Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

secondaryClusterToBackends is typed as Map, so after Gson deserialization it may be a non-concurrent implementation (e.g., LinkedTreeMap). In that case it will be non-null and getOrCreateSecondaryMap() will not replace it, and subsequent concurrent reads/writes can become unsafe. A concrete fix is to (1) declare the field as ConcurrentHashMap<String, Pair<Long, Long>> (and have getOrCreateSecondaryMap() return ConcurrentHashMap), and/or (2) in gsonPostProcess() normalize any deserialized map into a ConcurrentHashMap (similar to what’s done for primaryClusterToBackend).

Suggested change
private Map<String, Pair<Long, Long>> getOrCreateSecondaryMap() {
Map<String, Pair<Long, Long>> map = secondaryClusterToBackends;
if (map == null) {
synchronized (this) {
map = secondaryClusterToBackends;
if (map == null) {
map = new ConcurrentHashMap<>(2);
secondaryClusterToBackends = map;
}
}
}
return map;
private ConcurrentHashMap<String, Pair<Long, Long>> getOrCreateSecondaryMap() {
Map<String, Pair<Long, Long>> map = secondaryClusterToBackends;
if (map instanceof ConcurrentHashMap) {
return (ConcurrentHashMap<String, Pair<Long, Long>>) map;
}
synchronized (this) {
map = secondaryClusterToBackends;
if (map instanceof ConcurrentHashMap) {
return (ConcurrentHashMap<String, Pair<Long, Long>>) map;
}
ConcurrentHashMap<String, Pair<Long, Long>> concurrentMap = new ConcurrentHashMap<>(2);
if (map != null) {
concurrentMap.putAll(map);
}
secondaryClusterToBackends = concurrentMap;
return concurrentMap;
}

Copilot uses AI. Check for mistakes.
}

// True when this replica's table is registered in the global colocate index.
// Uses the no-lock variant of the lookup (see isColocateTableNoLock).
private boolean isColocated() {
    return Env.getCurrentColocateIndex().isColocateTableNoLock(tableId);
}
Expand Down Expand Up @@ -206,7 +245,8 @@ public long getClusterPrimaryBackendId(String clusterId) {
}
}

return primaryClusterToBackend.getOrDefault(clusterId, -1L);
ConcurrentHashMap<String, Long> map = primaryClusterToBackend;
return map != null ? map.getOrDefault(clusterId, -1L) : -1L;
}

private String getCurrentClusterId() throws ComputeGroupException {
Expand Down Expand Up @@ -316,8 +356,9 @@ private long getBackendIdImpl(String clusterId) throws ComputeGroupException {
backendId = memClusterToBackends.get(clusterId).get(indexRand);
}

if (!replicaEnough && !allowColdRead && primaryClusterToBackend.containsKey(clusterId)) {
backendId = primaryClusterToBackend.get(clusterId);
ConcurrentHashMap<String, Long> priMap = primaryClusterToBackend;
if (!replicaEnough && !allowColdRead && priMap != null && priMap.containsKey(clusterId)) {
backendId = priMap.get(clusterId);
Comment on lines +360 to +361
Copy link

Copilot AI Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does two hash lookups (containsKey then get). Since ConcurrentHashMap disallows null values, you can do a single get and check for null (or use getOrDefault) to reduce overhead on a hot path.

Suggested change
if (!replicaEnough && !allowColdRead && priMap != null && priMap.containsKey(clusterId)) {
backendId = priMap.get(clusterId);
if (!replicaEnough && !allowColdRead && priMap != null) {
Long primaryBackendId = priMap.get(clusterId);
if (primaryBackendId != null) {
backendId = primaryBackendId;
}

Copilot uses AI. Check for mistakes.
}

if (backendId > 0) {
Expand Down Expand Up @@ -393,7 +434,11 @@ public Backend getPrimaryBackend(String clusterId, boolean setIfAbsent) {
}

public Backend getSecondaryBackend(String clusterId) {
Pair<Long, Long> secondBeAndChangeTimestamp = secondaryClusterToBackends.get(clusterId);
Map<String, Pair<Long, Long>> secMap = secondaryClusterToBackends;
if (secMap == null) {
return null;
}
Pair<Long, Long> secondBeAndChangeTimestamp = secMap.get(clusterId);
if (secondBeAndChangeTimestamp == null) {
return null;
}
Expand Down Expand Up @@ -575,8 +620,11 @@ public long getIdx() {
}

/**
 * Records {@code beId} as the primary backend for {@code cluster} and drops
 * any secondary-backend mapping for the same cluster.
 *
 * @param cluster cluster ID (interned before use as a map key)
 * @param beId    backend ID to record as the cluster's primary
 */
public void updateClusterToPrimaryBe(String cluster, long beId) {
    // Intern the key so all replicas share one String instance per cluster ID.
    getOrCreatePrimaryMap().put(internClusterId(cluster), beId);
    // A cluster that just gained a primary must not keep a stale secondary
    // entry; skip entirely if the secondary map was never allocated.
    Map<String, Pair<Long, Long>> secondaries = secondaryClusterToBackends;
    if (secondaries == null) {
        return;
    }
    secondaries.remove(cluster);
}

/**
Expand All @@ -590,19 +638,29 @@ public void updateClusterToSecondaryBe(String cluster, long beId) {
LOG.debug("add to secondary clusterId {}, beId {}, changeTimestamp {}, replica info {}",
cluster, beId, changeTimestamp, this);
}
secondaryClusterToBackends.put(cluster, Pair.of(beId, changeTimestamp));
getOrCreateSecondaryMap().put(internClusterId(cluster), Pair.of(beId, changeTimestamp));
}

/**
 * Removes both the primary and the secondary backend mapping for the given
 * cluster, if either exists. Lazily-created maps that were never allocated
 * (still null) are left untouched.
 *
 * @param cluster cluster ID whose mappings should be dropped
 */
public void clearClusterToBe(String cluster) {
    ConcurrentHashMap<String, Long> primaries = primaryClusterToBackend;
    if (primaries != null) {
        primaries.remove(cluster);
    }
    Map<String, Pair<Long, Long>> secondaries = secondaryClusterToBackends;
    if (secondaries != null) {
        secondaries.remove(cluster);
    }
}

// ATTN: This func is only used by redundant tablet report clean in bes.
// Only the master node will do the diff logic,
// so just only need to clean up secondaryClusterToBackends on the master node.
public void checkAndClearSecondaryClusterToBe(String clusterId, long expireTimestamp) {
Pair<Long, Long> secondBeAndChangeTimestamp = secondaryClusterToBackends.get(clusterId);
Map<String, Pair<Long, Long>> secMap = secondaryClusterToBackends;
if (secMap == null) {
return;
}
Pair<Long, Long> secondBeAndChangeTimestamp = secMap.get(clusterId);
if (secondBeAndChangeTimestamp == null) {
return;
}
Expand All @@ -612,19 +670,22 @@ public void checkAndClearSecondaryClusterToBe(String clusterId, long expireTimes
if (changeTimestamp < expireTimestamp) {
LOG.debug("remove clusterId {} secondary beId {} changeTimestamp {} expireTimestamp {} replica info {}",
clusterId, beId, changeTimestamp, expireTimestamp, this);
secondaryClusterToBackends.remove(clusterId);
secMap.remove(clusterId);
return;
}
}

/**
 * Collects the Backend objects for every cluster that currently has a primary
 * backend assigned to this replica.
 *
 * @return the backends of all primary assignments; empty if the primary map
 *         was never allocated or holds no valid (!= -1) backend IDs. Backend
 *         IDs that resolve to no known backend are skipped.
 */
public List<Backend> getAllPrimaryBes() {
    List<Backend> result = new ArrayList<>();
    ConcurrentHashMap<String, Long> map = primaryClusterToBackend;
    if (map != null) {
        map.forEach((clusterId, beId) -> {
            if (beId != -1) {
                Backend backend = Env.getCurrentSystemInfo().getBackend(beId);
                // getBackend can presumably return null for a dropped/unknown
                // BE id; previously such a null was added to the result list.
                // Skip it so callers never see null elements.
                if (backend != null) {
                    result.add(backend);
                }
            }
        });
    }
    return result;
}

Expand Down Expand Up @@ -655,14 +716,24 @@ public void gsonPostProcess() throws IOException {
this.getId(), this.primaryClusterToBackends, this.primaryClusterToBackend);
}
if (primaryClusterToBackends != null) {
ConcurrentHashMap<String, Long> map = getOrCreatePrimaryMap();
for (Map.Entry<String, List<Long>> entry : primaryClusterToBackends.entrySet()) {
String clusterId = entry.getKey();
List<Long> beIds = entry.getValue();
if (beIds != null && !beIds.isEmpty()) {
primaryClusterToBackend.put(clusterId, beIds.get(0));
map.put(internClusterId(clusterId), beIds.get(0));
}
}
this.primaryClusterToBackends = null;
}
Comment on lines 718 to 728
Copy link

Copilot AI Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This eagerly allocates primaryClusterToBackend whenever primaryClusterToBackends is non-null, even if all entries have empty/null backend lists (no effective data to migrate). To preserve the memory-saving goal, consider deferring getOrCreatePrimaryMap() until the first time you actually encounter a non-empty beIds (i.e., allocate only when you’re about to put).

Copilot uses AI. Check for mistakes.
// Intern cluster ID keys in deserialized primary map to share String instances
// across millions of CloudReplica objects
if (primaryClusterToBackend != null) {
ConcurrentHashMap<String, Long> interned = new ConcurrentHashMap<>(primaryClusterToBackend.size());
for (Map.Entry<String, Long> entry : primaryClusterToBackend.entrySet()) {
interned.put(internClusterId(entry.getKey()), entry.getValue());
}
primaryClusterToBackend = interned;
}
}
}
Loading