From 5d274f57bd5ddfb9d9dbf46268c1d0dff2877c45 Mon Sep 17 00:00:00 2001 From: Andre Kurait Date: Mon, 24 Mar 2025 14:39:10 -0500 Subject: [PATCH 1/2] Close localJsonTransformer resources when PacketToTransformingHttpHandlerFactory is closed Signed-off-by: Andre Kurait --- .../PacketToTransformingHttpHandlerFactory.java | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/PacketToTransformingHttpHandlerFactory.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/PacketToTransformingHttpHandlerFactory.java index 365b58ab08..326617458f 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/PacketToTransformingHttpHandlerFactory.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/PacketToTransformingHttpHandlerFactory.java @@ -1,5 +1,7 @@ package org.opensearch.migrations.replay; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Supplier; import org.opensearch.migrations.replay.datahandlers.IPacketFinalizingConsumer; @@ -15,11 +17,12 @@ @Slf4j public class PacketToTransformingHttpHandlerFactory - implements - PacketConsumerFactory> { + implements PacketConsumerFactory>, AutoCloseable { // Using ThreadLocal to ensure thread safety with the json transformers which will be reused private final ThreadLocal localJsonTransformer; + private final Set closeableResources = ConcurrentHashMap.newKeySet(); + // The authTransformerFactory is ThreadSafe and getAuthTransformer will be called for every request private final IAuthTransformerFactory authTransformerFactory; @@ -43,4 +46,12 @@ public IPacketFinalizingConsumer> create httpTransactionContext ); } + + @Override + public void close() throws Exception { + for (AutoCloseable resource : closeableResources) { + resource.close(); + } + localJsonTransformer.remove(); + } } From 0f63fec8e49e51c38ec2b8e019080017ce156eb7 Mon Sep 17 00:00:00 2001 From: Andre Kurait Date: Mon, 24 Mar 2025 14:58:50 -0500 Subject: [PATCH 2/2] SonarQube fixes Signed-off-by: Andre Kurait --- .../migrations/bulkload/common/S3Repo.java | 2 +- .../migrations/parsing/ObjectNodeUtils.java | 1 + .../src/cluster_tools/base/main.py | 2 +- .../replay/ParsedHttpMessagesAsDicts.java | 17 ++++++++--------- .../HttpJsonRequestWithFaultingPayload.java | 2 ++ .../http/NettyJsonBodyAccumulateHandler.java | 2 +- .../replay/util/ActiveContextMonitor.java | 3 +-- 7 files changed, 15 insertions(+), 14 deletions(-) diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/S3Repo.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/S3Repo.java index be65a6cdd9..f735803da2 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/S3Repo.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/S3Repo.java @@ -92,8 +92,8 @@ private void ensureFileExistsLocally(S3Uri s3Uri, Path localPath) { public static S3Repo create(Path s3LocalDir, S3Uri s3Uri, String s3Region) { S3AsyncClient s3Client = S3AsyncClient.crtBuilder() - .credentialsProvider(DefaultCredentialsProvider.create()) .region(Region.of(s3Region)) + .credentialsProvider(DefaultCredentialsProvider.create()) .retryConfiguration(r -> r.numRetries(3)) .targetThroughputInGbps(S3_TARGET_THROUGHPUT_GIBPS) .maxNativeMemoryLimitInBytes(S3_MAX_MEMORY_BYTES) diff --git a/RFS/src/main/java/org/opensearch/migrations/parsing/ObjectNodeUtils.java b/RFS/src/main/java/org/opensearch/migrations/parsing/ObjectNodeUtils.java index 8201fd81a1..674df281e1 100644 --- a/RFS/src/main/java/org/opensearch/migrations/parsing/ObjectNodeUtils.java +++ b/RFS/src/main/java/org/opensearch/migrations/parsing/ObjectNodeUtils.java @@ -3,6 +3,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode; public class ObjectNodeUtils { + private ObjectNodeUtils() {} public static void removeFieldsByPath(ObjectNode node, String path) { var pathParts = path.split("\\."); diff --git a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/cluster_tools/src/cluster_tools/base/main.py b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/cluster_tools/src/cluster_tools/base/main.py index 48152b4e3d..d67d2fd07e 100644 --- a/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/cluster_tools/src/cluster_tools/base/main.py +++ b/TrafficCapture/dockerSolution/src/main/docker/migrationConsole/cluster_tools/src/cluster_tools/base/main.py @@ -69,7 +69,7 @@ def setup_parser(parser): message = f"The tool '{tool_name}' does not have a 'define_arguments' function. \ Please add one to specify its arguments." logger.error(message) - raise Exception(message) + raise ValueError(message) tool_parser.set_defaults(func=tool_module.main) # Set the main function as the handler except Exception as e: logger.error(f"An error occurred while importing the tool '{tool_name}': {e}") diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDicts.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDicts.java index 155507da09..c24a428cb6 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDicts.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDicts.java @@ -245,15 +245,14 @@ private static Map convertResponse( } private static void encodeBinaryPayloadIfExists(HttpJsonMessageWithFaultingPayload message) { - if (message.payload() != null) { - if (message.payload().containsKey(JsonKeysForHttpMessage.INLINED_BINARY_BODY_DOCUMENT_KEY)) { - var byteBufBinaryBody = (ByteBuf) message.payload().get(JsonKeysForHttpMessage.INLINED_BINARY_BODY_DOCUMENT_KEY); - assert !message.payload().containsKey(JsonKeysForHttpMessage.INLINED_BASE64_BODY_DOCUMENT_KEY) : - "Expected " + JsonKeysForHttpMessage.INLINED_BASE64_BODY_DOCUMENT_KEY + " to not exist."; - message.payload().put(JsonKeysForHttpMessage.INLINED_BASE64_BODY_DOCUMENT_KEY, byteBufToBase64String(byteBufBinaryBody)); - message.payload().remove(JsonKeysForHttpMessage.INLINED_BINARY_BODY_DOCUMENT_KEY); - byteBufBinaryBody.release(); - } + if (message.payload() != null && + message.payload().containsKey(JsonKeysForHttpMessage.INLINED_BINARY_BODY_DOCUMENT_KEY)) { + var byteBufBinaryBody = (ByteBuf) message.payload().get(JsonKeysForHttpMessage.INLINED_BINARY_BODY_DOCUMENT_KEY); + assert !message.payload().containsKey(JsonKeysForHttpMessage.INLINED_BASE64_BODY_DOCUMENT_KEY) : + "Expected " + JsonKeysForHttpMessage.INLINED_BASE64_BODY_DOCUMENT_KEY + " to not exist."; + message.payload().put(JsonKeysForHttpMessage.INLINED_BASE64_BODY_DOCUMENT_KEY, byteBufToBase64String(byteBufBinaryBody)); + message.payload().remove(JsonKeysForHttpMessage.INLINED_BINARY_BODY_DOCUMENT_KEY); + byteBufBinaryBody.release(); } } } diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/HttpJsonRequestWithFaultingPayload.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/HttpJsonRequestWithFaultingPayload.java index 69d7b87045..88d205231e 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/HttpJsonRequestWithFaultingPayload.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/HttpJsonRequestWithFaultingPayload.java @@ -32,10 +32,12 @@ public void setPath(String value) { this.put(JsonKeysForHttpMessage.URI_KEY, value); } + @Override public String protocol() { return (String) this.get(JsonKeysForHttpMessage.PROTOCOL_KEY); } + @Override public void setProtocol(String value) { this.put(JsonKeysForHttpMessage.PROTOCOL_KEY, value); } diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonBodyAccumulateHandler.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonBodyAccumulateHandler.java index 35ed26b682..27dac403d0 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonBodyAccumulateHandler.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonBodyAccumulateHandler.java @@ -167,7 +167,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception private boolean hasRequestContentTypeMatching(HttpJsonMessageWithFaultingPayload message, Predicate contentTypeFilter) { // ContentType not text if specified and has a value with / and that value does not start with text/ - return Optional.ofNullable(capturedHttpJsonMessage.headers().insensitiveGet(HttpHeaderNames.CONTENT_TYPE.toString())) + return Optional.ofNullable(message.headers().insensitiveGet(HttpHeaderNames.CONTENT_TYPE.toString())) .map(s -> s.stream() .filter(v -> v.contains("/")) .filter(contentTypeFilter) diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/ActiveContextMonitor.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/ActiveContextMonitor.java index 8216e932ba..40e5464163 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/ActiveContextMonitor.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/ActiveContextMonitor.java @@ -310,8 +310,7 @@ private static int contextDepth(IScopedInstrumentationAttributes activeScope, in } private String activityToString(OrderedWorkerTracker.TimeKeyAndFuture tkaf) { - var timeStr = "age=" + getAge(tkaf.nanoTimeKey); - return INDENT + timeStr + " " + formatWorkItem.apply(tkaf.future); + return INDENT + "age=" + getAge(tkaf.nanoTimeKey) + " " + formatWorkItem.apply(tkaf.future); } private String activityToString(IScopedInstrumentationAttributes context, int depthToInclude) {