diff --git a/build.gradle.kts b/build.gradle.kts index 3d6139b..74a3386 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -20,6 +20,18 @@ hivemqExtension { } } +java { + toolchain { + languageVersion = JavaLanguageVersion.of(21) + } +} + +tasks.compileJava { + javaCompiler = javaToolchains.compilerFor { + languageVersion = JavaLanguageVersion.of(11) + } +} + dependencies { compileOnly(libs.jetbrains.annotations) @@ -69,6 +81,7 @@ testing { "test"(JvmTestSuite::class) { dependencies { compileOnly(libs.jetbrains.annotations) + implementation(libs.assertj) implementation(libs.mockito) } } diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index f29437c..c5a4430 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -1,4 +1,5 @@ [versions] +assertj = "3.27.6" awaitility = "4.3.0" aws-sdkv2 = "2.40.5" dropwizard-metrics-cloudwatch = "2.0.8" @@ -9,11 +10,12 @@ jaxb-api = "2.3.3" jaxb-impl = "2.3.9" jetbrains-annotations = "26.0.2-1" junit-jupiter = "5.10.0" -logback = "1.5.21" +logback = "1.5.22" mockito = "5.21.0" testcontainers = "2.0.2" [libraries] +assertj = { module = "org.assertj:assertj-core", version.ref = "assertj" } awaitility = { module = "org.awaitility:awaitility", version.ref = "awaitility" } aws-sdkv2-cloudwatch = { module = "software.amazon.awssdk:cloudwatch", version.ref = "aws-sdkv2" } dropwizard-metrics-cloudwatch = { module = "io.github.azagniotov:dropwizard-metrics-cloudwatch", version.ref = "dropwizard-metrics-cloudwatch" } diff --git a/src/integrationTest/java/com/hivemq/extensions/aws/cloudwatch/EndToEndIT.java b/src/integrationTest/java/com/hivemq/extensions/aws/cloudwatch/EndToEndIT.java index 850b1c8..33396b9 100644 --- a/src/integrationTest/java/com/hivemq/extensions/aws/cloudwatch/EndToEndIT.java +++ b/src/integrationTest/java/com/hivemq/extensions/aws/cloudwatch/EndToEndIT.java @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.hivemq.extensions.aws.cloudwatch; import com.hivemq.client.mqtt.mqtt5.Mqtt5Client; @@ -51,7 +52,9 @@ class EndToEndIT { private final @NotNull LocalStackContainer localStack = new LocalStackContainer(OciImages.getImageName("localstack/localstack")).withServices("cloudwatch") .withNetwork(network) - .withNetworkAliases("localstack"); + .withNetworkAliases("localstack") + .withLogConsumer(outputFrame -> System.out.println("LOCALSTACK: " + + outputFrame.getUtf8StringWithoutLineEnding())); private final @NotNull HiveMQContainer hivemq = new HiveMQContainer(OciImages.getImageName("hivemq/extensions/hivemq-aws-cloudwatch-extension") @@ -92,7 +95,8 @@ void endToEnd() { .region(Region.of(localStack.getRegion())) .build(); - await().timeout(Durations.FIVE_MINUTES) + await().timeout(Durations.TWO_MINUTES) + .pollInterval(Durations.ONE_SECOND) .until(() -> cloudWatchClient.listMetrics() .metrics() .stream() @@ -109,7 +113,7 @@ void endToEnd() { final var metricDataQuery = MetricDataQuery.builder().id("m1").metricStat(metricStat).returnData(true).build(); - await().timeout(Durations.FIVE_MINUTES).until(() -> { + await().timeout(Durations.TWO_MINUTES).pollInterval(Durations.ONE_SECOND).until(() -> { final GetMetricDataRequest request = GetMetricDataRequest.builder() .startTime(Instant.now().minusSeconds(3600)) .endTime(Instant.now()) @@ -129,7 +133,7 @@ void endToEnd() { mqttClient.connect(); mqttClient.publishWith().topic("wabern").send(); - await().timeout(Durations.FIVE_MINUTES).until(() -> { + await().timeout(Durations.TWO_MINUTES).pollInterval(Durations.ONE_SECOND).until(() -> { final var request = GetMetricDataRequest.builder() .startTime(Instant.now().minusSeconds(3600)) .endTime(Instant.now()) diff --git a/src/main/java/com/hivemq/extensions/aws/cloudwatch/CloudWatchMain.java b/src/main/java/com/hivemq/extensions/aws/cloudwatch/CloudWatchMain.java index c401237..e1738ed 100755 --- a/src/main/java/com/hivemq/extensions/aws/cloudwatch/CloudWatchMain.java +++ b/src/main/java/com/hivemq/extensions/aws/cloudwatch/CloudWatchMain.java @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.hivemq.extensions.aws.cloudwatch; import com.codahale.metrics.MetricRegistry; @@ -28,14 +29,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; - /** - * Main class for HiveMQ AWS CloudWatch extension + * Main class for HiveMQ AWS CloudWatch extension. *
- * After HiveMQ is started, the configured HiveMQ metrics will be exposed to Amazon AWS - CloudWatch
+ * After HiveMQ is started, the configured HiveMQ metrics will be exposed to AWS CloudWatch.
*
- * @author Anja Helmbrecht-Schaar
+ * @author David Sondermann
*/
@SuppressWarnings("unused")
public class CloudWatchMain implements ExtensionMain {
@@ -50,10 +49,9 @@ public class CloudWatchMain implements ExtensionMain {
public final void extensionStart(
final @NotNull ExtensionStartInput extensionStartInput,
final @NotNull ExtensionStartOutput extensionStartOutput) {
-
try {
- final File extensionHomeFolder = extensionStartInput.getExtensionInformation().getExtensionHomeFolder();
- final ExtensionConfiguration extensionConfiguration = new ExtensionConfiguration(extensionHomeFolder);
+ final var extensionHomeFolder = extensionStartInput.getExtensionInformation().getExtensionHomeFolder();
+ final var extensionConfiguration = new ExtensionConfiguration(extensionHomeFolder);
reporterService.startCloudWatchReporter(extensionConfiguration, service, metricRegistry);
log.info("Start {}", extensionStartInput.getExtensionInformation().getName());
} catch (final Exception e) {
@@ -66,7 +64,6 @@ public final void extensionStart(
public final void extensionStop(
final @NotNull ExtensionStopInput extensionStopInput,
final @NotNull ExtensionStopOutput extensionStopOutput) {
-
reporterService.stopCloudWatchReporter();
log.info("Stop {}", extensionStopInput.getExtensionInformation().getName());
}
diff --git a/src/main/java/com/hivemq/extensions/aws/cloudwatch/CloudWatchReporterService.java b/src/main/java/com/hivemq/extensions/aws/cloudwatch/CloudWatchReporterService.java
index 01ee323..3afb4f0 100755
--- a/src/main/java/com/hivemq/extensions/aws/cloudwatch/CloudWatchReporterService.java
+++ b/src/main/java/com/hivemq/extensions/aws/cloudwatch/CloudWatchReporterService.java
@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package com.hivemq.extensions.aws.cloudwatch;
import com.codahale.metrics.Metric;
@@ -20,7 +21,6 @@
import com.codahale.metrics.MetricRegistry;
import com.hivemq.extension.sdk.api.services.ManagedExtensionExecutorService;
import com.hivemq.extensions.aws.cloudwatch.configuration.ExtensionConfiguration;
-import com.hivemq.extensions.aws.cloudwatch.configuration.entities.Config;
import io.github.azagniotov.metrics.reporter.cloudwatch.CloudWatchReporter;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -31,7 +31,6 @@
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
-import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClientBuilder;
import java.net.URI;
import java.time.Duration;
@@ -39,7 +38,7 @@
import java.util.concurrent.TimeUnit;
/**
- * @author Anja Helmbrecht-Schaar
+ * @author David Sondermann
*/
class CloudWatchReporterService {
@@ -57,44 +56,36 @@ void startCloudWatchReporter(
final @NotNull ExtensionConfiguration configuration,
final @NotNull ManagedExtensionExecutorService executorService,
final @NotNull MetricRegistry metricRegistry) {
-
- final Config cloudWatchConfig = configuration.getConfig();
-
+ final var cloudWatchConfig = configuration.getConfig();
if (configuration.getEnabledMetrics().isEmpty()) {
log.warn("No HiveMQ metrics enabled, no AWS CloudWatch report started");
} else {
- final Duration apiTimeout = cloudWatchConfig.getApiTimeout().map(Duration::ofMillis).orElse(null);
-
- final CloudWatchAsyncClientBuilder cloudWatchAsyncClientBuilder = CloudWatchAsyncClient.builder();
+ final var apiTimeout = cloudWatchConfig.getApiTimeout().map(Duration::ofMillis).orElse(null);
+
+ final var cloudWatchAsyncClientBuilder = CloudWatchAsyncClient.builder()
+ .credentialsProvider(DefaultCredentialsProvider.builder().build())
+ .asyncConfiguration(ClientAsyncConfiguration.builder()
+ .advancedOption(SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR, executorService)
+ .build())
+ .overrideConfiguration(ClientOverrideConfiguration.builder()
+ .apiCallTimeout(apiTimeout)
+ .apiCallAttemptTimeout(apiTimeout)
+ .build());
if (configuration.getConfig().getCloudWatchEndpointOverride() != null) {
cloudWatchAsyncClientBuilder.endpointOverride(URI.create(configuration.getConfig()
.getCloudWatchEndpointOverride()));
}
- final CloudWatchAsyncClient cloudWatchAsyncClient =
- cloudWatchAsyncClientBuilder.credentialsProvider(DefaultCredentialsProvider.create())
- .asyncConfiguration(ClientAsyncConfiguration.builder()
- .advancedOption(SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR,
- executorService)
- .build())
- .overrideConfiguration(ClientOverrideConfiguration.builder()
- .apiCallTimeout(apiTimeout)
- .apiCallAttemptTimeout(apiTimeout)
- .build())
- .build();
-
- final CloudWatchReporter.Builder builder =
- CloudWatchReporter.forRegistry(metricRegistry, cloudWatchAsyncClient, METRIC_NAMESPACE);
-
+ final var cloudWatchReporterBuilder = CloudWatchReporter //
+ .forRegistry(metricRegistry, cloudWatchAsyncClientBuilder.build(), METRIC_NAMESPACE) //
+ .filter(new ConfiguredMetricsFilter(configuration.getEnabledMetrics()));
if (cloudWatchConfig.getZeroValuesSubmission()) {
- builder.withZeroValuesSubmission();
+ cloudWatchReporterBuilder.withZeroValuesSubmission();
}
if (cloudWatchConfig.getReportRawCountValue()) {
- builder.withReportRawCountValue();
+ cloudWatchReporterBuilder.withReportRawCountValue();
}
- cloudWatchReporter = builder
- .filter(new ConfiguredMetricsFilter(configuration.getEnabledMetrics()))
- .build();
+ cloudWatchReporter = cloudWatchReporterBuilder.build();
cloudWatchReporter.start(cloudWatchConfig.getReportInterval(), TimeUnit.MINUTES);
log.info("Started CloudWatchReporter for {} HiveMQ metrics", configuration.getEnabledMetrics().size());
}
diff --git a/src/main/java/com/hivemq/extensions/aws/cloudwatch/configuration/ConfigurationXmlParser.java b/src/main/java/com/hivemq/extensions/aws/cloudwatch/configuration/ConfigurationXmlParser.java
index cb4f1b3..2935ec0 100755
--- a/src/main/java/com/hivemq/extensions/aws/cloudwatch/configuration/ConfigurationXmlParser.java
+++ b/src/main/java/com/hivemq/extensions/aws/cloudwatch/configuration/ConfigurationXmlParser.java
@@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package com.hivemq.extensions.aws.cloudwatch.configuration;
import com.hivemq.extension.sdk.api.annotations.ThreadSafe;
@@ -24,19 +25,18 @@
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
-import javax.xml.bind.Unmarshaller;
import java.io.File;
import java.io.IOException;
/**
- * @author Anja Helmbrecht-Schaar
+ * @author David Sondermann
*/
@ThreadSafe
class ConfigurationXmlParser {
private static final @NotNull Logger log = LoggerFactory.getLogger(ConfigurationXmlParser.class);
- //jaxb context is thread safe
+ // JAXB context is thread safe
private final @NotNull JAXBContext jaxb;
ConfigurationXmlParser() {
@@ -50,7 +50,7 @@ class ConfigurationXmlParser {
final @NotNull Config unmarshalExtensionConfig(final @NotNull File file) throws IOException {
try {
- final Unmarshaller unmarshaller = jaxb.createUnmarshaller();
+ final var unmarshaller = jaxb.createUnmarshaller();
return (Config) unmarshaller.unmarshal(file);
} catch (final JAXBException e) {
log.error("Error in the AWS CloudWatch Extension. Could not unmarshal XML configuration", e);
diff --git a/src/main/java/com/hivemq/extensions/aws/cloudwatch/configuration/ExtensionConfiguration.java b/src/main/java/com/hivemq/extensions/aws/cloudwatch/configuration/ExtensionConfiguration.java
index 52bcdf1..0c5f75b 100755
--- a/src/main/java/com/hivemq/extensions/aws/cloudwatch/configuration/ExtensionConfiguration.java
+++ b/src/main/java/com/hivemq/extensions/aws/cloudwatch/configuration/ExtensionConfiguration.java
@@ -13,10 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package com.hivemq.extensions.aws.cloudwatch.configuration;
import com.hivemq.extensions.aws.cloudwatch.configuration.entities.Config;
-import com.hivemq.extensions.aws.cloudwatch.configuration.entities.Metric;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -26,12 +26,11 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
- * @author Anja Helmbrecht-Schaar
+ * @author David Sondermann
*/
public class ExtensionConfiguration {
@@ -58,9 +57,7 @@ public ExtensionConfiguration(final @NotNull File extensionHomeFolder) {
* @return the new config based on the file contents or null if the config is invalid
*/
private @NotNull Config read(final @NotNull File file) {
-
- final Config defaultConfig = new Config();
-
+ final var defaultConfig = new Config();
if (file.exists() && file.canRead() && file.length() > 0) {
return doRead(file, defaultConfig);
} else {
@@ -72,20 +69,17 @@ public ExtensionConfiguration(final @NotNull File extensionHomeFolder) {
private @NotNull Config doRead(final @NotNull File file, final @NotNull Config defaultConfig) {
try {
- final Config newConfig = configurationXmlParser.unmarshalExtensionConfig(file);
+ final var newConfig = configurationXmlParser.unmarshalExtensionConfig(file);
if (newConfig.getApiTimeout().isPresent() && newConfig.getApiTimeout().get() < 1) {
log.warn("Connection timeout must be greater than 0, using default timeout");
newConfig.setApiTimeout(defaultConfig.getApiTimeout().orElse(null));
}
-
if (newConfig.getReportInterval() < 1) {
log.warn("Report interval must be greater than 0, using default interval {}",
defaultConfig.getReportInterval());
newConfig.setReportInterval(defaultConfig.getReportInterval());
}
-
return newConfig;
-
} catch (final IOException e) {
log.warn("Could not read extension configuration file, reason: {}, using defaults {} ",
e.getMessage(),
@@ -96,7 +90,7 @@ public ExtensionConfiguration(final @NotNull File extensionHomeFolder) {
public @NotNull List