Commit 6a25b23

Author: fanshilun
HDDS-11881. Simplify DatanodeAdminMonitorImpl Code Structure.
1 parent: 51c6ed6

File tree: 8 files changed, +296 -120 lines

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisContainerReplicaCount.java

Lines changed: 4 additions & 4 deletions

@@ -286,7 +286,7 @@ public String toString() {
    *
    * When considering inflight operations, it is assumed any operation will
    * fail. However, to consider the worst case and avoid data loss, we always
-   * assume a delete will succeed and and add will fail. In this way, we will
+   * assume a delete will succeed and add will fail. In this way, we will
    * avoid scheduling too many deletes which could result in dataloss.
    *
    * Decisions around over-replication are made only on healthy replicas,
@@ -320,7 +320,7 @@ public String toString() {
    * For under replicated containers we do consider inflight add and delete to
    * avoid scheduling more adds than needed. There is additional logic around
    * containers with maintenance replica to ensure minHealthyForMaintenance
-   * replia are maintained.
+   * replica are maintained.
    *
    * @return Delta of replicas needed. Negative indicates over replication and
    *         containers should be removed. Positive indicates over replication
@@ -621,7 +621,7 @@ public int getExcessRedundancy(boolean includePendingDelete) {
    * considering inflight add and deletes.
    * @return zero if perfectly replicated, a negative value for over replication
    *         and a positive value for under replication. The magnitude of the
-   *         return value indicates how many replias the container is over or
+   *         return value indicates how many replicas the container is over or
    *         under replicated by.
    */
   private int redundancyDelta(boolean includePendingDelete,
@@ -650,7 +650,7 @@ boolean insufficientDueToOutOfService() {

   /**
    * How many more replicas can be lost before the container is
-   * unreadable, assuming any infligh deletes will complete. For containers
+   * unreadable, assuming any inflight deletes will complete. For containers
    * which are under-replicated due to decommission
    * or maintenance only, the remaining redundancy will include those
    * decommissioning or maintenance replicas, as they are technically still
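
Editor's note: the worst-case rule in the first hunk (assume pending deletes succeed and pending adds fail) reduces to a small piece of arithmetic. A minimal sketch with hypothetical names, not this class's real fields; the real redundancyDelta also weighs inflight adds and maintenance minimums, as the later hunks note:

// Sketch only: pessimistic inflight accounting, not RatisContainerReplicaCount.
final class PessimisticReplicaDelta {

  static int delta(int healthyReplicas, int inflightDeletes,
      int requiredReplication) {
    // Assume every pending delete succeeds: it reduces the surviving count.
    int surviving = healthyReplicas - inflightDeletes;
    // Assume every pending add fails: inflight adds are ignored entirely.
    return requiredReplication - surviving; // >0 under-, <0 over-replicated
  }

  public static void main(String[] args) {
    // 3 healthy replicas, 1 pending delete, replication factor 3: counted as
    // under-replicated by 1, so an add is scheduled early instead of risking
    // data loss if the delete completes.
    System.out.println(delta(3, 1, 3)); // prints 1
  }
}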

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java

Lines changed: 1 addition & 1 deletion

@@ -145,7 +145,7 @@ public int processAndSendCommands(

     if (targetDatanodes.size() < replicaCount.additionalReplicaNeeded()) {
       // The placement policy failed to find enough targets to satisfy fix
-      // the under replication. There fore even though some commands were sent,
+      // the under replication. Therefore even though some commands were sent,
       // we throw an exception to indicate that the container is still under
       // replicated and should be re-queued for another attempt later.
       LOG.debug("Placement policy failed to find enough targets to satisfy " +

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java

Lines changed: 1 addition & 1 deletion

@@ -92,7 +92,7 @@ public void processAll(ReplicationQueue queue) {
     List<HealthResult> failedOnes = new LinkedList<>();
     // Getting the limit requires iterating over all nodes registered in
     // NodeManager and counting the healthy ones. This is somewhat expensive
-    // so we get get the count once per iteration as it should not change too
+    // so we get the count once per iteration as it should not change too
    // often.
     long inflightLimit = replicationManager.getReplicationInFlightLimit();
     while (true) {
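
Editor's note: the fixed comment documents a hoisting optimization: the inflight limit is derived from a scan of all registered nodes, so it is read once per processAll() pass rather than once per queue entry. A trivial sketch of the pattern, with stand-in types:

// Sketch only: compute the expensive limit once per pass, reuse per item.
import java.util.Queue;

final class HoistedLimitSketch {
  interface Manager {
    long getReplicationInFlightLimit(); // scans all registered nodes: costly
    long inflightCount();               // cheap per-item check
  }

  static void processAll(Queue<String> queue, Manager manager) {
    long inflightLimit = manager.getReplicationInFlightLimit(); // once per pass
    while (true) {
      String work = queue.poll();
      if (work == null || manager.inflightCount() >= inflightLimit) {
        break; // queue drained, or the inflight budget for this pass is spent
      }
      System.out.println("processing " + work);
    }
  }
}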

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitor.java

Lines changed: 1 addition & 1 deletion

@@ -33,7 +33,7 @@ public interface DatanodeAdminMonitor extends Runnable {

   void startMonitoring(DatanodeDetails dn);
   void stopMonitoring(DatanodeDetails dn);
-  Set<DatanodeAdminMonitorImpl.TrackedNode> getTrackedNodes();
+  Set<TrackedNode> getTrackedNodes();
   void setMetrics(NodeDecommissionMetrics metrics);
   Map<String, List<ContainerID>> getContainersPendingReplication(DatanodeDetails dn)
       throws NodeNotFoundException;

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java

Lines changed: 60 additions & 110 deletions

@@ -25,7 +25,6 @@
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
-import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
 import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaCount;
 import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
@@ -39,8 +38,6 @@
 import org.slf4j.LoggerFactory;

 import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -49,12 +46,14 @@
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.DELETED;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.DELETING;

 /**
  * Monitor thread which watches for nodes to be decommissioned, recommissioned
  * or placed into maintenance. Newly added nodes are queued in pendingNodes
- * and recommissoned nodes are queued in cancelled nodes. On each monitor
+ * and recommissioned nodes are queued in cancelled nodes. On each monitor
  * 'tick', the cancelled nodes are processed and removed from the monitor.
  * Then any pending nodes are added to the trackedNodes set, where they stay
  * until decommission or maintenance has ended.
@@ -89,50 +88,6 @@ public class DatanodeAdminMonitorImpl implements DatanodeAdminMonitor {
   private long unClosedContainers = 0;
   private long underReplicatedContainers = 0;

-  /**
-   * Inner class for snapshot of Datanode ContainerState in
-   * Decommissioning and Maintenance mode workflow.
-   */
-  public static final class TrackedNode {
-
-    private DatanodeDetails datanodeDetails;
-    private long startTime = 0L;
-    private Map<String, List<ContainerID>> containersReplicatedOnNode = new ConcurrentHashMap<>();
-
-    public TrackedNode(DatanodeDetails datanodeDetails, long startTime) {
-      this.datanodeDetails = datanodeDetails;
-      this.startTime = startTime;
-    }
-
-    @Override
-    public int hashCode() {
-      return datanodeDetails.hashCode();
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      return obj instanceof TrackedNode &&
-          datanodeDetails.equals(((TrackedNode) obj).datanodeDetails);
-    }
-
-    public DatanodeDetails getDatanodeDetails() {
-      return datanodeDetails;
-    }
-
-    public long getStartTime() {
-      return startTime;
-    }
-
-    public Map<String, List<ContainerID>> getContainersReplicatedOnNode() {
-      return containersReplicatedOnNode;
-    }
-
-    public void setContainersReplicatedOnNode(List<ContainerID> underReplicated, List<ContainerID> unClosed) {
-      this.containersReplicatedOnNode.put("UnderReplicated", Collections.unmodifiableList(underReplicated));
-      this.containersReplicatedOnNode.put("UnClosed", Collections.unmodifiableList(unClosed));
-    }
-  }
-
   private Map<String, ContainerStateInWorkflow> containerStateByHost;

   private static final Logger LOG =
@@ -407,91 +362,96 @@ private boolean checkPipelinesClosedOnNode(TrackedNode dn)

   private boolean checkContainersReplicatedOnNode(TrackedNode dn)
       throws NodeNotFoundException {
-    int sufficientlyReplicated = 0;
-    int deleting = 0;
-    int underReplicated = 0;
-    int unclosed = 0;
-    List<ContainerID> underReplicatedIDs = new ArrayList<>();
-    List<ContainerID> unClosedIDs = new ArrayList<>();
-    Set<ContainerID> containers =
-        nodeManager.getContainers(dn.getDatanodeDetails());
-    for (ContainerID cid : containers) {
+    TrackedNodeContainers tnc =
+        new TrackedNodeContainers(containerDetailsLoggingLimit);
+    DatanodeDetails details = dn.getDatanodeDetails();
+    Set<ContainerID> containers = nodeManager.getContainers(details);
+
+    handContainersReplicatedOnNode(details, containers, tnc);
+
+    LOG.info("Dn {} has {} sufficientlyReplicated, {} deleting, " +
+        "{} underReplicated and {} unclosed containers", dn,
+        tnc.getSufficientlyReplicated(),
+        tnc.getDeleting(),
+        tnc.getUnderReplicated(),
+        tnc.getUnclosed());
+
+    containerStateByHost.put(details.getHostName(),
+        new ContainerStateInWorkflow(details.getHostName(),
+            tnc.getSufficientlyReplicated(),
+            tnc.getUnderReplicated(),
+            tnc.getUnclosed(),
+            0L, dn.getStartTime()));
+
+    sufficientlyReplicatedContainers += tnc.getSufficientlyReplicated();
+    underReplicatedContainers += tnc.getUnderReplicated();
+    unClosedContainers += tnc.getUnclosed();
+    dn.setContainersReplicatedOnNode(tnc);
+    return tnc.getUnderReplicated() == 0 && tnc.getUnclosed() == 0;
+  }
+
+  private void handContainersReplicatedOnNode(DatanodeDetails details,
+      Set<ContainerID> containers, TrackedNodeContainers tnc) {
+
+    for (ContainerID cid : containers) {
       try {
         ContainerReplicaCount replicaSet =
             replicationManager.getContainerReplicaCount(cid);

         // If a container is deleted or deleting, and we have a replica on this
         // datanode, just ignore it. It should not block decommission.
-        HddsProtos.LifeCycleState containerState
-            = replicaSet.getContainer().getState();
-        if (containerState == HddsProtos.LifeCycleState.DELETED
-            || containerState == HddsProtos.LifeCycleState.DELETING) {
-          deleting++;
+        HddsProtos.LifeCycleState containerState = replicaSet.getContainer().getState();
+        if (containerState == DELETED || containerState == DELETING) {
+          tnc.incrDeleting();
           continue;
         }

+        // If the container is unhealthy, we need to add it to the unClosed list.
         boolean isHealthy = replicaSet.isHealthyEnoughForOffline();
         if (!isHealthy) {
-          unClosedIDs.add(cid);
-          if (unclosed < containerDetailsLoggingLimit
-              || LOG.isDebugEnabled()) {
-            LOG.info("Unclosed Container {} {}; {}", cid, replicaSet, replicaDetails(replicaSet.getReplicas()));
+          tnc.addUnClosedIDs(cid);
+          if (tnc.isLogUnClosedContainers() || LOG.isDebugEnabled()) {
+            LOG.info("Unclosed Container {}, ReplicaSet {}, ReplicaDetails {}.",
+                cid, replicaSet, tnc.replicaDetails(replicaSet));
           }
-          unclosed++;
           continue;
         }

         // If we get here, the container is closed or quasi-closed and all the replicas match that
         // state, except for any which are unhealthy. As the container is closed, we can check
         // if it is sufficiently replicated using replicationManager, but this only works if the
         // legacy RM is not enabled.
-        boolean legacyEnabled = conf.getBoolean("hdds.scm.replication.enable" +
-            ".legacy", false);
+        boolean legacyEnabled = conf.getBoolean("hdds.scm.replication.enable.legacy", false);
         boolean replicatedOK;
         if (legacyEnabled) {
-          replicatedOK = replicaSet.isSufficientlyReplicatedForOffline(dn.getDatanodeDetails(), nodeManager);
+          replicatedOK = replicaSet.isSufficientlyReplicatedForOffline(details, nodeManager);
         } else {
           ReplicationManagerReport report = new ReplicationManagerReport();
           replicationManager.checkContainerStatus(replicaSet.getContainer(), report);
           replicatedOK = report.getStat(ReplicationManagerReport.HealthState.UNDER_REPLICATED) == 0;
         }
+
         if (replicatedOK) {
-          sufficientlyReplicated++;
+          tnc.incrSufficientlyReplicated();
         } else {
-          underReplicatedIDs.add(cid);
-          if (underReplicated < containerDetailsLoggingLimit || LOG.isDebugEnabled()) {
-            LOG.info("Under Replicated Container {} {}; {}", cid, replicaSet, replicaDetails(replicaSet.getReplicas()));
+          tnc.addUnderReplicated(cid);
+          if (tnc.isLogUnderReplicatedContainers() || LOG.isDebugEnabled()) {
+            LOG.info("Under Replicated Container {}, ReplicaSet {}, ReplicaDetails {}.",
+                cid, replicaSet, tnc.replicaDetails(replicaSet));
           }
-          underReplicated++;
         }
       } catch (ContainerNotFoundException e) {
-        LOG.warn("ContainerID {} present in node list for {} but not found in containerManager", cid,
-            dn.getDatanodeDetails());
+        LOG.warn("ContainerID {} present in node list for {} but not found in containerManager.",
+            cid, details);
       }
     }
-    LOG.info("{} has {} sufficientlyReplicated, {} deleting, {} " +
-        "underReplicated and {} unclosed containers",
-        dn, sufficientlyReplicated, deleting, underReplicated, unclosed);
-    containerStateByHost.put(dn.getDatanodeDetails().getHostName(),
-        new ContainerStateInWorkflow(dn.getDatanodeDetails().getHostName(),
-            sufficientlyReplicated,
-            underReplicated,
-            unclosed,
-            0L, dn.getStartTime()));
-    sufficientlyReplicatedContainers += sufficientlyReplicated;
-    underReplicatedContainers += underReplicated;
-    unClosedContainers += unclosed;
-    if (LOG.isDebugEnabled() && underReplicatedIDs.size() < 10000 &&
-        unClosedIDs.size() < 10000) {
-      LOG.debug("{} has {} underReplicated [{}] and {} unclosed [{}] " +
-          "containers", dn, underReplicated,
-          underReplicatedIDs.stream().map(
-              Object::toString).collect(Collectors.joining(", ")),
-          unclosed, unClosedIDs.stream().map(
-              Object::toString).collect(Collectors.joining(", ")));
+
+    if (LOG.isDebugEnabled() ||
+        (tnc.isLogUnderReplicatedContainers() && tnc.isLogUnClosedContainers())) {
+      LOG.debug("DN {} has {} underReplicated [{}] and {} unclosed [{}] containers", details.getHostName(),
+          tnc.getUnderReplicated(), tnc.getFormatUnderReplicatedIDs(),
+          tnc.getUnclosed(), tnc.getFormatUnClosedIDs());
     }
-    dn.setContainersReplicatedOnNode(underReplicatedIDs, unClosedIDs);
-    return underReplicated == 0 && unclosed == 0;
   }

   @Override
@@ -506,16 +466,6 @@ public Map<String, List<ContainerID>> getContainersPendingReplication(DatanodeDe
     return new HashMap<>();
   }

-  private String replicaDetails(Collection<ContainerReplica> replicas) {
-    StringBuilder sb = new StringBuilder();
-    sb.append("Replicas{");
-    sb.append(replicas.stream()
-        .map(Object::toString)
-        .collect(Collectors.joining(",")));
-    sb.append("}");
-    return sb.toString();
-  }
-
   private void completeDecommission(DatanodeDetails dn)
       throws NodeNotFoundException {
     setNodeOpState(dn, NodeOperationalState.DECOMMISSIONED);
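
Editor's note: TrackedNodeContainers, which the refactored code above delegates to, is added by this commit in a file not shown in this excerpt. Below is a sketch reconstructed purely from the call sites above; the field names, the log-gating rule, and the replicaDetails format are assumptions, not the actual source:

// Sketch only: reconstructed from the call sites above, not the actual
// TrackedNodeContainers source added by this commit.
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaCount;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

public class TrackedNodeContainers {
  private final int containerDetailsLoggingLimit;
  private int sufficientlyReplicated;
  private int deleting;
  private final List<ContainerID> underReplicatedIDs = new ArrayList<>();
  private final List<ContainerID> unClosedIDs = new ArrayList<>();

  public TrackedNodeContainers(int containerDetailsLoggingLimit) {
    this.containerDetailsLoggingLimit = containerDetailsLoggingLimit;
  }

  public void incrSufficientlyReplicated() { sufficientlyReplicated++; }
  public void incrDeleting() { deleting++; }
  public void addUnderReplicated(ContainerID cid) { underReplicatedIDs.add(cid); }
  public void addUnClosedIDs(ContainerID cid) { unClosedIDs.add(cid); }

  // Assumed gating rule: log details until the per-category limit is hit,
  // mirroring the old "count < containerDetailsLoggingLimit" checks.
  public boolean isLogUnderReplicatedContainers() {
    return underReplicatedIDs.size() < containerDetailsLoggingLimit;
  }
  public boolean isLogUnClosedContainers() {
    return unClosedIDs.size() < containerDetailsLoggingLimit;
  }

  public int getSufficientlyReplicated() { return sufficientlyReplicated; }
  public int getDeleting() { return deleting; }
  public int getUnderReplicated() { return underReplicatedIDs.size(); }
  public int getUnclosed() { return unClosedIDs.size(); }
  public List<ContainerID> getUnderReplicatedIDs() { return underReplicatedIDs; }
  public List<ContainerID> getUnClosedIDs() { return unClosedIDs; }

  public String getFormatUnderReplicatedIDs() { return format(underReplicatedIDs); }
  public String getFormatUnClosedIDs() { return format(unClosedIDs); }

  // Assumed format, following the removed replicaDetails() helper.
  public String replicaDetails(ContainerReplicaCount replicaSet) {
    return "Replicas{" + format(replicaSet.getReplicas()) + "}";
  }

  private static String format(Collection<?> items) {
    return items.stream().map(Object::toString)
        .collect(Collectors.joining(", "));
  }
}
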
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/TrackedNode.java

Lines changed: 74 additions & 0 deletions

@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.hdds.scm.node;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This class is used to track DataNodes that are
+ * being decommissioned or are in maintenance mode.
+ */
+public class TrackedNode {
+  private DatanodeDetails datanodeDetails;
+  private long startTime;
+  private Map<String, List<ContainerID>> containersReplicatedOnNode = new ConcurrentHashMap<>();
+
+  public TrackedNode(DatanodeDetails datanodeDetails, long startTime) {
+    this.datanodeDetails = datanodeDetails;
+    this.startTime = startTime;
+  }
+
+  @Override
+  public int hashCode() {
+    return datanodeDetails.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    return obj instanceof TrackedNode &&
+        datanodeDetails.equals(((TrackedNode) obj).getDatanodeDetails());
+  }
+
+  public DatanodeDetails getDatanodeDetails() {
+    return datanodeDetails;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public Map<String, List<ContainerID>> getContainersReplicatedOnNode() {
+    return containersReplicatedOnNode;
+  }
+
+  public void setContainersReplicatedOnNode(TrackedNodeContainers containers) {
+    List<ContainerID> underReplicated = containers.getUnderReplicatedIDs();
+    List<ContainerID> unClosed = containers.getUnClosedIDs();
+    this.containersReplicatedOnNode.put("UnderReplicated",
+        Collections.unmodifiableList(underReplicated));
+    this.containersReplicatedOnNode.put("UnClosed",
+        Collections.unmodifiableList(unClosed));
+  }
+}
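
Editor's note: one design point in the extracted class is that equals() and hashCode() delegate to datanodeDetails alone, so a set of tracked nodes keeps exactly one entry per datanode regardless of start time or replicated-container state. A small illustration (the demo method and its setup are hypothetical):

// Sketch only: equality ignores startTime, so re-adding the same datanode
// with a new start time does not create a second tracked entry.
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.node.TrackedNode;

import java.util.HashSet;
import java.util.Set;

final class TrackedNodeEqualityDemo {
  static void demo(DatanodeDetails dn) {
    Set<TrackedNode> tracked = new HashSet<>();
    tracked.add(new TrackedNode(dn, 1000L));
    tracked.add(new TrackedNode(dn, 2000L)); // equal to the first entry
    System.out.println(tracked.size());      // prints 1
  }
}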
