Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions app/src/main/java/org/astraea/app/performance/Report.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@
import java.util.Optional;
import java.util.stream.Collectors;
import org.astraea.common.metrics.BeanQuery;
import org.astraea.common.metrics.MBeanClient;
import org.astraea.common.metrics.JndiClient;
import org.astraea.common.metrics.client.consumer.ConsumerMetrics;
import org.astraea.common.metrics.client.consumer.HasConsumerFetchMetrics;
import org.astraea.common.metrics.client.producer.ProducerMetrics;

public interface Report {

static long recordsConsumedTotal() {
var client = MBeanClient.local();
var client = JndiClient.local();
return (long)
ConsumerMetrics.fetch(client).stream()
.mapToDouble(HasConsumerFetchMetrics::recordsConsumedTotal)
Expand All @@ -37,7 +37,7 @@ static long recordsConsumedTotal() {

static List<Report> consumers() {

return ConsumerMetrics.fetch(MBeanClient.local()).stream()
return ConsumerMetrics.fetch(JndiClient.local()).stream()
.map(
m ->
new Report() {
Expand Down Expand Up @@ -74,7 +74,7 @@ public String clientId() {
@Override
public Optional<Double> e2eLatency() {
return Optional.ofNullable(
MBeanClient.local()
JndiClient.local()
.bean(
BeanQuery.builder()
.domainName(ConsumerThread.DOMAIN_NAME)
Expand All @@ -91,7 +91,7 @@ public Optional<Double> e2eLatency() {
}

static List<Report> producers() {
return ProducerMetrics.producer(MBeanClient.local()).stream()
return ProducerMetrics.producer(JndiClient.local()).stream()
.map(
m ->
new Report() {
Expand All @@ -113,7 +113,7 @@ public double avgLatency() {
@Override
public Optional<Double> e2eLatency() {
return Optional.ofNullable(
MBeanClient.local()
JndiClient.local()
.bean(
BeanQuery.builder()
.domainName(ProducerThread.DOMAIN_NAME)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.astraea.common.DataSize;
import org.astraea.common.Utils;
import org.astraea.common.metrics.HasBeanObject;
import org.astraea.common.metrics.MBeanClient;
import org.astraea.common.metrics.JndiClient;
import org.astraea.common.metrics.client.consumer.ConsumerMetrics;
import org.astraea.common.metrics.client.consumer.HasConsumerCoordinatorMetrics;
import org.astraea.common.metrics.client.producer.HasProducerTopicMetrics;
Expand All @@ -37,7 +37,7 @@
public interface TrackerThread extends AbstractThread {

class ProducerPrinter {
private final MBeanClient mBeanClient = MBeanClient.local();
private final JndiClient mBeanClient = JndiClient.local();
private final Supplier<List<Report>> reportSupplier;
private long lastRecords = 0;

Expand Down Expand Up @@ -97,7 +97,7 @@ boolean tryToPrint(Duration duration) {
}

class ConsumerPrinter {
private final MBeanClient mBeanClient = MBeanClient.local();
private final JndiClient mBeanClient = JndiClient.local();
private final Supplier<List<Report>> reportSupplier;
private long lastRecords = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.astraea.common.Utils;
import org.astraea.common.admin.Admin;
import org.astraea.common.admin.NodeInfo;
import org.astraea.common.metrics.MBeanClient;
import org.astraea.common.metrics.JndiClient;
import org.astraea.common.metrics.collector.MetricFetcher;

/** Keep fetching all kinds of metrics and publish to inner topics. */
Expand Down Expand Up @@ -58,7 +58,7 @@ static void execute(Arguments arguments) {
Collectors.toUnmodifiableMap(
NodeInfo::id,
node ->
MBeanClient.jndi(
JndiClient.of(
node.host(),
arguments.idToJmxPort().apply(node.id()))))))
.fetchBeanDelay(arguments.period)
Expand Down
5 changes: 2 additions & 3 deletions app/src/main/java/org/astraea/app/web/BeanHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.astraea.common.admin.Admin;
import org.astraea.common.metrics.BeanObject;
import org.astraea.common.metrics.BeanQuery;
import org.astraea.common.metrics.MBeanClient;
import org.astraea.common.metrics.JndiClient;

public class BeanHandler implements Handler {
private final Admin admin;
Expand All @@ -45,8 +45,7 @@ public CompletionStage<Response> get(Channel channel) {
brokers.stream()
.map(
b -> {
try (var client =
MBeanClient.jndi(b.host(), jmxPorts.apply(b.id()))) {
try (var client = JndiClient.of(b.host(), jmxPorts.apply(b.id()))) {
return new NodeBean(
b.host(),
client.beans(builder.build()).stream()
Expand Down
4 changes: 2 additions & 2 deletions app/src/main/java/org/astraea/app/web/WebService.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.astraea.common.Utils;
import org.astraea.common.admin.Admin;
import org.astraea.common.admin.NodeInfo;
import org.astraea.common.metrics.JndiClient;
import org.astraea.common.metrics.MBeanClient;
import org.astraea.common.metrics.collector.MetricSensor;
import org.astraea.common.metrics.collector.MetricStore;
Expand Down Expand Up @@ -62,8 +63,7 @@ public WebService(
Collectors.toUnmodifiableMap(
NodeInfo::id,
b ->
MBeanClient.jndi(
b.host(), brokerIdToJmxPort.apply(b.id())))));
JndiClient.of(b.host(), brokerIdToJmxPort.apply(b.id())))));
var metricStore =
MetricStore.builder()
.beanExpiration(beanExpiration)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.astraea.common.json.JsonConverter;
import org.astraea.common.json.TypeRef;
import org.astraea.common.metrics.ClusterBean;
import org.astraea.common.metrics.JndiClient;
import org.astraea.common.metrics.MBeanClient;
import org.astraea.common.metrics.collector.MetricSensor;
import org.astraea.common.metrics.collector.MetricStore;
Expand Down Expand Up @@ -1355,8 +1356,7 @@ private MetricStore metricStore(Admin admin, List<CostWeight> costWeights) {
Collectors.toUnmodifiableMap(
NodeInfo::id,
b ->
MBeanClient.jndi(
b.host(), brokerIdToJmxPort.apply(b.id())))));
JndiClient.of(b.host(), brokerIdToJmxPort.apply(b.id())))));
var cw = costWeights.stream().map(x -> x.cost).collect(Collectors.toSet());
var cf = Utils.costFunctions(cw, HasClusterCost.class, Configuration.EMPTY);
var metricSensors = cf.stream().map(c -> c.metricSensor().get()).collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.astraea.common.consumer.ConsumerConfigs;
import org.astraea.common.cost.HasPartitionCost;
import org.astraea.common.cost.ReplicaLeaderSizeCost;
import org.astraea.common.metrics.JndiClient;
import org.astraea.common.metrics.MBeanClient;
import org.astraea.common.metrics.collector.MetricStore;
import org.astraea.common.partitioner.PartitionerUtils;
Expand Down Expand Up @@ -159,13 +160,13 @@ public final void configure(Map<String, ?> configs) {
.brokers()
.thenApply(
brokers -> {
var map = new HashMap<Integer, MBeanClient>();
var map = new HashMap<Integer, JndiClient>();
brokers.forEach(
b ->
map.put(
b.id(), MBeanClient.jndi(b.host(), jmxPortGetter.apply(b.id()))));
b.id(), JndiClient.of(b.host(), jmxPortGetter.apply(b.id()))));
// add local client to fetch consumer metrics
map.put(-1, MBeanClient.local());
map.put(-1, JndiClient.local());
return Collections.unmodifiableMap(map);
});
metricStore =
Expand Down
184 changes: 184 additions & 0 deletions common/src/main/java/org/astraea/common/metrics/JndiClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.common.metrics;

import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.MalformedURLException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.management.AttributeNotFoundException;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanException;
import javax.management.MBeanFeatureInfo;
import javax.management.MBeanServerConnection;
import javax.management.ObjectInstance;
import javax.management.ReflectionException;
import javax.management.RuntimeMBeanException;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import org.astraea.common.Utils;

/** A MBeanClient used to retrieve mbean value from remote Jmx server. */
public interface JndiClient extends MBeanClient, AutoCloseable {

/**
* @param host the address of jmx server
* @param port the port of jmx server
* @return a mbean client using JNDI to lookup metrics.
*/
static JndiClient of(String host, int port) {
try {
return of(
new JMXServiceURL(
String.format(
"service:jmx:rmi://%s:%s/jndi/rmi://%s:%s/jmxrmi", host, port, host, port)));
} catch (MalformedURLException e) {
throw new IllegalArgumentException(e);
}
}

static JndiClient of(JMXServiceURL jmxServiceURL) {
return Utils.packException(
() -> {
var jmxConnector = JMXConnectorFactory.connect(jmxServiceURL);
return new BasicMBeanClient(
jmxConnector.getMBeanServerConnection(),
jmxServiceURL.getHost(),
jmxServiceURL.getPort()) {
@Override
public void close() {
Utils.close(jmxConnector);
}
};
});
}

static JndiClient local() {
return new BasicMBeanClient(ManagementFactory.getPlatformMBeanServer(), Utils.hostname(), -1);
}

@Override
default void close() {}

class BasicMBeanClient implements JndiClient {

private final MBeanServerConnection connection;
final String host;

final int port;

BasicMBeanClient(MBeanServerConnection connection, String host, int port) {
this.connection = connection;
this.host = host;
this.port = port;
}

@Override
public BeanObject bean(BeanQuery beanQuery) {
return Utils.packException(
() -> {
// ask for MBeanInfo
var mBeanInfo = connection.getMBeanInfo(beanQuery.objectName());

// create a list builder all available attributes name
var attributeName =
Arrays.stream(mBeanInfo.getAttributes())
.map(MBeanFeatureInfo::getName)
.collect(Collectors.toList());

// query the result
return queryBean(beanQuery, attributeName);
});
}

BeanObject queryBean(BeanQuery beanQuery, Collection<String> attributeNameCollection)
throws ReflectionException,
InstanceNotFoundException,
IOException,
AttributeNotFoundException,
MBeanException {
// fetch attribute value from mbean server
var attributeNameArray = attributeNameCollection.toArray(new String[0]);
var attributeList =
connection.getAttributes(beanQuery.objectName(), attributeNameArray).asList();

// collect attribute name & value into a map
var attributes = new HashMap<String, Object>();
attributeList.forEach(attribute -> attributes.put(attribute.getName(), attribute.getValue()));

// according to the javadoc of MBeanServerConnection#getAttributes, the API will
// ignore any error occurring during the fetch process (for example, attribute not
// exists). Below code check for such condition and try to figure out what exactly
// the error is. put it into attributes return result.
for (var str : attributeNameArray) {
if (attributes.containsKey(str)) continue;
try {
attributes.put(str, connection.getAttribute(beanQuery.objectName(), str));
} catch (RuntimeMBeanException e) {
if (!(e.getCause() instanceof UnsupportedOperationException))
throw new IllegalStateException(e);
// the UnsupportedOperationException is thrown when we query unacceptable
// attribute. we just skip it as it is normal case to
// return "acceptable" attribute only
}
}

// collect result, and build a new BeanObject as return result
return new BeanObject(beanQuery.domainName(), beanQuery.properties(), attributes);
}

@Override
public Collection<BeanObject> beans(
BeanQuery beanQuery, Consumer<RuntimeException> errorHandle) {
return Utils.packException(
() ->
connection.queryMBeans(beanQuery.objectName(), null).stream()
// Parallelize the sampling of bean objects. The underlying RMI is thread-safe.
// https://github.com/skiptests/astraea/issues/1553#issuecomment-1461143723
.parallel()
.map(ObjectInstance::getObjectName)
.map(BeanQuery::fromObjectName)
.flatMap(
query -> {
try {
return Stream.of(bean(query));
} catch (RuntimeException e) {
errorHandle.accept(e);
return Stream.empty();
}
})
.collect(Collectors.toUnmodifiableList()));
}

/**
* Returns the list of domains in which any MBean is currently registered.
*
* <p>The order of strings within the returned array is not defined.
*
* @return a {@link List} of domain name {@link String}
*/
List<String> domains() {
return Utils.packException(() -> Arrays.asList(connection.getDomains()));
}
}
}
Loading