From 0edc510d297e044db98a1293a6bdac15ca52ac8e Mon Sep 17 00:00:00 2001 From: Sravan Thotakura <83568543+SravanThotakura05@users.noreply.github.com> Date: Wed, 19 Nov 2025 23:29:48 +0530 Subject: [PATCH 1/9] Handled isEmpty operation on dataframe, logging enhancements and fixes to checkpoint handling --- .../spark/streaming/SolaceMicroBatch.java | 17 +++++++----- .../offset/SolaceMessageTracker.java | 10 +++---- .../SolaceInputPartitionReader.java | 6 ++--- .../spark/streaming/solace/EventListener.java | 9 +++---- .../spark/streaming/solace/SolaceBroker.java | 27 +++++++++++++++---- 5 files changed, 43 insertions(+), 26 deletions(-) diff --git a/src/main/java/com/solacecoe/connectors/spark/streaming/SolaceMicroBatch.java b/src/main/java/com/solacecoe/connectors/spark/streaming/SolaceMicroBatch.java index 5532fee..29a0141 100644 --- a/src/main/java/com/solacecoe/connectors/spark/streaming/SolaceMicroBatch.java +++ b/src/main/java/com/solacecoe/connectors/spark/streaming/SolaceMicroBatch.java @@ -201,20 +201,23 @@ public Offset initialOffset() { if(existingCheckpoints != null && !existingCheckpoints.isEmpty()) { currentCheckpoint = existingCheckpoints; existingCheckpoints.forEach(checkpoint -> lastKnownMessageIds = String.join(",", lastKnownMessageIds, checkpoint.getMessageIDs())); - + log.info("SolaceSparkConnector - Checkpoint available from LVQ {}", new Gson().toJson(existingCheckpoints)); return new SolaceSourceOffset(lastKnownOffsetId, existingCheckpoints); } - + log.info("SolaceSparkConnector - Initial Offset from LVQ is not available, the micro integration will use the available offset in checkpoint else a new checkpoint state will be created"); return new SolaceSourceOffset(lastKnownOffsetId, new CopyOnWriteArrayList<>()); } @Override public Offset deserializeOffset(String json) { SolaceSourceOffset solaceSourceOffset = getDeserializedOffset(json); - if(solaceSourceOffset != null) { - lastKnownOffsetId = solaceSourceOffset.getOffset(); - solaceSourceOffset.getCheckpoints().forEach(checkpoint -> lastKnownMessageIds = String.join(",", lastKnownMessageIds, checkpoint.getMessageIDs())); + if(solaceSourceOffset.getCheckpoints() != null && solaceSourceOffset.getCheckpoints().isEmpty()) { + log.info("SolaceSparkConnector - No offset is available in spark checkpoint location. New checkpoint state will be created"); + } else { + log.info("SolaceSparkConnector - Deserialized offset {}", new Gson().toJson(solaceSourceOffset)); } + lastKnownOffsetId = solaceSourceOffset.getOffset(); + solaceSourceOffset.getCheckpoints().forEach(checkpoint -> lastKnownMessageIds = String.join(",", lastKnownMessageIds, checkpoint.getMessageIDs())); return solaceSourceOffset; } @@ -229,6 +232,8 @@ private SolaceSourceOffset getDeserializedOffset(String json) { } else { return migrate(solaceSourceOffset.getOffset(), ""); } + } else { + return solaceSourceOffset; } } catch (Exception e) { log.warn("SolaceSparkConnector - Exception when deserializing offset. May be due incompatible formats. 
Connector will try to migrate to latest offset format."); @@ -244,8 +249,6 @@ private SolaceSourceOffset getDeserializedOffset(String json) { throw new RuntimeException("SolaceSparkConnector - Exception when migrating offset to latest format.", e); } } - - return null; } private SolaceSourceOffset migrate(int offset, String messageIds) { diff --git a/src/main/java/com/solacecoe/connectors/spark/streaming/offset/SolaceMessageTracker.java b/src/main/java/com/solacecoe/connectors/spark/streaming/offset/SolaceMessageTracker.java index 9366bb8..e5ab877 100644 --- a/src/main/java/com/solacecoe/connectors/spark/streaming/offset/SolaceMessageTracker.java +++ b/src/main/java/com/solacecoe/connectors/spark/streaming/offset/SolaceMessageTracker.java @@ -11,7 +11,7 @@ import java.util.concurrent.CopyOnWriteArrayList; public final class SolaceMessageTracker implements Serializable { - private static String lastBatchId = ""; + private static ConcurrentHashMap lastBatchId = new ConcurrentHashMap<>(); private static final Logger logger = LogManager.getLogger(SolaceMessageTracker.class); private static ConcurrentHashMap> messages = new ConcurrentHashMap<>(); private static ConcurrentHashMap lastProcessedMessageId = new ConcurrentHashMap<>(); @@ -77,11 +77,11 @@ public static void resetId(String uniqueId) { logger.info("SolaceSparkConnector - Cleared all messages from Offset Manager for {}", uniqueId); } - public static String getLastBatchId() { - return lastBatchId; + public static String getLastBatchId(String uniqueId) { + return lastBatchId.get(uniqueId); } - public static void setLastBatchId(String lastBatchId) { - SolaceMessageTracker.lastBatchId = lastBatchId; + public static void setLastBatchId(String uniqueId, String batchId) { + lastBatchId.put(uniqueId, batchId); } } diff --git a/src/main/java/com/solacecoe/connectors/spark/streaming/partitions/SolaceInputPartitionReader.java b/src/main/java/com/solacecoe/connectors/spark/streaming/partitions/SolaceInputPartitionReader.java index 2fd39ba..12eeac4 100644 --- a/src/main/java/com/solacecoe/connectors/spark/streaming/partitions/SolaceInputPartitionReader.java +++ b/src/main/java/com/solacecoe/connectors/spark/streaming/partitions/SolaceInputPartitionReader.java @@ -72,7 +72,7 @@ public SolaceInputPartitionReader(SolaceInputPartition inputPartition, boolean i * In case when multiple operations are performed on dataframe, input partition will be called as part of Spark scan. * We need to acknowledge messages only if new batch is started. In case of same batch we will return the same messages. */ - if (!currentBatchId.equals(SolaceMessageTracker.getLastBatchId())) { + if (!currentBatchId.equals(SolaceMessageTracker.getLastBatchId(this.uniqueId))) { /* Currently solace can ack messages on consumer flow. So ack previous messages before starting to process new ones. * If Spark starts new input partition it indicates previous batch of data is successful. So we can acknowledge messages here. * Solace connection is always active and acknowledgements should be successful. 
It might throw exception if connection is lost @@ -90,7 +90,7 @@ public SolaceInputPartitionReader(SolaceInputPartition inputPartition, boolean i } } - SolaceMessageTracker.setLastBatchId(currentBatchId); + SolaceMessageTracker.setLastBatchId(this.uniqueId, currentBatchId); this.includeHeaders = includeHeaders; this.properties = properties; @@ -239,7 +239,7 @@ private SolaceMessage getNextMessage() { while (shouldProcessMoreMessages(batchSize, messages)) { try { if (iterator.hasNext()) { - shouldTrackMessage = false; +// shouldTrackMessage = false; solaceMessage = iterator.next(); if (solaceMessage == null) { return null; diff --git a/src/main/java/com/solacecoe/connectors/spark/streaming/solace/EventListener.java b/src/main/java/com/solacecoe/connectors/spark/streaming/solace/EventListener.java index d3385a2..2fa435b 100644 --- a/src/main/java/com/solacecoe/connectors/spark/streaming/solace/EventListener.java +++ b/src/main/java/com/solacecoe/connectors/spark/streaming/solace/EventListener.java @@ -48,10 +48,7 @@ public void onReceive(BytesXMLMessage msg) { if(!this.checkpoints.isEmpty()) { List lastKnownMessageIDs = new ArrayList<>(); String messageID = SolaceUtils.getMessageID(msg, this.offsetIndicator); - boolean hasPartitionKey = false; - if(msg.getProperties() != null && msg.getProperties().containsKey(XMLMessage.MessageUserPropertyConstants.QUEUE_PARTITION_KEY)) { - hasPartitionKey = true; - } + boolean hasPartitionKey = msg.getProperties() != null && msg.getProperties().containsKey(XMLMessage.MessageUserPropertyConstants.QUEUE_PARTITION_KEY); if(!hasPartitionKey) { for(SolaceSparkPartitionCheckpoint checkpoint : this.checkpoints) { lastKnownMessageIDs.addAll(Arrays.stream(checkpoint.getMessageIDs().split(",")).collect(Collectors.toList())); @@ -87,7 +84,7 @@ public void onReceive(BytesXMLMessage msg) { this.messages.add(new SolaceMessage(msg)); } } else { - log.warn("SolaceSparkConnector - Incoming message has no partition key but messages in checkpoint has partition id's. Message will be reprocessed to ensure reliability—any duplicates must be handled by the downstream system."); + log.warn("SolaceSparkConnector - Incoming message partition key is either null or empty. Message will be reprocessed to ensure reliability—any duplicates must be handled by the downstream system."); this.messages.add(new SolaceMessage(msg)); } } @@ -112,7 +109,7 @@ private void compareMessageIds(List lastKnownMessageIDs, String messageI } else { this.messages.add(new SolaceMessage(msg)); if (lastKnownMessageIDs.size() > 1) { - log.warn("SolaceSparkConnector - Message ID {} not acknowledged as it may be a previously failed message received before newer checkpointed ones(parallel consumers). The connector will not check against checkpointed message IDs. It will be reprocessed to ensure reliability—any duplicates must be handled by the downstream system.", messageID); + log.warn("SolaceSparkConnector - Message ID {} was not acknowledged because it may be an older, previously failed message received ahead of more recent checkpointed messages (due to parallel consumers). The connector does not validate message IDs against checkpoints, so this message will be reprocessed to ensure reliability. 
Any resulting duplicates must be handled by the downstream system.”", messageID); break; } } diff --git a/src/main/java/com/solacecoe/connectors/spark/streaming/solace/SolaceBroker.java b/src/main/java/com/solacecoe/connectors/spark/streaming/solace/SolaceBroker.java index 3fbcc42..0a23a5f 100644 --- a/src/main/java/com/solacecoe/connectors/spark/streaming/solace/SolaceBroker.java +++ b/src/main/java/com/solacecoe/connectors/spark/streaming/solace/SolaceBroker.java @@ -210,18 +210,35 @@ public void createLVQIfNotExist() { EndpointProperties endpoint_props = new EndpointProperties(); endpoint_props.setAccessType(EndpointProperties.ACCESSTYPE_EXCLUSIVE); - endpoint_props.setPermission(EndpointProperties.PERMISSION_CONSUME); endpoint_props.setQuota(0); try { this.session.provision(lvq, endpoint_props, JCSMPSession.FLAG_IGNORE_ALREADY_EXISTS | JCSMPSession.WAIT_FOR_CONFIRM); } catch (JCSMPException e) { log.error("SolaceSparkConnector - Exception creating LVQ. Shutting down consumer ", e); close(); + this.isException = true; + this.exception = e; + throw new RuntimeException(e); } try { this.session.addSubscription(lvq, JCSMPFactory.onlyInstance().createTopic(this.lvqTopic), JCSMPSession.WAIT_FOR_CONFIRM); } catch (JCSMPException e) { - log.warn("SolaceSparkConnector - Subscription already exists on LVQ. Ignoring error"); + if(e instanceof JCSMPErrorResponseException) { + JCSMPErrorResponseException jce = (JCSMPErrorResponseException) e; + if(jce.getResponsePhrase().contains("Subscription Already Exists")) { + log.warn("SolaceSparkConnector - Subscription Already Exists on LVQ {}", this.lvqName); + } else { + close(); + this.isException = true; + this.exception = e; + throw new RuntimeException(e); + } + } else { + close(); + this.isException = true; + this.exception = e; + throw new RuntimeException(e); + } } } @@ -233,10 +250,10 @@ public CopyOnWriteArrayList browseLVQ() { try { Browser myBrowser = session.createBrowser(br_prop); BytesXMLMessage rx_msg = null; - log.info("SolaceSparkConnector - Browsing message from LVQ {}", this.lvqName); + log.info("SolaceSparkConnector - Browsing checkpoint from LVQ {}", this.lvqName); rx_msg = myBrowser.getNext(); if (rx_msg != null) { - log.info("SolaceSparkConnector - Browsed message from LVQ {}", this.lvqName); + log.info("SolaceSparkConnector - Browsed checkpoint from LVQ {}", this.lvqName); byte[] msgData = new byte[0]; if (rx_msg.getContentLength() != 0) { msgData = rx_msg.getBytes(); @@ -247,7 +264,7 @@ public CopyOnWriteArrayList browseLVQ() { lastKnownCheckpoint = new Gson().fromJson(new String(msgData, StandardCharsets.UTF_8), new TypeToken>(){}.getType()); log.info("SolaceSparkConnector - Checkpoint from LVQ {}", new String(msgData, StandardCharsets.UTF_8)); } else { - log.info("SolaceSparkConnector - No message available from LVQ {}", this.lvqName); + log.info("SolaceSparkConnector - No checkpoint available in LVQ {}", this.lvqName); } myBrowser.close(); return lastKnownCheckpoint; From 0ed8d8429b24c6256b5abdafed77663e5f3333c8 Mon Sep 17 00:00:00 2001 From: Sravan Thotakura <83568543+SravanThotakura05@users.noreply.github.com> Date: Fri, 21 Nov 2025 19:52:16 +0530 Subject: [PATCH 2/9] Enhancements to LVQ Creation, handling operations on multiple data frame and improvements to ack handling, logging & documentation --- src/docs/asciidoc/User-Guide.adoc | 19 ++- .../solace-spark-source-config.adoc | 14 -- .../spark/streaming/SolaceMicroBatch.java | 6 +- .../offset/SolaceMessageTracker.java | 2 +- .../SolaceInputPartitionReader.java | 126 
+++++++++++-------
 .../spark/streaming/solace/EventListener.java | 32 ++++-
 .../spark/streaming/solace/SolaceBroker.java | 14 +-
 7 files changed, 138 insertions(+), 75 deletions(-)

diff --git a/src/docs/asciidoc/User-Guide.adoc b/src/docs/asciidoc/User-Guide.adoc
index d2469c0..2a6de5c 100644
--- a/src/docs/asciidoc/User-Guide.adoc
+++ b/src/docs/asciidoc/User-Guide.adoc
@@ -35,7 +35,7 @@ This guide assumes you are familiar with Spark set up and Spark Structured Strea
 
 === Supported Platforms
 
-The connector is built on the Spark Structured Streaming API and has been tested on Azure Databricks(15.4 LTS (includes Apache Spark 3.5.0, Scala 2.12) with photon acceleration disabled). Since the Databricks runtime is consistent across all supported cloud platforms(AWS & Google Cloud), it is expected to behave similarly in other Databricks environments. Additionally, the connector has been validated on vanilla Apache Spark, ensuring compatibility with any platform that supports standard Spark deployments.
+The connector is built on the Spark Structured Streaming API and has been tested on Azure Databricks(15.4 LTS (includes Apache Spark 3.5.0, Scala 2.12) with photon acceleration disabled and 16.4 LTS (includes Apache Spark 3.5.2, Scala 2.12)). Since the Databricks runtime is consistent across all supported cloud platforms(AWS & Google Cloud), it is expected to behave similarly in other Databricks environments. Additionally, the connector has been validated on vanilla Apache Spark, ensuring compatibility with any platform that supports standard Spark deployments.
 
 === Quick Start common steps
 
@@ -67,12 +67,27 @@ Solace Spark connector relies on Spark Checkpointing mechanism to resume from la
 
 === Checkpoint Handling
 
-Starting from version 3.1.0 connector, solace connection is now executed on worker node instead of driver node. This give us the ability to utilize cluster resource efficiently and also improves processing performance. The connector uses Solace LVQ to store checkpoint along with Spark Checkpoint.
+Starting from version 3.1.0 of the connector, the Solace connection is now executed on the worker node instead of the driver node. This gives us the ability to utilize cluster resources efficiently and also improves processing performance. The connector uses a Solace LVQ to store the checkpoint along with the Spark checkpoint.
 
 NOTE: In case of recovery, connector uses offset state from LVQ to identify last successfully processed messages. Hence, it is recommended not to delete or modify offset state in LVQ.
 
 In some cases, there might be checkpoint failures as spark may fail to write to checkpoint during instance crash or unavailability or other reasons. Though the connector will handle duplicates in most cases, we recommend to keep your downstream systems idempotent.
 
+==== Pre-requisites for LVQ creation
+The following pre-requisites are applicable for an LVQ that is provisioned by a Solace Administrator.
+1. The Queue should be of type Exclusive
+2. Spool Quota should be set to 0
+3. Owner of the Queue should be the client username used by the micro integration
+4. Non-Owner access should be set to No Access to prevent unauthorized access
+5. Add a topic subscription
+6. Ensure the ACL applied to the user has publish access to the topic subscribed by the LVQ
+
+The following pre-requisites are applicable if the micro integration needs to create the LVQ when it doesn't exist.
+
+1. Ensure the Client Profile applied to the user has "Allow Client to Create Endpoints" enabled.
+2. 
Ensure the ACL applied to the user has publish and subscribe access to the topic subscribed by LVQ. The Subscribe access is required to programmatically apply the topic subscription to LVQ. +3. The Micro Integration will create a LVQ with pre-requisites mentioned in above section. + === User Authentication Solace Spark Connector supports Basic, Client Certificate and OAuth authentication to Solace. Client Credentials flow is supported when connecting using OAuth. diff --git a/src/docs/sections/general/configuration/solace-spark-source-config.adoc b/src/docs/sections/general/configuration/solace-spark-source-config.adoc index 967fdd9..6191247 100644 --- a/src/docs/sections/general/configuration/solace-spark-source-config.adoc +++ b/src/docs/sections/general/configuration/solace-spark-source-config.adoc @@ -204,20 +204,6 @@ a| Set this value to true if connector needs to identify and acknowledge process NOTE: This property will be void if replay strategy is enabled. -| offsetIndicator -| string -| any -| MESSAGE_ID, CORRELATION_ID, APPLICATION_MESSAGE_ID, -a| Set this value if your Solace Message has unique ID in message header. Supported Values are - - * MESSAGE_ID - * CORRELATION_ID - * APPLICATION_MESSAGE_ID - * - refers to one of headers in user properties -Header. - -Note: Default value uses replication group message ID property as offset indicator. ReplicationGroupMessageId is a unique message id across a broker cluster. - | includeHeaders | boolean | true or false diff --git a/src/main/java/com/solacecoe/connectors/spark/streaming/SolaceMicroBatch.java b/src/main/java/com/solacecoe/connectors/spark/streaming/SolaceMicroBatch.java index 29a0141..efed32a 100644 --- a/src/main/java/com/solacecoe/connectors/spark/streaming/SolaceMicroBatch.java +++ b/src/main/java/com/solacecoe/connectors/spark/streaming/SolaceMicroBatch.java @@ -192,6 +192,9 @@ private Optional getExecutorLocation(List executorLocations, int @Override public PartitionReaderFactory createReaderFactory() { log.info("SolaceSparkConnector - Create reader factory with includeHeaders :: {}", this.includeHeaders); + if(currentCheckpoint != null && currentCheckpoint.isEmpty()) { + currentCheckpoint = this.getCheckpoint(); + } return new SolaceDataSourceReaderFactory(this.includeHeaders, this.properties, currentCheckpoint, this.checkpointLocation); } @@ -217,6 +220,7 @@ public Offset deserializeOffset(String json) { log.info("SolaceSparkConnector - Deserialized offset {}", new Gson().toJson(solaceSourceOffset)); } lastKnownOffsetId = solaceSourceOffset.getOffset(); + currentCheckpoint = solaceSourceOffset.getCheckpoints(); solaceSourceOffset.getCheckpoints().forEach(checkpoint -> lastKnownMessageIds = String.join(",", lastKnownMessageIds, checkpoint.getMessageIDs())); return solaceSourceOffset; @@ -246,7 +250,7 @@ private SolaceSourceOffset getDeserializedOffset(String json) { } } catch (Exception e2) { log.error("SolaceSparkConnector - Exception when migrating offset to latest format."); - throw new RuntimeException("SolaceSparkConnector - Exception when migrating offset to latest format.", e); + throw new RuntimeException("SolaceSparkConnector - Exception when migrating offset to latest format. 
Please delete the checkpoint and restart the micro integration", e); } } } diff --git a/src/main/java/com/solacecoe/connectors/spark/streaming/offset/SolaceMessageTracker.java b/src/main/java/com/solacecoe/connectors/spark/streaming/offset/SolaceMessageTracker.java index e5ab877..e3e90d6 100644 --- a/src/main/java/com/solacecoe/connectors/spark/streaming/offset/SolaceMessageTracker.java +++ b/src/main/java/com/solacecoe/connectors/spark/streaming/offset/SolaceMessageTracker.java @@ -35,7 +35,7 @@ public static void addMessage(String uniqueId, SolaceMessage message) { if(messages.containsKey(uniqueId)) { messageList = messages.get(uniqueId); } - messageList.add(message); + messageList.addIfAbsent(message); messages.put(uniqueId, messageList); } diff --git a/src/main/java/com/solacecoe/connectors/spark/streaming/partitions/SolaceInputPartitionReader.java b/src/main/java/com/solacecoe/connectors/spark/streaming/partitions/SolaceInputPartitionReader.java index 12eeac4..4f1c665 100644 --- a/src/main/java/com/solacecoe/connectors/spark/streaming/partitions/SolaceInputPartitionReader.java +++ b/src/main/java/com/solacecoe/connectors/spark/streaming/partitions/SolaceInputPartitionReader.java @@ -67,6 +67,21 @@ public SolaceInputPartitionReader(SolaceInputPartition inputPartition, boolean i this.solaceInputPartition = inputPartition; this.uniqueId = this.solaceInputPartition.getId(); + this.includeHeaders = includeHeaders; + this.properties = properties; + this.taskContext = taskContext; + this.taskId = taskContext.taskAttemptId(); + this.checkpoints = checkpoints; + this.checkpointLocation = checkpointLocation; + this.batchSize = Integer.parseInt(properties.getOrDefault(SolaceSparkStreamingProperties.BATCH_SIZE, SolaceSparkStreamingProperties.BATCH_SIZE_DEFAULT)); + this.receiveWaitTimeout = Long.parseLong(properties.getOrDefault(SolaceSparkStreamingProperties.QUEUE_RECEIVE_WAIT_TIMEOUT, SolaceSparkStreamingProperties.QUEUE_RECEIVE_WAIT_TIMEOUT_DEFAULT)); + this.closeReceiversOnPartitionClose = Boolean.parseBoolean(properties.getOrDefault(SolaceSparkStreamingProperties.CLOSE_RECEIVERS_ON_PARTITION_CLOSE, SolaceSparkStreamingProperties.CLOSE_RECEIVERS_ON_PARTITION_CLOSE_DEFAULT)); + boolean ackLastProcessedMessages = Boolean.parseBoolean(properties.getOrDefault(SolaceSparkStreamingProperties.ACK_LAST_PROCESSED_MESSAGES, SolaceSparkStreamingProperties.ACK_LAST_PROCESSED_MESSAGES_DEFAULT)); + String replayStrategy = this.properties.getOrDefault(SolaceSparkStreamingProperties.REPLAY_STRATEGY, null); + if (replayStrategy != null && !replayStrategy.isEmpty()) { + ackLastProcessedMessages = false; + } + String currentBatchId = taskContext.getLocalProperty(MicroBatchExecution.BATCH_ID_KEY()); /* * In case when multiple operations are performed on dataframe, input partition will be called as part of Spark scan. @@ -83,29 +98,26 @@ public SolaceInputPartitionReader(SolaceInputPartition inputPartition, boolean i SolaceMessageTracker.ackMessages(uniqueId); log.trace("SolaceSparkConnector - Total time taken to acknowledge messages {} ms", (System.currentTimeMillis() - startTime)); } else { + log.info("SolaceSparkConnector - Spark Batch with id {} is requesting data again. 
This may be due to multiple operations on the same dataframe.", currentBatchId);
             isCommitTriggered = false;
             CopyOnWriteArrayList<SolaceMessage> messageList = SolaceMessageTracker.getMessages(uniqueId);
             if (messageList != null) {
                 iterator = messageList.iterator();
+                if(messageList.size() < batchSize) {
+                    if(messageList.size() == 1) {
+                        log.info("SolaceSparkConnector - Only {} message is available from the earlier request. This is most likely the result of an isEmpty or similar operation on the dataframe. Spark terminates the input partition as soon as one message is available from the source.", messageList.size());
+                    }
+                    log.info("SolaceSparkConnector - Only {} messages are available from the earlier request. The remaining {} messages will be consumed from the queue, if available, to fill the configured batch size of {}", messageList.size(), (batchSize - messageList.size()), this.batchSize);
+                } else {
+                    log.info("SolaceSparkConnector - {} messages are available from the earlier request, matching the configured batch size of {}. The same messages will be returned as the batch is not yet completed.", this.batchSize, messageList.size());
+                }
+
             }
         }
 
         SolaceMessageTracker.setLastBatchId(this.uniqueId, currentBatchId);
-        this.includeHeaders = includeHeaders;
-        this.properties = properties;
-        this.taskContext = taskContext;
-        this.taskId = taskContext.taskAttemptId();
-        this.checkpoints = checkpoints;
-        this.checkpointLocation = checkpointLocation;
-        this.batchSize = Integer.parseInt(properties.getOrDefault(SolaceSparkStreamingProperties.BATCH_SIZE, SolaceSparkStreamingProperties.BATCH_SIZE_DEFAULT));
-        this.receiveWaitTimeout = Long.parseLong(properties.getOrDefault(SolaceSparkStreamingProperties.QUEUE_RECEIVE_WAIT_TIMEOUT, SolaceSparkStreamingProperties.QUEUE_RECEIVE_WAIT_TIMEOUT_DEFAULT));
-        this.closeReceiversOnPartitionClose = Boolean.parseBoolean(properties.getOrDefault(SolaceSparkStreamingProperties.CLOSE_RECEIVERS_ON_PARTITION_CLOSE, SolaceSparkStreamingProperties.CLOSE_RECEIVERS_ON_PARTITION_CLOSE_DEFAULT));
-        boolean ackLastProcessedMessages = Boolean.parseBoolean(properties.getOrDefault(SolaceSparkStreamingProperties.ACK_LAST_PROCESSED_MESSAGES, SolaceSparkStreamingProperties.ACK_LAST_PROCESSED_MESSAGES_DEFAULT));
-        String replayStrategy = this.properties.getOrDefault(SolaceSparkStreamingProperties.REPLAY_STRATEGY, null);
-        if (replayStrategy == null || replayStrategy.isEmpty()) {
-            ackLastProcessedMessages = false;
-        }
+
 
         log.info("SolaceSparkConnector - Checking for connection {}", inputPartition.getId());
@@ -131,7 +143,7 @@ public SolaceInputPartitionReader(SolaceInputPartition inputPartition, boolean i
             createNewConnection(inputPartition.getId(), ackLastProcessedMessages);
         }
         checkException();
-        log.info("SolaceSparkConnector - Acquired connection to Solace broker for partition {}", inputPartition.getId());
+        log.info("SolaceSparkConnector - Current consumer session on input partition {} is {}", inputPartition.getId(), this.solaceBroker.getUniqueName());
         registerTaskListener();
     }
 
@@ -209,7 +221,7 @@ private SolaceMessage getNextMessage() {
            If commit is triggered or messageList is null we need to fetch messages from Solace. In case of same batch just return the available messages in message tracker. 
*/ - if (this.isCommitTriggered || iterator == null || !iterator.hasNext()) { + if (this.isCommitTriggered || iterator == null) { LinkedBlockingQueue queue = solaceBroker.getMessages(0); if (queue != null) { while (shouldProcessMoreMessages(batchSize, messages)) { @@ -254,6 +266,29 @@ private SolaceMessage getNextMessage() { return solaceMessage; } } else { + LinkedBlockingQueue queue = solaceBroker.getMessages(0); + if (queue != null) { + try { + solaceMessage = queue.poll(receiveWaitTimeout, TimeUnit.MILLISECONDS); + if (solaceMessage == null) { + return null; + } + + if (batchSize > 0) { + messages++; + } + if (isMessageAlreadyProcessed(solaceMessage)) { + log.info("Message is added to previous partitions for processing. Moving to next message"); + } else { + return solaceMessage; + } + + } catch (InterruptedException | SDTException e) { + log.warn("No messages available within specified receiveWaitTimeout", e); + Thread.currentThread().interrupt(); + return null; + } + } return null; } } catch (Exception e) { @@ -303,39 +338,40 @@ private void registerTaskListener() { if (context.isInterrupted() || context.isFailed()) { logShutdownMessage(context); } else if (context.isCompleted()) { - List ids = SolaceMessageTracker.getIds(); - try { - Path path = Paths.get(this.checkpointLocation + "/" + this.solaceInputPartition.getId() + ".txt"); - log.trace("SolaceSparkConnector - File path {} to store checkpoint processed in worker node {}", path.toString(), this.solaceInputPartition.getPreferredLocation()); - Path parentDir = path.getParent(); - if (parentDir != null) { - // Create the directory and all nonexistent parent directories - Files.createDirectories(parentDir); - log.trace("SolaceSparkConnector - Created parent directory {} for file path {}", parentDir.toString(), path.toString()); - } - // overwrite checkpoint to preserve latest value - try (BufferedWriter writer = Files.newBufferedWriter(path, StandardOpenOption.CREATE, - StandardOpenOption.TRUNCATE_EXISTING)) { - for (String id : ids) { - String processedMessageIDs = SolaceMessageTracker.getProcessedMessagesIDs(id); - if (processedMessageIDs != null) { - SolaceSparkPartitionCheckpoint solaceSparkPartitionCheckpoint = new SolaceSparkPartitionCheckpoint(processedMessageIDs, id); - CopyOnWriteArrayList solaceSparkPartitionCheckpoints = new CopyOnWriteArrayList<>(); - solaceSparkPartitionCheckpoints.add(solaceSparkPartitionCheckpoint); - // Publish state to checkpoint. On commit the state is published to Solace LVQ. 
- writer.write(new Gson().toJson(solaceSparkPartitionCheckpoints)); - writer.newLine(); - log.trace("SolaceSparkConnector - Checkpoint {} stored in file path {}", new Gson().toJson(solaceSparkPartitionCheckpoints), path.toString()); - SolaceMessageTracker.removeProcessedMessagesIDs(id); - } + String processedMessageIDs = SolaceMessageTracker.getProcessedMessagesIDs(this.solaceInputPartition.getId()); + System.out.println(this.solaceInputPartition.getId() + " - " + processedMessageIDs); + if(processedMessageIDs != null && !processedMessageIDs.isEmpty()) { + try { + Path path = Paths.get(this.checkpointLocation + "/" + this.solaceInputPartition.getId() + ".txt"); + log.trace("SolaceSparkConnector - File path {} to store checkpoint processed in worker node {}", path.toString(), this.solaceInputPartition.getPreferredLocation()); + Path parentDir = path.getParent(); + if (parentDir != null) { + // Create the directory and all nonexistent parent directories + Files.createDirectories(parentDir); + log.trace("SolaceSparkConnector - Created parent directory {} for file path {}", parentDir.toString(), path.toString()); } + // overwrite checkpoint to preserve latest value + try (BufferedWriter writer = Files.newBufferedWriter(path, StandardOpenOption.CREATE, + StandardOpenOption.TRUNCATE_EXISTING)) { +// for (String id : ids) { + SolaceSparkPartitionCheckpoint solaceSparkPartitionCheckpoint = new SolaceSparkPartitionCheckpoint(processedMessageIDs, this.solaceInputPartition.getId()); + CopyOnWriteArrayList solaceSparkPartitionCheckpoints = new CopyOnWriteArrayList<>(); + solaceSparkPartitionCheckpoints.add(solaceSparkPartitionCheckpoint); + // Publish state to checkpoint. On commit the state is published to Solace LVQ. + writer.write(new Gson().toJson(solaceSparkPartitionCheckpoints)); + writer.newLine(); + log.trace("SolaceSparkConnector - Checkpoint {} stored in file path {}", new Gson().toJson(solaceSparkPartitionCheckpoints), path.toString()); + SolaceMessageTracker.removeProcessedMessagesIDs(this.solaceInputPartition.getId()); + // } + } + } catch (IOException e) { + log.error("SolaceSparkConnector - Exception when writing checkpoint to path {}", this.checkpointLocation, e); + this.solaceBroker.close(); + throw new RuntimeException(e); } - } catch (IOException e) { - log.error("SolaceSparkConnector - Exception when writing checkpoint to path {}", this.checkpointLocation, e); - this.solaceBroker.close(); - throw new RuntimeException(e); } + log.info("SolaceSparkConnector - Total time taken by executor is {} ms for Task {}", context.taskMetrics().executorRunTime(), uniqueId); if (closeReceiversOnPartitionClose) { diff --git a/src/main/java/com/solacecoe/connectors/spark/streaming/solace/EventListener.java b/src/main/java/com/solacecoe/connectors/spark/streaming/solace/EventListener.java index 2fa435b..3cc1f86 100644 --- a/src/main/java/com/solacecoe/connectors/spark/streaming/solace/EventListener.java +++ b/src/main/java/com/solacecoe/connectors/spark/streaming/solace/EventListener.java @@ -18,7 +18,7 @@ public class EventListener implements XMLMessageListener, Serializable { - private static Logger log = LoggerFactory.getLogger(EventListener.class); + private static final Logger log = LoggerFactory.getLogger(EventListener.class); private final LinkedBlockingQueue messages; private final String id; private CopyOnWriteArrayList checkpoints = new CopyOnWriteArrayList<>(); @@ -55,7 +55,7 @@ public void onReceive(BytesXMLMessage msg) { }; if(lastKnownMessageIDs.isEmpty()) { - 
log.warn("SolaceSparkConnector - No checkpoint found. Message will be reprocessed to ensure reliability—any duplicates must be handled by the downstream system."); + log.warn("SolaceSparkConnector - No message ids available in checkpoint. Message will be reprocessed to ensure reliability—any duplicates must be handled by the downstream system."); this.messages.add(new SolaceMessage(msg)); } else { lastKnownMessageIDs.sort((o1, o2) -> { @@ -74,13 +74,13 @@ public void onReceive(BytesXMLMessage msg) { if(solaceSparkPartitionCheckpoint != null) { lastKnownMessageIDs = Arrays.stream(solaceSparkPartitionCheckpoint.getMessageIDs().split(",")).collect(Collectors.toList()); if(lastKnownMessageIDs.isEmpty()) { - log.warn("SolaceSparkConnector - No checkpoint found. Message will be reprocessed to ensure reliability—any duplicates must be handled by the downstream system."); + log.warn("SolaceSparkConnector - No message ids available in checkpoint. Message will be reprocessed to ensure reliability—any duplicates must be handled by the downstream system."); this.messages.add(new SolaceMessage(msg)); } else { compareMessageIds(lastKnownMessageIDs, messageID, msg); } } else { - log.warn("SolaceSparkConnector - No checkpoint found for partition key {}. Message will be reprocessed to ensure reliability—any duplicates must be handled by the downstream system.", partitionKey); + log.warn("SolaceSparkConnector - No checkpoint found for partition key {} and message id {}. Message will be reprocessed to ensure reliability—any duplicates must be handled by the downstream system.", partitionKey, messageID); this.messages.add(new SolaceMessage(msg)); } } else { @@ -105,13 +105,31 @@ private void compareMessageIds(List lastKnownMessageIDs, String messageI if ((currentMessageId.compare(checkpointMsgId) < 0 || currentMessageId.compare(checkpointMsgId) == 0) && lastKnownMessageIDs.size() == 1) { msg.ackMessage(); - log.info("SolaceSparkConnector - Acknowledged message with ID {} present in last known offset as user has set ackLastProcessedMessages to true in configuration", messageID); + log.info("SolaceSparkConnector - Acknowledged message with ID {} as user has set ackLastProcessedMessages to true in configuration", messageID); } else { - this.messages.add(new SolaceMessage(msg)); if (lastKnownMessageIDs.size() > 1) { - log.warn("SolaceSparkConnector - Message ID {} was not acknowledged because it may be an older, previously failed message received ahead of more recent checkpointed messages (due to parallel consumers). The connector does not validate message IDs against checkpoints, so this message will be reprocessed to ensure reliability. Any resulting duplicates must be handled by the downstream system.”", messageID); + log.info("SolaceSparkConnector - Checkpoint has more than one message ids {}. This might be due to parallel consumers. 
The message {} will be acknowledged only if it is older than available message ids else will be sent for reprocessing", lastKnownMessageIDs, currentMessageId); + int verificationCount = 0; + for(String id: lastKnownMessageIDs) { + ReplicationGroupMessageId idToReplicationGroupMessageId = JCSMPFactory.onlyInstance().createReplicationGroupMessageId(id); + if ((currentMessageId.compare(idToReplicationGroupMessageId) < 0 || currentMessageId.compare(idToReplicationGroupMessageId) == 0)) { + verificationCount++; + } + } + + if(verificationCount == lastKnownMessageIDs.size()) { + msg.ackMessage(); + log.info("SolaceSparkConnector - Acknowledged message with ID {} as user has set ackLastProcessedMessages to true in configuration and it is older than checkpoint message ids {}", messageID, lastKnownMessageIDs); + } else { + this.messages.add(new SolaceMessage(msg)); + log.info("SolaceSparkConnector - Message Id {} is added for reprocessing as it failed checkpoint validation", currentMessageId); + } break; + } else { + this.messages.add(new SolaceMessage(msg)); + log.info("SolaceSparkConnector - Message Id {} is added for reprocessing as it failed checkpoint validation", currentMessageId); } + } } } diff --git a/src/main/java/com/solacecoe/connectors/spark/streaming/solace/SolaceBroker.java b/src/main/java/com/solacecoe/connectors/spark/streaming/solace/SolaceBroker.java index 0a23a5f..8f67c3d 100644 --- a/src/main/java/com/solacecoe/connectors/spark/streaming/solace/SolaceBroker.java +++ b/src/main/java/com/solacecoe/connectors/spark/streaming/solace/SolaceBroker.java @@ -334,8 +334,9 @@ public boolean isQueueFull() { retryCount++; } while (true); } catch (JCSMPException e) { - log.error("SolaceSparkConnector - Exception creating Queue Browser ", e); - handleException("SolaceSparkConnector - Exception creating Queue Browser ", e); + log.error("SolaceSparkConnector - Exception creating monitoring consumer on queue {}", this.queue, e); + close(); + handleException("SolaceSparkConnector - Exception creating monitoring consumer on queue " + this.queue, e); return false; } } @@ -484,6 +485,7 @@ public boolean isConnected() { } public LinkedBlockingQueue getMessages(int index) { +// log.info("SolaceSparkConnector - {} messages received on Solace Consumer Session {} are ready for processing", (index < this.eventListeners.size() ? this.eventListeners.get(index).getMessages().size() : 0), this.uniqueName); return index < this.eventListeners.size() ? this.eventListeners.get(index).getMessages() : null; } @@ -597,14 +599,16 @@ private void startWatchdog() { } if (lastMessageTimestamp > 0 && (System.currentTimeMillis() - lastMessageTimestamp) > timeout) { - log.info("SolaceSparkConnector - Inactivity timeout. Last message processed at {} Shutting down Solace Session.", lastMessageTimestamp); + log.info("SolaceSparkConnector - Inactivity timeout for consumer session {}. Last message processed at {}. 
Closing Session.", this.uniqueName, lastMessageTimestamp); close(); + } else if(lastMessageTimestamp == 0){ + log.info("SolaceSparkConnector - No messages are processed yet by consumer session {}, skipping idle timeout check.", this.uniqueName); } else { - log.info("SolaceSparkConnector - No messages are processed yet, skipping idle timeout check."); + log.info("SolaceSparkConnector - Last message is processed by consumer session {} at {} and current timestamp is {}", this.uniqueName, this.lastMessageTimestamp, System.currentTimeMillis()); } }, interval, interval, TimeUnit.MILLISECONDS); // initial delay, then interval } else { - log.info("SolaceSparkConnector - No connection idle timeout is configured. Connector will not check for idle connections."); + log.info("SolaceSparkConnector - No connection idle timeout is configured. Micro Integration will not check for idle connections."); } } From 048a007a5a36a91dc51b6a9d5700c620ba321751 Mon Sep 17 00:00:00 2001 From: Sravan Thotakura <83568543+SravanThotakura05@users.noreply.github.com> Date: Sun, 23 Nov 2025 15:39:29 +0530 Subject: [PATCH 3/9] added test for LVQ unauthorized access --- .../spark/SolaceSparkStreamingSourceIT.java | 80 ++++++++++++++++++- 1 file changed, 79 insertions(+), 1 deletion(-) diff --git a/src/test/java/com/solacecoe/connectors/spark/SolaceSparkStreamingSourceIT.java b/src/test/java/com/solacecoe/connectors/spark/SolaceSparkStreamingSourceIT.java index 2b0bb23..bc82ff7 100644 --- a/src/test/java/com/solacecoe/connectors/spark/SolaceSparkStreamingSourceIT.java +++ b/src/test/java/com/solacecoe/connectors/spark/SolaceSparkStreamingSourceIT.java @@ -50,7 +50,7 @@ public class SolaceSparkStreamingSourceIT { .withShmSize(SHM_SIZE) .withUlimits(ulimitList) .withCpuCount(1l); - }).withExposedPorts(8080, 55555).withTopic("solace/spark/streaming", Service.SMF).withTopic("solace/spark/connector/offset", Service.SMF); + }).withExposedPorts(8080, 55555).withTopic("solace/spark/streaming", Service.SMF).withTopic("solace/spark/connector/offset", Service.SMF).withTopic("solace/spark/streaming/offset", Service.SMF); private SparkSession sparkSession; @BeforeAll public void beforeAll() throws ApiException { @@ -73,6 +73,22 @@ public void beforeAll() throws ApiException { sempV2Api.config().createMsgVpnQueue("default", queue, null, null); sempV2Api.config().createMsgVpnQueueSubscription("default", "Solace/Queue/0", subscription, null, null); + + MsgVpnQueue lvq = new MsgVpnQueue(); + lvq.queueName("Solace/Queue/lvq/0"); + lvq.accessType(MsgVpnQueue.AccessTypeEnum.EXCLUSIVE); + lvq.permission(MsgVpnQueue.PermissionEnum.NO_ACCESS); + lvq.setOwner("default"); + lvq.ingressEnabled(true); + lvq.egressEnabled(true); + lvq.setMaxMsgSpoolUsage(0l); + + MsgVpnQueueSubscription lvqSubscription = new MsgVpnQueueSubscription(); + lvqSubscription.setSubscriptionTopic("solace/spark/streaming/offset"); + + sempV2Api.config().createMsgVpnQueue("default", lvq, null, null); + sempV2Api.config().createMsgVpnQueueSubscription("default", "Solace/Queue/lvq/0", lvqSubscription, null, null); + } else { throw new RuntimeException("Solace Container is not started yet"); } @@ -742,4 +758,66 @@ void Should_Fail_IfLVQTopic_Has_No_Permission_To_Publish() { assertTrue(e instanceof StreamingQueryException); } } + + @Test + void Should_Fail_IfLVQ_Has_No_Permission_To_Access() { + Path path = Paths.get("src", "test", "resources", "spark-checkpoint-1"); + try{ + DataStreamReader reader = sparkSession.readStream() + .option(SolaceSparkStreamingProperties.HOST, 
solaceContainer.getOrigin(Service.SMF)) + .option(SolaceSparkStreamingProperties.VPN, solaceContainer.getVpn()) + .option(SolaceSparkStreamingProperties.USERNAME, solaceContainer.getUsername()) + .option(SolaceSparkStreamingProperties.PASSWORD, solaceContainer.getPassword()) + .option(SolaceSparkStreamingProperties.SOLACE_CONNECT_RETRIES, 1) + .option(SolaceSparkStreamingProperties.SOLACE_RECONNECT_RETRIES, 1) + .option(SolaceSparkStreamingProperties.SOLACE_CONNECT_RETRIES_PER_HOST, 1) + .option(SolaceSparkStreamingProperties.SOLACE_RECONNECT_RETRIES_WAIT_TIME, 100) + .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX+"sub_ack_window_threshold", 75) + .option(SolaceSparkStreamingProperties.QUEUE, "Solace/Queue/0") + .option(SolaceSparkStreamingProperties.BATCH_SIZE, "1") + .option(SolaceSparkStreamingProperties.SOLACE_SPARK_CONNECTOR_LVQ_NAME, "Solace/Queue/lvq/0") + .option(SolaceSparkStreamingProperties.SOLACE_SPARK_CONNECTOR_LVQ_TOPIC, "solace/spark/streaming/offset") + .option("checkpointLocation", path.toAbsolutePath().toString()) + .format("solace"); + Dataset dataset = reader.load(); + StreamingQuery streamingQuery = dataset.writeStream().foreachBatch((VoidFunction2, Long>) (dataset1, batchId) -> { + System.out.println(dataset1.count()); + }).start(); + streamingQuery.awaitTermination(); + } catch (Exception e) { + System.out.println(e); + assertTrue(e instanceof StreamingQueryException); + } + } + + @Test + void Should_Fail_IfLVQ_Has_No_Permission_To_Add_Subscription() { + Path path = Paths.get("src", "test", "resources", "spark-checkpoint-1"); + try{ + DataStreamReader reader = sparkSession.readStream() + .option(SolaceSparkStreamingProperties.HOST, solaceContainer.getOrigin(Service.SMF)) + .option(SolaceSparkStreamingProperties.VPN, solaceContainer.getVpn()) + .option(SolaceSparkStreamingProperties.USERNAME, solaceContainer.getUsername()) + .option(SolaceSparkStreamingProperties.PASSWORD, solaceContainer.getPassword()) + .option(SolaceSparkStreamingProperties.SOLACE_CONNECT_RETRIES, 1) + .option(SolaceSparkStreamingProperties.SOLACE_RECONNECT_RETRIES, 1) + .option(SolaceSparkStreamingProperties.SOLACE_CONNECT_RETRIES_PER_HOST, 1) + .option(SolaceSparkStreamingProperties.SOLACE_RECONNECT_RETRIES_WAIT_TIME, 100) + .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX+"sub_ack_window_threshold", 75) + .option(SolaceSparkStreamingProperties.QUEUE, "Solace/Queue/0") + .option(SolaceSparkStreamingProperties.BATCH_SIZE, "1") + .option(SolaceSparkStreamingProperties.SOLACE_SPARK_CONNECTOR_LVQ_NAME, "Solace/Queue/lvq/0") + .option(SolaceSparkStreamingProperties.SOLACE_SPARK_CONNECTOR_LVQ_TOPIC, "invalid/topic") + .option("checkpointLocation", path.toAbsolutePath().toString()) + .format("solace"); + Dataset dataset = reader.load(); + StreamingQuery streamingQuery = dataset.writeStream().foreachBatch((VoidFunction2, Long>) (dataset1, batchId) -> { + System.out.println(dataset1.count()); + }).start(); + streamingQuery.awaitTermination(); + } catch (Exception e) { + System.out.println(e); + assertTrue(e instanceof StreamingQueryException); + } + } } \ No newline at end of file From 5adf76fe161bdc1e242b4de7532345e21057afdf Mon Sep 17 00:00:00 2001 From: Sravan Thotakura <83568543+SravanThotakura05@users.noreply.github.com> Date: Mon, 24 Nov 2025 10:23:30 +0530 Subject: [PATCH 4/9] Add Cpu & Ulimits to handle container start up failed in integration tests --- .../streaming/partitions/SolaceInputPartitionReader.java | 1 - 
.../connectors/spark/oauth/SolaceOAuthContainer.java | 8 ++++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/solacecoe/connectors/spark/streaming/partitions/SolaceInputPartitionReader.java b/src/main/java/com/solacecoe/connectors/spark/streaming/partitions/SolaceInputPartitionReader.java index 4f1c665..ed99e1a 100644 --- a/src/main/java/com/solacecoe/connectors/spark/streaming/partitions/SolaceInputPartitionReader.java +++ b/src/main/java/com/solacecoe/connectors/spark/streaming/partitions/SolaceInputPartitionReader.java @@ -339,7 +339,6 @@ private void registerTaskListener() { logShutdownMessage(context); } else if (context.isCompleted()) { String processedMessageIDs = SolaceMessageTracker.getProcessedMessagesIDs(this.solaceInputPartition.getId()); - System.out.println(this.solaceInputPartition.getId() + " - " + processedMessageIDs); if(processedMessageIDs != null && !processedMessageIDs.isEmpty()) { try { Path path = Paths.get(this.checkpointLocation + "/" + this.solaceInputPartition.getId() + ".txt"); diff --git a/src/test/java/com/solacecoe/connectors/spark/oauth/SolaceOAuthContainer.java b/src/test/java/com/solacecoe/connectors/spark/oauth/SolaceOAuthContainer.java index 71c15b7..dba7714 100644 --- a/src/test/java/com/solacecoe/connectors/spark/oauth/SolaceOAuthContainer.java +++ b/src/test/java/com/solacecoe/connectors/spark/oauth/SolaceOAuthContainer.java @@ -1,6 +1,7 @@ package com.solacecoe.connectors.spark.oauth; import com.github.dockerjava.api.command.InspectContainerResponse; +import com.github.dockerjava.api.model.Ulimit; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.images.builder.Transferable; @@ -59,12 +60,15 @@ public SolaceOAuthContainer(String dockerImageName) { public SolaceOAuthContainer(DockerImageName dockerImageName) { super(dockerImageName); dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME); + Ulimit ulimit = new Ulimit("nofile", 2448, 1048576); + List ulimitList = new ArrayList<>(); + ulimitList.add(ulimit); withCreateContainerCmdModifier(cmd -> { cmd.withUser("1000"); cmd.getHostConfig() .withShmSize(SHM_SIZE) - .withMemorySwap(-1L) - .withMemoryReservation(0L); + .withUlimits(ulimitList) + .withCpuCount(1l); }); this.waitStrategy = Wait.forLogMessage(SOLACE_READY_MESSAGE, 1).withStartupTimeout(Duration.ofSeconds(60)); withExposedPorts(8080); From 85d1b039ca218229ca2eb95b3121759c89824cd5 Mon Sep 17 00:00:00 2001 From: Sravan Thotakura <83568543+SravanThotakura05@users.noreply.github.com> Date: Mon, 24 Nov 2025 13:33:17 +0530 Subject: [PATCH 5/9] Added integration test for handling multiple operations on dataframe with parallel consumers --- .../spark/SolaceSparkStreamingSinkIT.java | 126 ++++++++++++++++++ 1 file changed, 126 insertions(+) diff --git a/src/test/java/com/solacecoe/connectors/spark/SolaceSparkStreamingSinkIT.java b/src/test/java/com/solacecoe/connectors/spark/SolaceSparkStreamingSinkIT.java index 7c01e08..464807b 100644 --- a/src/test/java/com/solacecoe/connectors/spark/SolaceSparkStreamingSinkIT.java +++ b/src/test/java/com/solacecoe/connectors/spark/SolaceSparkStreamingSinkIT.java @@ -680,6 +680,132 @@ public void onException(JCSMPException e) { @Test @Order(10) + void Should_ProcessData_WithSingleConsumer_And_Publish_To_Solace_With_MultipleOperations_On_Dataframe() throws TimeoutException, InterruptedException { + Path path = Paths.get("src", "test", "resources", "spark-checkpoint-1"); + final long[] 
dataFrameCount = {0}; + DataStreamReader reader = sparkSession.readStream() + .option(SolaceSparkStreamingProperties.HOST, solaceContainer.getOrigin(Service.SMF)) + .option(SolaceSparkStreamingProperties.VPN, solaceContainer.getVpn()) + .option(SolaceSparkStreamingProperties.USERNAME, solaceContainer.getUsername()) + .option(SolaceSparkStreamingProperties.PASSWORD, solaceContainer.getPassword()) + .option(SolaceSparkStreamingProperties.QUEUE, "Solace/Queue/0") + .option(SolaceSparkStreamingProperties.BATCH_SIZE, "50") + .option(SolaceSparkStreamingProperties.INCLUDE_HEADERS, true) + .option("checkpointLocation", path.toAbsolutePath().toString()) + .format("solace"); + final long[] count = {0}; + Dataset dataset = reader.load(); + + StreamingQuery streamingQuery = dataset.writeStream().foreachBatch((VoidFunction2, Long>) (dataset1, batchId) -> { + if(!dataset1.isEmpty()) { + dataFrameCount[0] = dataFrameCount[0] + dataset1.count(); + dataset1 = dataset1.drop("TimeStamp", "PartitionKey", "Headers"); + dataset1.write() + .option(SolaceSparkStreamingProperties.HOST, solaceContainer.getOrigin(Service.SMF)) + .option(SolaceSparkStreamingProperties.VPN, solaceContainer.getVpn()) + .option(SolaceSparkStreamingProperties.USERNAME, solaceContainer.getUsername()) + .option(SolaceSparkStreamingProperties.PASSWORD, solaceContainer.getPassword()) + .option(SolaceSparkStreamingProperties.BATCH_SIZE, 0) + .option(SolaceSparkStreamingProperties.TOPIC, "random/topic") + .mode(SaveMode.Append) + .format("solace").save(); + } + }).start(); + + SolaceSession session = new SolaceSession(solaceContainer.getOrigin(Service.SMF), solaceContainer.getVpn(), solaceContainer.getUsername(), solaceContainer.getPassword()); + Topic topic = JCSMPFactory.onlyInstance().createTopic("random/topic"); + XMLMessageConsumer messageConsumer = null; + try { + messageConsumer = session.getSession().getMessageConsumer(new XMLMessageListener() { + @Override + public void onReceive(BytesXMLMessage bytesXMLMessage) { + count[0] = count[0] + 1; + } + + @Override + public void onException(JCSMPException e) { + // Not required for test + + } + }); + session.getSession().addSubscription(topic); + messageConsumer.start(); + } catch (JCSMPException e) { + throw new RuntimeException(e); + } + + Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> Assertions.assertEquals(100, dataFrameCount[0])); + Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> Assertions.assertEquals(100, count[0])); + Thread.sleep(3000); // add timeout to ack messages on queue + streamingQuery.stop(); + } + + @Test + @Order(11) + void Should_ProcessData_WithMultipleConsumer_And_Publish_To_Solace_With_MultipleOperations_On_Dataframe() throws TimeoutException, InterruptedException { + Path path = Paths.get("src", "test", "resources", "spark-checkpoint-1"); + final long[] dataFrameCount = {0}; + DataStreamReader reader = sparkSession.readStream() + .option(SolaceSparkStreamingProperties.HOST, solaceContainer.getOrigin(Service.SMF)) + .option(SolaceSparkStreamingProperties.VPN, solaceContainer.getVpn()) + .option(SolaceSparkStreamingProperties.USERNAME, solaceContainer.getUsername()) + .option(SolaceSparkStreamingProperties.PASSWORD, solaceContainer.getPassword()) + .option(SolaceSparkStreamingProperties.QUEUE, "Solace/Queue/0") + .option(SolaceSparkStreamingProperties.BATCH_SIZE, "50") + .option(SolaceSparkStreamingProperties.INCLUDE_HEADERS, true) + .option(SolaceSparkStreamingProperties.PARTITIONS, "3") + 
.option(SolaceSparkStreamingProperties.QUEUE_RECEIVE_WAIT_TIMEOUT, 1000) + .option("checkpointLocation", path.toAbsolutePath().toString()) + .format("solace"); + final long[] count = {0}; + Dataset dataset = reader.load(); + + StreamingQuery streamingQuery = dataset.writeStream().foreachBatch((VoidFunction2, Long>) (dataset1, batchId) -> { + if(!dataset1.isEmpty()) { + dataFrameCount[0] = dataFrameCount[0] + dataset1.count(); + dataset1 = dataset1.drop("TimeStamp", "PartitionKey", "Headers"); + dataset1.write() + .option(SolaceSparkStreamingProperties.HOST, solaceContainer.getOrigin(Service.SMF)) + .option(SolaceSparkStreamingProperties.VPN, solaceContainer.getVpn()) + .option(SolaceSparkStreamingProperties.USERNAME, solaceContainer.getUsername()) + .option(SolaceSparkStreamingProperties.PASSWORD, solaceContainer.getPassword()) + .option(SolaceSparkStreamingProperties.BATCH_SIZE, 0) + .option(SolaceSparkStreamingProperties.TOPIC, "random/topic") + .mode(SaveMode.Append) + .format("solace").save(); + } + }).start(); + + SolaceSession session = new SolaceSession(solaceContainer.getOrigin(Service.SMF), solaceContainer.getVpn(), solaceContainer.getUsername(), solaceContainer.getPassword()); + Topic topic = JCSMPFactory.onlyInstance().createTopic("random/topic"); + XMLMessageConsumer messageConsumer = null; + try { + messageConsumer = session.getSession().getMessageConsumer(new XMLMessageListener() { + @Override + public void onReceive(BytesXMLMessage bytesXMLMessage) { + count[0] = count[0] + 1; + } + + @Override + public void onException(JCSMPException e) { + // Not required for test + + } + }); + session.getSession().addSubscription(topic); + messageConsumer.start(); + } catch (JCSMPException e) { + throw new RuntimeException(e); + } + + Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> Assertions.assertEquals(100, dataFrameCount[0])); + Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> Assertions.assertEquals(100, count[0])); + Thread.sleep(3000); // add timeout to ack messages on queue + streamingQuery.stop(); + } + + @Test + @Order(12) void Should_Not_ProcessData_When_QueueIsEmpty() throws TimeoutException, InterruptedException { Path path = Paths.get("src", "test", "resources", "spark-checkpoint-1"); final long[] batchTriggerCount = {0}; From 492d1586d118bc72c71353415925a50805e04117 Mon Sep 17 00:00:00 2001 From: Sravan Thotakura <83568543+SravanThotakura05@users.noreply.github.com> Date: Mon, 24 Nov 2025 15:02:02 +0530 Subject: [PATCH 6/9] Update SolaceSparkStreamingSinkIT.java --- .../connectors/spark/SolaceSparkStreamingSinkIT.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/test/java/com/solacecoe/connectors/spark/SolaceSparkStreamingSinkIT.java b/src/test/java/com/solacecoe/connectors/spark/SolaceSparkStreamingSinkIT.java index 464807b..8dcf3aa 100644 --- a/src/test/java/com/solacecoe/connectors/spark/SolaceSparkStreamingSinkIT.java +++ b/src/test/java/com/solacecoe/connectors/spark/SolaceSparkStreamingSinkIT.java @@ -63,7 +63,7 @@ public void beforeAll() throws ApiException { SempV2Api sempV2Api = new SempV2Api(String.format("http://%s:%d", solaceContainer.getHost(), solaceContainer.getMappedPort(8080)), "admin", "admin"); MsgVpnQueue queue = new MsgVpnQueue(); queue.queueName("Solace/Queue/0"); - queue.accessType(MsgVpnQueue.AccessTypeEnum.EXCLUSIVE); + queue.accessType(MsgVpnQueue.AccessTypeEnum.NON_EXCLUSIVE); queue.permission(MsgVpnQueue.PermissionEnum.DELETE); queue.ingressEnabled(true); 
queue.egressEnabled(true); @@ -741,7 +741,7 @@ public void onException(JCSMPException e) { } @Test - @Order(11) + @Order(12) void Should_ProcessData_WithMultipleConsumer_And_Publish_To_Solace_With_MultipleOperations_On_Dataframe() throws TimeoutException, InterruptedException { Path path = Paths.get("src", "test", "resources", "spark-checkpoint-1"); final long[] dataFrameCount = {0}; @@ -805,7 +805,7 @@ public void onException(JCSMPException e) { } @Test - @Order(12) + @Order(13) void Should_Not_ProcessData_When_QueueIsEmpty() throws TimeoutException, InterruptedException { Path path = Paths.get("src", "test", "resources", "spark-checkpoint-1"); final long[] batchTriggerCount = {0}; @@ -960,6 +960,7 @@ void Should_Fail_Publish_IfMessageTopicIsMissing() { } @Test + @Order(11) void Should_Fail_Publish_IfMessagePayloadIsMissing() { Path path = Paths.get("src", "test", "resources", "spark-checkpoint-1"); try { From 5a1cf518f140476d687d124342a2bef898a1cc33 Mon Sep 17 00:00:00 2001 From: Sravan Thotakura <83568543+SravanThotakura05@users.noreply.github.com> Date: Mon, 24 Nov 2025 15:22:05 +0530 Subject: [PATCH 7/9] Update SolaceSparkStreamingSourceIT.java --- .../connectors/spark/SolaceSparkStreamingSourceIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/com/solacecoe/connectors/spark/SolaceSparkStreamingSourceIT.java b/src/test/java/com/solacecoe/connectors/spark/SolaceSparkStreamingSourceIT.java index bc82ff7..928b016 100644 --- a/src/test/java/com/solacecoe/connectors/spark/SolaceSparkStreamingSourceIT.java +++ b/src/test/java/com/solacecoe/connectors/spark/SolaceSparkStreamingSourceIT.java @@ -63,7 +63,7 @@ public void beforeAll() throws ApiException { sempV2Api = new SempV2Api(String.format("http://%s:%d", solaceContainer.getHost(), solaceContainer.getMappedPort(8080)), "admin", "admin"); MsgVpnQueue queue = new MsgVpnQueue(); queue.queueName("Solace/Queue/0"); - queue.accessType(MsgVpnQueue.AccessTypeEnum.EXCLUSIVE); + queue.accessType(MsgVpnQueue.AccessTypeEnum.NON_EXCLUSIVE); queue.permission(MsgVpnQueue.PermissionEnum.DELETE); queue.ingressEnabled(true); queue.egressEnabled(true); From 2cc973f6e6d75658bf64a459cab8dd8a86ecbaaf Mon Sep 17 00:00:00 2001 From: Sravan Thotakura <83568543+SravanThotakura05@users.noreply.github.com> Date: Tue, 25 Nov 2025 16:44:19 +0800 Subject: [PATCH 8/9] Fix failing test --- .../connectors/spark/SolaceSparkStreamingSourceIT.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/test/java/com/solacecoe/connectors/spark/SolaceSparkStreamingSourceIT.java b/src/test/java/com/solacecoe/connectors/spark/SolaceSparkStreamingSourceIT.java index 928b016..877c9ad 100644 --- a/src/test/java/com/solacecoe/connectors/spark/SolaceSparkStreamingSourceIT.java +++ b/src/test/java/com/solacecoe/connectors/spark/SolaceSparkStreamingSourceIT.java @@ -67,6 +67,7 @@ public void beforeAll() throws ApiException { queue.permission(MsgVpnQueue.PermissionEnum.DELETE); queue.ingressEnabled(true); queue.egressEnabled(true); + queue.setMaxDeliveredUnackedMsgsPerFlow(50L); MsgVpnQueueSubscription subscription = new MsgVpnQueueSubscription(); subscription.setSubscriptionTopic("solace/spark/streaming"); @@ -248,6 +249,9 @@ void Should_ProcessSolaceTextMessage() throws TimeoutException, InterruptedExcep Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> Assertions.assertEquals("Hello Spark!", payload.get())); Thread.sleep(3000); // add timeout to ack messages on queue streamingQuery.stop(); + + sparkSession.stop(); + 
sparkSession.close(); } @@ -255,6 +259,10 @@ void Should_ProcessSolaceTextMessage() throws TimeoutException, InterruptedExcep @Test @Order(4) void Should_CreateMultipleConsumersOnDifferentSessions_And_ProcessData() throws TimeoutException, InterruptedException, com.solace.semp.v2.monitor.ApiException { + sparkSession = SparkSession.builder() + .appName("data_source_test_1") + .master("local[*]") + .getOrCreate(); Path path = Paths.get("src", "test", "resources", "spark-checkpoint-2"); DataStreamReader reader = sparkSession.readStream() .option(SolaceSparkStreamingProperties.HOST, solaceContainer.getOrigin(Service.SMF)) From a54aa2b96ab13fd1341c6f1e96957b879818249b Mon Sep 17 00:00:00 2001 From: Sravan Thotakura <83568543+SravanThotakura05@users.noreply.github.com> Date: Tue, 25 Nov 2025 17:12:59 +0800 Subject: [PATCH 9/9] Update SolaceSparkStreamingSourceIT.java --- .../connectors/spark/SolaceSparkStreamingSourceIT.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/test/java/com/solacecoe/connectors/spark/SolaceSparkStreamingSourceIT.java b/src/test/java/com/solacecoe/connectors/spark/SolaceSparkStreamingSourceIT.java index 877c9ad..f7ae29e 100644 --- a/src/test/java/com/solacecoe/connectors/spark/SolaceSparkStreamingSourceIT.java +++ b/src/test/java/com/solacecoe/connectors/spark/SolaceSparkStreamingSourceIT.java @@ -8,6 +8,7 @@ import com.solacecoe.connectors.spark.base.SempV2Api; import com.solacecoe.connectors.spark.base.SolaceSession; import com.solacecoe.connectors.spark.streaming.properties.SolaceSparkStreamingProperties; +import com.solacecoe.connectors.spark.streaming.solace.SolaceConnectionManager; import com.solacesystems.jcsmp.*; import org.apache.spark.api.java.function.VoidFunction2; import org.apache.spark.sql.Dataset; @@ -159,6 +160,8 @@ public void afterEach() throws IOException { if(Files.exists(path2)) { FileUtils.deleteDirectory(path2.toAbsolutePath().toFile()); } + + SolaceConnectionManager.closeAllConnections(); } @Test