
Commit 0ed8d84

Enhancements to LVQ creation, handling of operations on multiple data frames, and improvements to ack handling, logging & documentation
1 parent 0edc510 commit 0ed8d84

File tree: 7 files changed, +138 -75 lines


src/docs/asciidoc/User-Guide.adoc

Lines changed: 17 additions & 2 deletions
@@ -35,7 +35,7 @@ This guide assumes you are familiar with Spark set up and Spark Structured Strea
 
 === Supported Platforms
 
-The connector is built on the Spark Structured Streaming API and has been tested on Azure Databricks(15.4 LTS (includes Apache Spark 3.5.0, Scala 2.12) with photon acceleration disabled). Since the Databricks runtime is consistent across all supported cloud platforms(AWS & Google Cloud), it is expected to behave similarly in other Databricks environments. Additionally, the connector has been validated on vanilla Apache Spark, ensuring compatibility with any platform that supports standard Spark deployments.
+The connector is built on the Spark Structured Streaming API and has been tested on Azure Databricks (15.4 LTS (includes Apache Spark 3.5.0, Scala 2.12) with Photon acceleration disabled, and 16.4 LTS (includes Apache Spark 3.5.2, Scala 2.12)). Since the Databricks runtime is consistent across all supported cloud platforms (AWS & Google Cloud), it is expected to behave similarly in other Databricks environments. Additionally, the connector has been validated on vanilla Apache Spark, ensuring compatibility with any platform that supports standard Spark deployments.
 
 === Quick Start common steps
 
@@ -67,12 +67,27 @@ Solace Spark connector relies on Spark Checkpointing mechanism to resume from la
 
 === Checkpoint Handling
 
-Starting from version 3.1.0 connector, solace connection is now executed on worker node instead of driver node. This give us the ability to utilize cluster resource efficiently and also improves processing performance. The connector uses Solace LVQ to store checkpoint along with Spark Checkpoint.
+Starting from connector version 3.1.0, the Solace connection is executed on the worker node instead of the driver node. This gives us the ability to utilize cluster resources efficiently and also improves processing performance. The connector uses a Solace LVQ to store its checkpoint along with the Spark checkpoint.
 
 NOTE: In case of recovery, connector uses offset state from LVQ to identify last successfully processed messages. Hence, it is recommended not to delete or modify offset state in LVQ.
 
 In some cases, there might be checkpoint failures as spark may fail to write to checkpoint during instance crash or unavailability or other reasons. Though the connector will handle duplicates in most cases, we recommend to keep your downstream systems idempotent.
 
+==== Pre-requisites for LVQ creation
+The following pre-requisites apply to an LVQ provisioned by a Solace administrator.
+1. The queue should be of type Exclusive.
+2. The spool quota should be set to 0.
+3. The owner of the queue should be the client username used by the micro integration.
+4. Non-owner access should be set to No Access to prevent unauthorized access.
+5. Add a topic subscription.
+6. Ensure the ACL applied to the user has publish access to the topic subscribed to by the LVQ.
+
+The following pre-requisites apply if the micro integration needs to create the LVQ when it doesn't exist.
+
+1. Ensure the Client Profile applied to the user has "Allow Client to Create Endpoints" enabled.
+2. Ensure the ACL applied to the user has publish and subscribe access to the topic subscribed to by the LVQ. Subscribe access is required to programmatically apply the topic subscription to the LVQ.
+3. The micro integration will create an LVQ that satisfies the pre-requisites mentioned in the section above.
 
 === User Authentication
 
 Solace Spark Connector supports Basic, Client Certificate and OAuth authentication to Solace. Client Credentials flow is supported when connecting using OAuth.
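
To make the second list above concrete, here is a minimal JCSMP sketch of provisioning an LVQ that satisfies these pre-requisites. This is not the connector's own provisioning code; the host, credentials, queue name, and topic below are placeholders.

import com.solacesystems.jcsmp.EndpointProperties;
import com.solacesystems.jcsmp.JCSMPException;
import com.solacesystems.jcsmp.JCSMPFactory;
import com.solacesystems.jcsmp.JCSMPProperties;
import com.solacesystems.jcsmp.JCSMPSession;
import com.solacesystems.jcsmp.Queue;

public class LvqProvisionSketch {
    public static void main(String[] args) throws JCSMPException {
        JCSMPProperties props = new JCSMPProperties();
        props.setProperty(JCSMPProperties.HOST, "tcps://broker.example.com:55443"); // placeholder
        props.setProperty(JCSMPProperties.VPN_NAME, "default");                     // placeholder
        props.setProperty(JCSMPProperties.USERNAME, "spark-client");                // placeholder
        props.setProperty(JCSMPProperties.PASSWORD, "changeme");                    // placeholder
        JCSMPSession session = JCSMPFactory.onlyInstance().createSession(props);
        session.connect();

        // Mirror the documented pre-requisites: exclusive access, zero spool quota,
        // and no non-owner access. The connecting client username becomes the owner.
        EndpointProperties endpointProps = new EndpointProperties();
        endpointProps.setAccessType(EndpointProperties.ACCESSTYPE_EXCLUSIVE);
        endpointProps.setQuota(0);
        endpointProps.setPermission(EndpointProperties.PERMISSION_NONE);

        Queue lvq = JCSMPFactory.onlyInstance().createQueue("spark/offsets/lvq");   // placeholder name
        // Requires "Allow Client to Create Endpoints" on the client profile.
        session.provision(lvq, endpointProps, JCSMPSession.FLAG_IGNORE_ALREADY_EXISTS);
        // Requires subscribe permission on the topic in the client ACL.
        session.addSubscription(lvq, JCSMPFactory.onlyInstance().createTopic("spark/offsets"),
                JCSMPSession.WAIT_FOR_CONFIRM);
        session.closeSession();
    }
}

Provisioning with FLAG_IGNORE_ALREADY_EXISTS makes the step tolerant of an existing queue, so a sketch like this can run on every start.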

src/docs/sections/general/configuration/solace-spark-source-config.adoc

Lines changed: 0 additions & 14 deletions
@@ -204,20 +204,6 @@ a| Set this value to true if connector needs to identify and acknowledge process
 
 NOTE: This property will be void if replay strategy is enabled.
 
-| offsetIndicator
-| string
-| any
-| MESSAGE_ID, CORRELATION_ID, APPLICATION_MESSAGE_ID, <CUSTOM_USER_PROPERTY>
-a| Set this value if your Solace Message has unique ID in message header. Supported Values are
-
-* MESSAGE_ID
-* CORRELATION_ID
-* APPLICATION_MESSAGE_ID
-* <CUSTOM_USER_PROPERTY> - refers to one of headers in user properties
-Header.
-
-Note: Default value uses replication group message ID property as offset indicator. ReplicationGroupMessageId is a unique message id across a broker cluster.
-
 | includeHeaders
 | boolean
 | true or false
src/main/java/com/solacecoe/connectors/spark/streaming/SolaceMicroBatch.java

Lines changed: 5 additions & 1 deletion
@@ -192,6 +192,9 @@ private Optional<String> getExecutorLocation(List<String> executorLocations, int
     @Override
     public PartitionReaderFactory createReaderFactory() {
         log.info("SolaceSparkConnector - Create reader factory with includeHeaders :: {}", this.includeHeaders);
+        if(currentCheckpoint != null && currentCheckpoint.isEmpty()) {
+            currentCheckpoint = this.getCheckpoint();
+        }
         return new SolaceDataSourceReaderFactory(this.includeHeaders, this.properties, currentCheckpoint, this.checkpointLocation);
     }
 
@@ -217,6 +220,7 @@ public Offset deserializeOffset(String json) {
         log.info("SolaceSparkConnector - Deserialized offset {}", new Gson().toJson(solaceSourceOffset));
     }
     lastKnownOffsetId = solaceSourceOffset.getOffset();
+    currentCheckpoint = solaceSourceOffset.getCheckpoints();
     solaceSourceOffset.getCheckpoints().forEach(checkpoint -> lastKnownMessageIds = String.join(",", lastKnownMessageIds, checkpoint.getMessageIDs()));
 
     return solaceSourceOffset;
@@ -246,7 +250,7 @@ private SolaceSourceOffset getDeserializedOffset(String json) {
             }
         } catch (Exception e2) {
             log.error("SolaceSparkConnector - Exception when migrating offset to latest format.");
-            throw new RuntimeException("SolaceSparkConnector - Exception when migrating offset to latest format.", e);
+            throw new RuntimeException("SolaceSparkConnector - Exception when migrating offset to latest format. Please delete the checkpoint and restart the micro integration", e);
         }
     }
 }
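
As background for the migration path above, the pattern is the usual two-step Gson fallback: try the current offset layout, fall back to the legacy one, and surface actionable guidance when neither parses. A self-contained sketch, with hypothetical class and field names rather than the connector's actual offset types:

import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;

final class OffsetMigrationSketch {
    private static final Gson GSON = new Gson();

    static final class CurrentOffset { long offset; String checkpoints; }
    static final class LegacyOffset { long offset; }

    // Try the current layout first, then migrate the legacy one. Note Gson only
    // throws for malformed or structurally incompatible JSON, so real code may
    // also need null/field checks after parsing.
    static CurrentOffset parse(String json) {
        try {
            return GSON.fromJson(json, CurrentOffset.class);
        } catch (JsonSyntaxException first) {
            try {
                LegacyOffset legacy = GSON.fromJson(json, LegacyOffset.class);
                CurrentOffset migrated = new CurrentOffset();
                migrated.offset = legacy.offset;
                return migrated;
            } catch (JsonSyntaxException second) {
                throw new RuntimeException(
                        "Offset matches neither current nor legacy format; delete the checkpoint and restart.",
                        second);
            }
        }
    }
}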

src/main/java/com/solacecoe/connectors/spark/streaming/offset/SolaceMessageTracker.java

Lines changed: 1 addition & 1 deletion
@@ -35,7 +35,7 @@ public static void addMessage(String uniqueId, SolaceMessage message) {
     if(messages.containsKey(uniqueId)) {
         messageList = messages.get(uniqueId);
     }
-    messageList.add(message);
+    messageList.addIfAbsent(message);
     messages.put(uniqueId, messageList);
 }
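
The switch from add to addIfAbsent is what prevents duplicate tracking when the same message is offered twice (for example, when Spark re-plans a scan over the same dataframe, as handled in SolaceInputPartitionReader below). A standalone illustration; note that deduplication relies on the element's equals method, so it assumes SolaceMessage defines equality appropriately:

import java.util.concurrent.CopyOnWriteArrayList;

public class AddIfAbsentDemo {
    public static void main(String[] args) {
        CopyOnWriteArrayList<String> messageList = new CopyOnWriteArrayList<>();
        messageList.add("rmid-1");
        messageList.add("rmid-1");               // add keeps duplicates
        System.out.println(messageList.size());  // prints 2

        messageList.clear();
        messageList.addIfAbsent("rmid-1");
        messageList.addIfAbsent("rmid-1");       // second call is a no-op
        System.out.println(messageList.size());  // prints 1
    }
}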

src/main/java/com/solacecoe/connectors/spark/streaming/partitions/SolaceInputPartitionReader.java

Lines changed: 81 additions & 45 deletions
@@ -67,6 +67,21 @@ public SolaceInputPartitionReader(SolaceInputPartition inputPartition, boolean i
 
     this.solaceInputPartition = inputPartition;
     this.uniqueId = this.solaceInputPartition.getId();
+    this.includeHeaders = includeHeaders;
+    this.properties = properties;
+    this.taskContext = taskContext;
+    this.taskId = taskContext.taskAttemptId();
+    this.checkpoints = checkpoints;
+    this.checkpointLocation = checkpointLocation;
+    this.batchSize = Integer.parseInt(properties.getOrDefault(SolaceSparkStreamingProperties.BATCH_SIZE, SolaceSparkStreamingProperties.BATCH_SIZE_DEFAULT));
+    this.receiveWaitTimeout = Long.parseLong(properties.getOrDefault(SolaceSparkStreamingProperties.QUEUE_RECEIVE_WAIT_TIMEOUT, SolaceSparkStreamingProperties.QUEUE_RECEIVE_WAIT_TIMEOUT_DEFAULT));
+    this.closeReceiversOnPartitionClose = Boolean.parseBoolean(properties.getOrDefault(SolaceSparkStreamingProperties.CLOSE_RECEIVERS_ON_PARTITION_CLOSE, SolaceSparkStreamingProperties.CLOSE_RECEIVERS_ON_PARTITION_CLOSE_DEFAULT));
+    boolean ackLastProcessedMessages = Boolean.parseBoolean(properties.getOrDefault(SolaceSparkStreamingProperties.ACK_LAST_PROCESSED_MESSAGES, SolaceSparkStreamingProperties.ACK_LAST_PROCESSED_MESSAGES_DEFAULT));
+    String replayStrategy = this.properties.getOrDefault(SolaceSparkStreamingProperties.REPLAY_STRATEGY, null);
+    if (replayStrategy != null && !replayStrategy.isEmpty()) {
+        ackLastProcessedMessages = false;
+    }
+
     String currentBatchId = taskContext.getLocalProperty(MicroBatchExecution.BATCH_ID_KEY());
     /*
      * In case when multiple operations are performed on dataframe, input partition will be called as part of Spark scan.
@@ -83,29 +98,26 @@ public SolaceInputPartitionReader(SolaceInputPartition inputPartition, boolean i
         SolaceMessageTracker.ackMessages(uniqueId);
         log.trace("SolaceSparkConnector - Total time taken to acknowledge messages {} ms", (System.currentTimeMillis() - startTime));
     } else {
+        log.info("SolaceSparkConnector - Spark Batch with id {} is requesting data again. It may be because of multiple operations on same dataframe.", currentBatchId);
         isCommitTriggered = false;
         CopyOnWriteArrayList<SolaceMessage> messageList = SolaceMessageTracker.getMessages(uniqueId);
         if (messageList != null) {
             iterator = messageList.iterator();
+            if(messageList.size() < batchSize) {
+                if(messageList.size() == 1) {
+                    log.info("SolaceSparkConnector - Only {} message is available from earlier request. This is most likely due to the result of isEmpty or similar operation on dataframe. Spark will immediately terminate input partition if one message is available from source.", messageList.size());
+                }
+                log.info("SolaceSparkConnector - Since only {} messages are available from earlier request. The remaining {} messages will be consumed from queue, if available, to fill the configured batch size of {}", messageList.size(), (batchSize - messageList.size()), this.batchSize);
+            } else {
+                log.info("SolaceSparkConnector - {} messages available from the earlier request, matching the configured batch size of {}. The same messages will be returned as batch is not yet completed.", this.batchSize, messageList.size());
+            }
+
         }
     }
 
     SolaceMessageTracker.setLastBatchId(this.uniqueId, currentBatchId);
 
-    this.includeHeaders = includeHeaders;
-    this.properties = properties;
-    this.taskContext = taskContext;
-    this.taskId = taskContext.taskAttemptId();
-    this.checkpoints = checkpoints;
-    this.checkpointLocation = checkpointLocation;
-    this.batchSize = Integer.parseInt(properties.getOrDefault(SolaceSparkStreamingProperties.BATCH_SIZE, SolaceSparkStreamingProperties.BATCH_SIZE_DEFAULT));
-    this.receiveWaitTimeout = Long.parseLong(properties.getOrDefault(SolaceSparkStreamingProperties.QUEUE_RECEIVE_WAIT_TIMEOUT, SolaceSparkStreamingProperties.QUEUE_RECEIVE_WAIT_TIMEOUT_DEFAULT));
-    this.closeReceiversOnPartitionClose = Boolean.parseBoolean(properties.getOrDefault(SolaceSparkStreamingProperties.CLOSE_RECEIVERS_ON_PARTITION_CLOSE, SolaceSparkStreamingProperties.CLOSE_RECEIVERS_ON_PARTITION_CLOSE_DEFAULT));
-    boolean ackLastProcessedMessages = Boolean.parseBoolean(properties.getOrDefault(SolaceSparkStreamingProperties.ACK_LAST_PROCESSED_MESSAGES, SolaceSparkStreamingProperties.ACK_LAST_PROCESSED_MESSAGES_DEFAULT));
-    String replayStrategy = this.properties.getOrDefault(SolaceSparkStreamingProperties.REPLAY_STRATEGY, null);
-    if (replayStrategy == null || replayStrategy.isEmpty()) {
-        ackLastProcessedMessages = false;
-    }
+
 
     log.info("SolaceSparkConnector - Checking for connection {}", inputPartition.getId());
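
The re-request branch above replays tracked messages first and tops the batch up from the broker only for the shortfall. A simplified, self-contained sketch of that pattern (names are illustrative, not the connector's):

import java.util.Iterator;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

final class BatchTopUpSketch {
    // Serve cached messages first, then pull only the shortfall from the source.
    static <T> int replayThenTopUp(CopyOnWriteArrayList<T> tracked,
                                   LinkedBlockingQueue<T> source,
                                   int batchSize) {
        int served = 0;
        Iterator<T> cached = tracked.iterator();
        while (cached.hasNext() && served < batchSize) {
            cached.next();               // replay a message tracked for this batch
            served++;
        }
        while (served < batchSize) {
            T next = source.poll();      // non-blocking top-up; null when drained
            if (next == null) break;
            tracked.addIfAbsent(next);   // track it in case of another re-request
            served++;
        }
        return served;
    }
}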

@@ -131,7 +143,7 @@ public SolaceInputPartitionReader(SolaceInputPartition inputPartition, boolean i
         createNewConnection(inputPartition.getId(), ackLastProcessedMessages);
     }
     checkException();
-    log.info("SolaceSparkConnector - Acquired connection to Solace broker for partition {}", inputPartition.getId());
+    log.info("SolaceSparkConnector - Current consumer session on input partition {} is {}", inputPartition.getId(), this.solaceBroker.getUniqueName());
     registerTaskListener();
 }
 
@@ -209,7 +221,7 @@ private SolaceMessage getNextMessage() {
     If commit is triggered or messageList is null we need to fetch messages from Solace.
     In case of same batch just return the available messages in message tracker.
     */
-    if (this.isCommitTriggered || iterator == null || !iterator.hasNext()) {
+    if (this.isCommitTriggered || iterator == null) {
         LinkedBlockingQueue<SolaceMessage> queue = solaceBroker.getMessages(0);
         if (queue != null) {
             while (shouldProcessMoreMessages(batchSize, messages)) {
@@ -254,6 +266,29 @@ private SolaceMessage getNextMessage() {
                 return solaceMessage;
             }
         } else {
+            LinkedBlockingQueue<SolaceMessage> queue = solaceBroker.getMessages(0);
+            if (queue != null) {
+                try {
+                    solaceMessage = queue.poll(receiveWaitTimeout, TimeUnit.MILLISECONDS);
+                    if (solaceMessage == null) {
+                        return null;
+                    }
+
+                    if (batchSize > 0) {
+                        messages++;
+                    }
+                    if (isMessageAlreadyProcessed(solaceMessage)) {
+                        log.info("Message is added to previous partitions for processing. Moving to next message");
+                    } else {
+                        return solaceMessage;
+                    }
+
+                } catch (InterruptedException | SDTException e) {
+                    log.warn("No messages available within specified receiveWaitTimeout", e);
+                    Thread.currentThread().interrupt();
+                    return null;
+                }
+            }
             return null;
         }
     } catch (Exception e) {
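
The new else-branch relies on the timed-poll idiom: wait up to receiveWaitTimeout for one message, treat a timeout as "no data", and restore the interrupt flag if interrupted. Stripped of connector specifics, the idiom looks like this:

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class TimedPollSketch {
    // Wait up to waitMillis for the next element; null means "no data this time".
    static <T> T pollOrNull(LinkedBlockingQueue<T> queue, long waitMillis) {
        try {
            return queue.poll(waitMillis, TimeUnit.MILLISECONDS); // null on timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return null;                        // let the caller end the batch gracefully
        }
    }
}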
@@ -303,39 +338,40 @@ private void registerTaskListener() {
     if (context.isInterrupted() || context.isFailed()) {
         logShutdownMessage(context);
     } else if (context.isCompleted()) {
-        List<String> ids = SolaceMessageTracker.getIds();
-        try {
-            Path path = Paths.get(this.checkpointLocation + "/" + this.solaceInputPartition.getId() + ".txt");
-            log.trace("SolaceSparkConnector - File path {} to store checkpoint processed in worker node {}", path.toString(), this.solaceInputPartition.getPreferredLocation());
-            Path parentDir = path.getParent();
-            if (parentDir != null) {
-                // Create the directory and all nonexistent parent directories
-                Files.createDirectories(parentDir);
-                log.trace("SolaceSparkConnector - Created parent directory {} for file path {}", parentDir.toString(), path.toString());
-            }
-            // overwrite checkpoint to preserve latest value
-            try (BufferedWriter writer = Files.newBufferedWriter(path, StandardOpenOption.CREATE,
-                    StandardOpenOption.TRUNCATE_EXISTING)) {
-                for (String id : ids) {
-                    String processedMessageIDs = SolaceMessageTracker.getProcessedMessagesIDs(id);
-                    if (processedMessageIDs != null) {
-                        SolaceSparkPartitionCheckpoint solaceSparkPartitionCheckpoint = new SolaceSparkPartitionCheckpoint(processedMessageIDs, id);
-                        CopyOnWriteArrayList<SolaceSparkPartitionCheckpoint> solaceSparkPartitionCheckpoints = new CopyOnWriteArrayList<>();
-                        solaceSparkPartitionCheckpoints.add(solaceSparkPartitionCheckpoint);
-                        // Publish state to checkpoint. On commit the state is published to Solace LVQ.
-                        writer.write(new Gson().toJson(solaceSparkPartitionCheckpoints));
-                        writer.newLine();
-                        log.trace("SolaceSparkConnector - Checkpoint {} stored in file path {}", new Gson().toJson(solaceSparkPartitionCheckpoints), path.toString());
-                        SolaceMessageTracker.removeProcessedMessagesIDs(id);
-                    }
+        String processedMessageIDs = SolaceMessageTracker.getProcessedMessagesIDs(this.solaceInputPartition.getId());
+        System.out.println(this.solaceInputPartition.getId() + " - " + processedMessageIDs);
+        if(processedMessageIDs != null && !processedMessageIDs.isEmpty()) {
+            try {
+                Path path = Paths.get(this.checkpointLocation + "/" + this.solaceInputPartition.getId() + ".txt");
+                log.trace("SolaceSparkConnector - File path {} to store checkpoint processed in worker node {}", path.toString(), this.solaceInputPartition.getPreferredLocation());
+                Path parentDir = path.getParent();
+                if (parentDir != null) {
+                    // Create the directory and all nonexistent parent directories
+                    Files.createDirectories(parentDir);
+                    log.trace("SolaceSparkConnector - Created parent directory {} for file path {}", parentDir.toString(), path.toString());
                 }
+                // overwrite checkpoint to preserve latest value
+                try (BufferedWriter writer = Files.newBufferedWriter(path, StandardOpenOption.CREATE,
+                        StandardOpenOption.TRUNCATE_EXISTING)) {
+                    // for (String id : ids) {
+                    SolaceSparkPartitionCheckpoint solaceSparkPartitionCheckpoint = new SolaceSparkPartitionCheckpoint(processedMessageIDs, this.solaceInputPartition.getId());
+                    CopyOnWriteArrayList<SolaceSparkPartitionCheckpoint> solaceSparkPartitionCheckpoints = new CopyOnWriteArrayList<>();
+                    solaceSparkPartitionCheckpoints.add(solaceSparkPartitionCheckpoint);
+                    // Publish state to checkpoint. On commit the state is published to Solace LVQ.
+                    writer.write(new Gson().toJson(solaceSparkPartitionCheckpoints));
+                    writer.newLine();
+                    log.trace("SolaceSparkConnector - Checkpoint {} stored in file path {}", new Gson().toJson(solaceSparkPartitionCheckpoints), path.toString());
+                    SolaceMessageTracker.removeProcessedMessagesIDs(this.solaceInputPartition.getId());
+                    // }
+                }
+            } catch (IOException e) {
+                log.error("SolaceSparkConnector - Exception when writing checkpoint to path {}", this.checkpointLocation, e);
+                this.solaceBroker.close();
+                throw new RuntimeException(e);
             }
-        } catch (IOException e) {
-            log.error("SolaceSparkConnector - Exception when writing checkpoint to path {}", this.checkpointLocation, e);
-            this.solaceBroker.close();
-            throw new RuntimeException(e);
         }
 
+
     log.info("SolaceSparkConnector - Total time taken by executor is {} ms for Task {}", context.taskMetrics().executorRunTime(), uniqueId);
 
     if (closeReceiversOnPartitionClose) {