38
38
import java .util .concurrent .TimeUnit ;
39
39
import java .util .concurrent .atomic .AtomicInteger ;
40
40
41
+ import javax .management .ObjectName ;
42
+
41
43
import org .apache .commons .lang3 .builder .ToStringBuilder ;
42
44
import org .apache .hadoop .metrics2 .lib .DefaultMetricsSystem ;
43
45
import org .apache .hadoop .metrics2 .source .JvmMetrics ;
66
68
import org .apache .hadoop .hdfs .server .protocol .DatanodeStorageReport ;
67
69
import org .apache .hadoop .hdfs .server .protocol .StorageReport ;
68
70
import org .apache .hadoop .io .IOUtils ;
71
+ import org .apache .hadoop .metrics2 .util .MBeans ;
69
72
import org .apache .hadoop .net .NetUtils ;
70
73
import org .apache .hadoop .security .SecurityUtil ;
71
74
import org .apache .hadoop .security .UserGroupInformation ;
76
79
import org .apache .hadoop .util .ToolRunner ;
77
80
78
81
import org .apache .hadoop .util .Preconditions ;
82
+ import org .apache .hadoop .util .VersionInfo ;
79
83
80
84
/** <p>The balancer is a tool that balances disk space usage on an HDFS cluster
81
85
* when some datanodes become full or when new empty nodes join the cluster.
180
184
*/
181
185
182
186
@ InterfaceAudience .Private
183
- public class Balancer {
187
+ public class Balancer implements BalancerMXBean {
184
188
static final Logger LOG = LoggerFactory .getLogger (Balancer .class );
185
189
186
190
static final Path BALANCER_ID_PATH = new Path ("/system/balancer.id" );
@@ -241,6 +245,7 @@ public class Balancer {
241
245
private final boolean sortTopNodes ;
242
246
private final int limitOverUtilizedNum ;
243
247
private final BalancerMetrics metrics ;
248
+ private ObjectName balancerInfoBeanName ;
244
249
245
250
// all data node lists
246
251
private final Collection <Source > overUtilized = new LinkedList <Source >();
@@ -377,6 +382,8 @@ static int getFailedTimesSinceLastSuccessfulBalance() {
377
382
DFSConfigKeys .DFS_BLOCK_SIZE_KEY ,
378
383
DFSConfigKeys .DFS_BLOCK_SIZE_DEFAULT );
379
384
this .metrics = BalancerMetrics .create (this );
385
+
386
+ registerBalancerMXBean ();
380
387
}
381
388
382
389
private static long getCapacity (DatanodeStorageReport report , StorageType t ) {
@@ -680,6 +687,13 @@ private boolean matchStorageGroups(StorageGroup left, StorageGroup right,
680
687
left .getDatanodeInfo (), right .getDatanodeInfo ());
681
688
}
682
689
690
+ /**
691
+ * Register BalancerMXBean.
692
+ */
693
+ private void registerBalancerMXBean () {
694
+ balancerInfoBeanName = MBeans .register ("Balancer" , "BalancerInfo" , this );
695
+ }
696
+
683
697
/* reset all fields in a balancer preparing for the next iteration */
684
698
void resetData (Configuration conf ) {
685
699
this .overUtilized .clear ();
@@ -689,12 +703,32 @@ void resetData(Configuration conf) {
689
703
this .policy .reset ();
690
704
this .dispatcher .reset (conf );
691
705
DefaultMetricsSystem .removeSourceName (metrics .getName ());
706
+ if (balancerInfoBeanName != null ) {
707
+ MBeans .unregister (balancerInfoBeanName );
708
+ balancerInfoBeanName = null ;
709
+ }
692
710
}
693
711
694
712
NameNodeConnector getNnc () {
695
713
return nnc ;
696
714
}
697
715
716
+ @ Override
717
+ public String getVersion () {
718
+ return VersionInfo .getVersion () + ", r" + VersionInfo .getRevision ();
719
+ }
720
+
721
+ @ Override
722
+ public String getSoftwareVersion () {
723
+ return VersionInfo .getVersion ();
724
+ }
725
+
726
+ @ Override
727
+ public String getCompileInfo () {
728
+ return VersionInfo .getDate () + " by " + VersionInfo .getUser () + " from "
729
+ + VersionInfo .getBranch ();
730
+ }
731
+
698
732
static class Result {
699
733
private final ExitStatus exitStatus ;
700
734
private final long bytesLeftToMove ;
@@ -860,6 +894,7 @@ static private int doBalance(Collection<URI> namenodes,
860
894
+ " NameNode" );
861
895
862
896
List <NameNodeConnector > connectors = Collections .emptyList ();
897
+ BalancerHttpServer balancerHttpServer = startBalancerHttpServer (conf );
863
898
try {
864
899
connectors = NameNodeConnector .newNameNodeConnectors (namenodes , nsIds ,
865
900
Balancer .class .getSimpleName (), BALANCER_ID_PATH , conf ,
@@ -872,6 +907,9 @@ static private int doBalance(Collection<URI> namenodes,
872
907
if (p .getBlockPools ().size () == 0
873
908
|| p .getBlockPools ().contains (nnc .getBlockpoolID ())) {
874
909
final Balancer b = new Balancer (nnc , p , conf );
910
+ if (balancerHttpServer != null ) {
911
+ balancerHttpServer .setBalancerAttribute (b );
912
+ }
875
913
final Result r = b .runOneIteration ();
876
914
r .print (iteration , nnc , System .out );
877
915
@@ -898,6 +936,9 @@ static private int doBalance(Collection<URI> namenodes,
898
936
for (NameNodeConnector nnc : connectors ) {
899
937
IOUtils .cleanupWithLogger (LOG , nnc );
900
938
}
939
+ if (balancerHttpServer != null ) {
940
+ balancerHttpServer .stop ();
941
+ }
901
942
}
902
943
return ExitStatus .SUCCESS .getExitCode ();
903
944
}
@@ -969,6 +1010,18 @@ static void stop() {
969
1010
serviceRunning = false ;
970
1011
}
971
1012
1013
+ private static BalancerHttpServer startBalancerHttpServer (Configuration conf ) throws IOException {
1014
+ boolean httpServerEnabled = conf .getBoolean (DFSConfigKeys .DFS_BALANCER_HTTPSERVER_ENABLED_KEY ,
1015
+ DFSConfigKeys .DFS_BALANCER_HTTPSERVER_ENABLED_DEFAULT );
1016
+ if (httpServerEnabled ) {
1017
+ BalancerHttpServer balancerHttpServer = new BalancerHttpServer (conf );
1018
+ balancerHttpServer .start ();
1019
+ return balancerHttpServer ;
1020
+ } else {
1021
+ return null ;
1022
+ }
1023
+ }
1024
+
972
1025
private static void checkKeytabAndInit (Configuration conf )
973
1026
throws IOException {
974
1027
if (conf .getBoolean (DFSConfigKeys .DFS_BALANCER_KEYTAB_ENABLED_KEY ,
0 commit comments