Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,18 @@ hivemqExtension {
}
}

java {
toolchain {
languageVersion = JavaLanguageVersion.of(21)
}
}

tasks.compileJava {
javaCompiler = javaToolchains.compilerFor {
languageVersion = JavaLanguageVersion.of(11)
}
}

dependencies {
compileOnly(libs.jetbrains.annotations)

Expand Down Expand Up @@ -69,6 +81,7 @@ testing {
"test"(JvmTestSuite::class) {
dependencies {
compileOnly(libs.jetbrains.annotations)
implementation(libs.assertj)
implementation(libs.mockito)
}
}
Expand Down
4 changes: 3 additions & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
[versions]
assertj = "3.27.6"
awaitility = "4.3.0"
aws-sdkv2 = "2.40.5"
dropwizard-metrics-cloudwatch = "2.0.8"
Expand All @@ -9,11 +10,12 @@ jaxb-api = "2.3.3"
jaxb-impl = "2.3.9"
jetbrains-annotations = "26.0.2-1"
junit-jupiter = "5.10.0"
logback = "1.5.21"
logback = "1.5.22"
mockito = "5.21.0"
testcontainers = "2.0.2"

[libraries]
assertj = { module = "org.assertj:assertj-core", version.ref = "assertj" }
awaitility = { module = "org.awaitility:awaitility", version.ref = "awaitility" }
aws-sdkv2-cloudwatch = { module = "software.amazon.awssdk:cloudwatch", version.ref = "aws-sdkv2" }
dropwizard-metrics-cloudwatch = { module = "io.github.azagniotov:dropwizard-metrics-cloudwatch", version.ref = "dropwizard-metrics-cloudwatch" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hivemq.extensions.aws.cloudwatch;

import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
Expand Down Expand Up @@ -51,7 +52,9 @@ class EndToEndIT {
private final @NotNull LocalStackContainer localStack =
new LocalStackContainer(OciImages.getImageName("localstack/localstack")).withServices("cloudwatch")
.withNetwork(network)
.withNetworkAliases("localstack");
.withNetworkAliases("localstack")
.withLogConsumer(outputFrame -> System.out.println("LOCALSTACK: " +
outputFrame.getUtf8StringWithoutLineEnding()));

private final @NotNull HiveMQContainer hivemq =
new HiveMQContainer(OciImages.getImageName("hivemq/extensions/hivemq-aws-cloudwatch-extension")
Expand Down Expand Up @@ -92,7 +95,8 @@ void endToEnd() {
.region(Region.of(localStack.getRegion()))
.build();

await().timeout(Durations.FIVE_MINUTES)
await().timeout(Durations.TWO_MINUTES)
.pollInterval(Durations.ONE_SECOND)
.until(() -> cloudWatchClient.listMetrics()
.metrics()
.stream()
Expand All @@ -109,7 +113,7 @@ void endToEnd() {

final var metricDataQuery = MetricDataQuery.builder().id("m1").metricStat(metricStat).returnData(true).build();

await().timeout(Durations.FIVE_MINUTES).until(() -> {
await().timeout(Durations.TWO_MINUTES).pollInterval(Durations.ONE_SECOND).until(() -> {
final GetMetricDataRequest request = GetMetricDataRequest.builder()
.startTime(Instant.now().minusSeconds(3600))
.endTime(Instant.now())
Expand All @@ -129,7 +133,7 @@ void endToEnd() {
mqttClient.connect();
mqttClient.publishWith().topic("wabern").send();

await().timeout(Durations.FIVE_MINUTES).until(() -> {
await().timeout(Durations.TWO_MINUTES).pollInterval(Durations.ONE_SECOND).until(() -> {
final var request = GetMetricDataRequest.builder()
.startTime(Instant.now().minusSeconds(3600))
.endTime(Instant.now())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hivemq.extensions.aws.cloudwatch;

import com.codahale.metrics.MetricRegistry;
Expand All @@ -28,14 +29,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;

/**
* Main class for HiveMQ AWS CloudWatch extension
* Main class for HiveMQ AWS CloudWatch extension.
* <p>
* After HiveMQ is started, the configured HiveMQ metrics will be exposed to Amazon AWS - CloudWatch
* After HiveMQ is started, the configured HiveMQ metrics will be exposed to AWS CloudWatch.
*
* @author Anja Helmbrecht-Schaar
* @author David Sondermann
*/
@SuppressWarnings("unused")
public class CloudWatchMain implements ExtensionMain {
Expand All @@ -50,10 +49,9 @@ public class CloudWatchMain implements ExtensionMain {
public final void extensionStart(
final @NotNull ExtensionStartInput extensionStartInput,
final @NotNull ExtensionStartOutput extensionStartOutput) {

try {
final File extensionHomeFolder = extensionStartInput.getExtensionInformation().getExtensionHomeFolder();
final ExtensionConfiguration extensionConfiguration = new ExtensionConfiguration(extensionHomeFolder);
final var extensionHomeFolder = extensionStartInput.getExtensionInformation().getExtensionHomeFolder();
final var extensionConfiguration = new ExtensionConfiguration(extensionHomeFolder);
reporterService.startCloudWatchReporter(extensionConfiguration, service, metricRegistry);
log.info("Start {}", extensionStartInput.getExtensionInformation().getName());
} catch (final Exception e) {
Expand All @@ -66,7 +64,6 @@ public final void extensionStart(
public final void extensionStop(
final @NotNull ExtensionStopInput extensionStopInput,
final @NotNull ExtensionStopOutput extensionStopOutput) {

reporterService.stopCloudWatchReporter();
log.info("Stop {}", extensionStopInput.getExtensionInformation().getName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hivemq.extensions.aws.cloudwatch;

import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
import com.hivemq.extension.sdk.api.services.ManagedExtensionExecutorService;
import com.hivemq.extensions.aws.cloudwatch.configuration.ExtensionConfiguration;
import com.hivemq.extensions.aws.cloudwatch.configuration.entities.Config;
import io.github.azagniotov.metrics.reporter.cloudwatch.CloudWatchReporter;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
Expand All @@ -31,15 +31,14 @@
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClientBuilder;

import java.net.URI;
import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.TimeUnit;

/**
* @author Anja Helmbrecht-Schaar
* @author David Sondermann
*/
class CloudWatchReporterService {

Expand All @@ -57,44 +56,36 @@ void startCloudWatchReporter(
final @NotNull ExtensionConfiguration configuration,
final @NotNull ManagedExtensionExecutorService executorService,
final @NotNull MetricRegistry metricRegistry) {

final Config cloudWatchConfig = configuration.getConfig();

final var cloudWatchConfig = configuration.getConfig();
if (configuration.getEnabledMetrics().isEmpty()) {
log.warn("No HiveMQ metrics enabled, no AWS CloudWatch report started");
} else {
final Duration apiTimeout = cloudWatchConfig.getApiTimeout().map(Duration::ofMillis).orElse(null);

final CloudWatchAsyncClientBuilder cloudWatchAsyncClientBuilder = CloudWatchAsyncClient.builder();
final var apiTimeout = cloudWatchConfig.getApiTimeout().map(Duration::ofMillis).orElse(null);

final var cloudWatchAsyncClientBuilder = CloudWatchAsyncClient.builder()
.credentialsProvider(DefaultCredentialsProvider.builder().build())
.asyncConfiguration(ClientAsyncConfiguration.builder()
.advancedOption(SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR, executorService)
.build())
.overrideConfiguration(ClientOverrideConfiguration.builder()
.apiCallTimeout(apiTimeout)
.apiCallAttemptTimeout(apiTimeout)
.build());
if (configuration.getConfig().getCloudWatchEndpointOverride() != null) {
cloudWatchAsyncClientBuilder.endpointOverride(URI.create(configuration.getConfig()
.getCloudWatchEndpointOverride()));
}

final CloudWatchAsyncClient cloudWatchAsyncClient =
cloudWatchAsyncClientBuilder.credentialsProvider(DefaultCredentialsProvider.create())
.asyncConfiguration(ClientAsyncConfiguration.builder()
.advancedOption(SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR,
executorService)
.build())
.overrideConfiguration(ClientOverrideConfiguration.builder()
.apiCallTimeout(apiTimeout)
.apiCallAttemptTimeout(apiTimeout)
.build())
.build();

final CloudWatchReporter.Builder builder =
CloudWatchReporter.forRegistry(metricRegistry, cloudWatchAsyncClient, METRIC_NAMESPACE);

final var cloudWatchReporterBuilder = CloudWatchReporter //
.forRegistry(metricRegistry, cloudWatchAsyncClientBuilder.build(), METRIC_NAMESPACE) //
.filter(new ConfiguredMetricsFilter(configuration.getEnabledMetrics()));
if (cloudWatchConfig.getZeroValuesSubmission()) {
builder.withZeroValuesSubmission();
cloudWatchReporterBuilder.withZeroValuesSubmission();
}
if (cloudWatchConfig.getReportRawCountValue()) {
builder.withReportRawCountValue();
cloudWatchReporterBuilder.withReportRawCountValue();
}
cloudWatchReporter = builder
.filter(new ConfiguredMetricsFilter(configuration.getEnabledMetrics()))
.build();
cloudWatchReporter = cloudWatchReporterBuilder.build();
cloudWatchReporter.start(cloudWatchConfig.getReportInterval(), TimeUnit.MINUTES);
log.info("Started CloudWatchReporter for {} HiveMQ metrics", configuration.getEnabledMetrics().size());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hivemq.extensions.aws.cloudwatch.configuration;

import com.hivemq.extension.sdk.api.annotations.ThreadSafe;
Expand All @@ -24,19 +25,18 @@

import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
import java.io.File;
import java.io.IOException;

/**
* @author Anja Helmbrecht-Schaar
* @author David Sondermann
*/
@ThreadSafe
class ConfigurationXmlParser {

private static final @NotNull Logger log = LoggerFactory.getLogger(ConfigurationXmlParser.class);

//jaxb context is thread safe
// JAXB context is thread safe
private final @NotNull JAXBContext jaxb;

ConfigurationXmlParser() {
Expand All @@ -50,7 +50,7 @@ class ConfigurationXmlParser {

final @NotNull Config unmarshalExtensionConfig(final @NotNull File file) throws IOException {
try {
final Unmarshaller unmarshaller = jaxb.createUnmarshaller();
final var unmarshaller = jaxb.createUnmarshaller();
return (Config) unmarshaller.unmarshal(file);
} catch (final JAXBException e) {
log.error("Error in the AWS CloudWatch Extension. Could not unmarshal XML configuration", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hivemq.extensions.aws.cloudwatch.configuration;

import com.hivemq.extensions.aws.cloudwatch.configuration.entities.Config;
import com.hivemq.extensions.aws.cloudwatch.configuration.entities.Metric;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -26,12 +26,11 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
* @author Anja Helmbrecht-Schaar
* @author David Sondermann
*/
public class ExtensionConfiguration {

Expand All @@ -58,9 +57,7 @@ public ExtensionConfiguration(final @NotNull File extensionHomeFolder) {
* @return the new config based on the file contents or null if the config is invalid
*/
private @NotNull Config read(final @NotNull File file) {

final Config defaultConfig = new Config();

final var defaultConfig = new Config();
if (file.exists() && file.canRead() && file.length() > 0) {
return doRead(file, defaultConfig);
} else {
Expand All @@ -72,20 +69,17 @@ public ExtensionConfiguration(final @NotNull File extensionHomeFolder) {

private @NotNull Config doRead(final @NotNull File file, final @NotNull Config defaultConfig) {
try {
final Config newConfig = configurationXmlParser.unmarshalExtensionConfig(file);
final var newConfig = configurationXmlParser.unmarshalExtensionConfig(file);
if (newConfig.getApiTimeout().isPresent() && newConfig.getApiTimeout().get() < 1) {
log.warn("Connection timeout must be greater than 0, using default timeout");
newConfig.setApiTimeout(defaultConfig.getApiTimeout().orElse(null));
}

if (newConfig.getReportInterval() < 1) {
log.warn("Report interval must be greater than 0, using default interval {}",
defaultConfig.getReportInterval());
newConfig.setReportInterval(defaultConfig.getReportInterval());
}

return newConfig;

} catch (final IOException e) {
log.warn("Could not read extension configuration file, reason: {}, using defaults {} ",
e.getMessage(),
Expand All @@ -96,7 +90,7 @@ public ExtensionConfiguration(final @NotNull File extensionHomeFolder) {

public @NotNull List<String> getEnabledMetrics() {
if (enabledMetrics.isEmpty()) {
final Lock writeLock = lock.writeLock();
final var writeLock = lock.writeLock();
writeLock.lock();
try {
enabledMetrics = readEnabledMetrics();
Expand All @@ -109,16 +103,15 @@ public ExtensionConfiguration(final @NotNull File extensionHomeFolder) {
}

private @NotNull List<String> readEnabledMetrics() {
final Lock readLock = lock.readLock();
final var readLock = lock.readLock();
readLock.lock();
try {
final List<String> newMetrics = new ArrayList<>();

final var newMetrics = new ArrayList<String>();
if (config.getMetrics().isEmpty()) {
log.error("Could not find any enabled HiveMQ metrics in configuration, no metrics were reported. ");
return List.of();
}
for (final Metric metric : config.getMetrics()) {
for (final var metric : config.getMetrics()) {
if (metric.isEnabled() && !metric.getValue().isEmpty()) {
newMetrics.add(metric.getValue());
log.trace("Added HiveMQ metric {} ", metric.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hivemq.extensions.aws.cloudwatch.configuration.entities;

import org.jetbrains.annotations.NotNull;
Expand All @@ -29,7 +30,7 @@
import java.util.Optional;

/**
* @author Anja Helmbrecht-Schaar
* @author David Sondermann
*/
@SuppressWarnings({"FieldCanBeLocal", "FieldMayBeFinal"})
@XmlRootElement(name = "cloudwatch-extension-configuration")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hivemq.extensions.aws.cloudwatch.configuration.entities;

import org.jetbrains.annotations.NotNull;
Expand All @@ -25,7 +26,7 @@
import javax.xml.bind.annotation.XmlValue;

/**
* @author Anja Helmbrecht-Schaar
* @author David Sondermann
*/
@XmlType(propOrder = {})
@XmlAccessorType(XmlAccessType.NONE)
Expand Down
Loading