Commit a92a613

Updated UserGuide
1 parent cec2fc0 commit a92a613

File tree: 2 files changed (+13, -23 lines)


src/docs/asciidoc/User-Guide.adoc

Lines changed: 1 addition & 1 deletion

@@ -35,7 +35,7 @@ This guide assumes you are familiar with Spark set up and Spark Structured Strea

 === Supported Platforms

-The connector is built on the Spark Structured Streaming API and has been tested on Azure Databricks(16.4 LTS (includes Apache Spark 3.5.2, Scala 2.12) with photon acceleration disabled). Since the Databricks runtime is consistent across all supported cloud platforms(AWS & Google Cloud), it is expected to behave similarly in other Databricks environments. Additionally, the connector has been validated on vanilla Apache Spark, ensuring compatibility with any platform that supports standard Spark deployments.
+The connector is built on the Spark Structured Streaming API and has been tested on Azure Databricks(15.4 LTS (includes Apache Spark 3.5.0, Scala 2.12) with photon acceleration disabled). Since the Databricks runtime is consistent across all supported cloud platforms(AWS & Google Cloud), it is expected to behave similarly in other Databricks environments. Additionally, the connector has been validated on vanilla Apache Spark, ensuring compatibility with any platform that supports standard Spark deployments.


 === Quick Start common steps
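
Since the changed paragraph also claims vanilla Apache Spark support, a bare-bones streaming read might look like the sketch below. This is illustrative only: the "solace" format name and every option key here are assumptions, not the connector's documented API; the Quick Start steps document the real configuration properties.

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class SolaceReadSketch {
        public static void main(String[] args) throws Exception {
            SparkSession spark = SparkSession.builder()
                    .appName("solace-spark-connector-demo")
                    .master("local[*]") // vanilla Spark, no Databricks runtime required
                    .getOrCreate();

            // Hypothetical format name and option keys, for illustration only.
            Dataset<Row> input = spark.readStream()
                    .format("solace")
                    .option("host", "tcps://broker.example.com:55443")
                    .option("vpn", "default")
                    .option("queue", "spark-ingest-queue")
                    .load();

            input.writeStream()
                    .format("console")
                    .option("checkpointLocation", "/tmp/solace-checkpoint")
                    .start()
                    .awaitTermination();
        }
    }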

src/main/java/com/solacecoe/connectors/spark/streaming/partitions/SolaceInputPartitionReader.java

Lines changed: 12 additions & 22 deletions
@@ -25,25 +25,18 @@
 import org.apache.spark.sql.execution.streaming.MicroBatchExecution;
 import org.apache.spark.sql.execution.streaming.StreamExecution;
 import org.apache.spark.unsafe.types.UTF8String;
-import org.apache.spark.util.TaskFailureListener;

 import java.io.BufferedWriter;
-import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
-import java.net.URI;
-import java.net.URISyntaxException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.nio.file.StandardOpenOption;
 import java.sql.Timestamp;
 import java.util.*;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
+import java.util.concurrent.*;

 public class SolaceInputPartitionReader implements PartitionReader<InternalRow>, Serializable {
     private final transient Logger log = LogManager.getLogger(SolaceInputPartitionReader.class);
@@ -54,6 +47,7 @@ public class SolaceInputPartitionReader implements PartitionReader<InternalRow>,
     private SolaceBroker solaceBroker;
     private final int batchSize;
     private int messages = 0;
+    private final long taskId;
     private final String uniqueId;
     private final String checkpointLocation;
     private final long receiveWaitTimeout;
@@ -68,8 +62,10 @@ public SolaceInputPartitionReader(SolaceInputPartition inputPartition, boolean i
         this.solaceInputPartition = inputPartition;
         this.uniqueId = this.solaceInputPartition.getId();

-        // Currently solace can ack messages on consumer flow. So ack previous messages before starting to process new ones.
-        // If Spark starts new input partition it indicates previous batch of data is successful. So we can acknowledge messages here.
+        /* Currently solace can ack messages on consumer flow. So ack previous messages before starting to process new ones.
+         * If Spark starts new input partition it indicates previous batch of data is successful. So we can acknowledge messages here.
+         * Solace connection is always active and acknowledgements should be successful. It might throw exception if connection is lost
+         */
         log.info("SolaceSparkConnector - Acknowledging any processed messages to Solace as commit is successful");
         long startTime = System.currentTimeMillis();
         SolaceMessageTracker.ackMessages(uniqueId);
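
The expanded comment documents the deferred-acknowledgement design: because Solace acks happen on the consumer flow, messages from the previous micro-batch are only acked once Spark schedules a new input partition, which signals that the previous batch committed. SolaceMessageTracker's implementation is not part of this diff; the sketch below is a hypothetical rendering of that idea, not the connector's actual code.

    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CopyOnWriteArrayList;

    final class MessageTrackerSketch {
        /** Stand-in for a Solace message whose delivery can be acknowledged. */
        interface Acknowledgeable { void ack(); }

        // Messages delivered during the previous micro-batch, keyed by partition id.
        private static final Map<String, List<Acknowledgeable>> PENDING = new ConcurrentHashMap<>();

        static void track(String partitionId, Acknowledgeable message) {
            PENDING.computeIfAbsent(partitionId, k -> new CopyOnWriteArrayList<>()).add(message);
        }

        // Called when a new input partition starts: the prior batch committed,
        // so everything tracked for this partition can safely be acknowledged.
        static void ackMessages(String partitionId) {
            List<Acknowledgeable> delivered = PENDING.remove(partitionId);
            if (delivered != null) {
                delivered.forEach(Acknowledgeable::ack);
            }
        }
    }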
@@ -78,6 +74,7 @@ public SolaceInputPartitionReader(SolaceInputPartition inputPartition, boolean i
         this.includeHeaders = includeHeaders;
         this.properties = properties;
         this.taskContext = taskContext;
+        this.taskId = taskContext.taskAttemptId();
         this.checkpoints = checkpoints;
         this.checkpointLocation = checkpointLocation;
         this.batchSize = Integer.parseInt(properties.getOrDefault(SolaceSparkStreamingProperties.BATCH_SIZE, SolaceSparkStreamingProperties.BATCH_SIZE_DEFAULT));
@@ -91,15 +88,10 @@ public SolaceInputPartitionReader(SolaceInputPartition inputPartition, boolean i

         log.info("SolaceSparkConnector - Checking for connection {}", inputPartition.getId());

+        // Get existing connection if a new task is scheduled on executor or create a new one
         if (SolaceConnectionManager.getConnection(inputPartition.getId()) != null) {
-            solaceBroker = SolaceConnectionManager.getConnection(inputPartition.getId());
-            if (solaceBroker != null) {
-                if (closeReceiversOnPartitionClose) {
-                    createReceiver(inputPartition.getId(), ackLastProcessedMessages);
-                }
-            } else {
-                log.warn("SolaceSparkConnector - Existing Solace connection not available for partition {}. Creating new connection", inputPartition.getId());
-                createNewConnection(inputPartition.getId(), ackLastProcessedMessages);
+            if (closeReceiversOnPartitionClose) {
+                createReceiver(inputPartition.getId(), ackLastProcessedMessages);
             }
         } else {
             createNewConnection(inputPartition.getId(), ackLastProcessedMessages);
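
The collapsed branch leans on SolaceConnectionManager caching one live connection per partition id, so a task rescheduled onto the same executor reuses it instead of reconnecting. The manager's internals are outside this diff; a generic get-or-create cache along these lines captures the pattern (an illustrative alternative, not the connector's actual API).

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.function.Function;

    final class ConnectionCacheSketch<C> {
        private final Map<String, C> connections = new ConcurrentHashMap<>();

        // Reuse the connection registered for this partition id, or build one
        // atomically if no task on this executor has created it yet.
        C getOrCreate(String partitionId, Function<String, C> factory) {
            return connections.computeIfAbsent(partitionId, factory);
        }

        C get(String partitionId) {
            return connections.get(partitionId);
        }
    }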
@@ -262,7 +254,7 @@ private void registerTaskListener() {
                     Files.createDirectories(parentDir);
                     log.trace("SolaceSparkConnector - Created parent directory {} for file path {}", parentDir.toString(), path.toString());
                 }
-
+                // overwrite checkpoint to preserve latest value
                 try(BufferedWriter writer = Files.newBufferedWriter(path, StandardOpenOption.CREATE,
                         StandardOpenOption.TRUNCATE_EXISTING)) {
                     for (String id : ids) {
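
The new comment makes the intent of the open options explicit: CREATE plus TRUNCATE_EXISTING means the file is created on first use and wiped on every subsequent write, so only the latest checkpoint survives. A minimal standalone demonstration, with a hypothetical file name:

    import java.io.BufferedWriter;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardOpenOption;

    final class OverwriteDemo {
        public static void main(String[] args) throws IOException {
            Path path = Path.of("checkpoint.txt"); // hypothetical location
            for (String state : new String[]{"old-state", "new-state"}) {
                try (BufferedWriter writer = Files.newBufferedWriter(path,
                        StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING)) {
                    writer.write(state); // each pass replaces the previous contents
                }
            }
            System.out.println(Files.readString(path)); // prints "new-state"
        }
    }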
@@ -271,7 +263,7 @@ private void registerTaskListener() {
                         SolaceSparkPartitionCheckpoint solaceSparkPartitionCheckpoint = new SolaceSparkPartitionCheckpoint(processedMessageIDs, id);
                         CopyOnWriteArrayList<SolaceSparkPartitionCheckpoint> solaceSparkPartitionCheckpoints = new CopyOnWriteArrayList<>();
                         solaceSparkPartitionCheckpoints.add(solaceSparkPartitionCheckpoint);
-                        // publish state to checkpoint. On commit the state is published to Solace LVQ.
+                        // Publish state to checkpoint. On commit the state is published to Solace LVQ.
                         writer.write(new Gson().toJson(solaceSparkPartitionCheckpoints));
                         writer.newLine();
                         log.trace("SolaceSparkConnector - Checkpoint {} stored in file path {}", new Gson().toJson(solaceSparkPartitionCheckpoints), path.toString());
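
For orientation, the Gson call above serializes the checkpoint list to a JSON array before it is written locally and, on commit, published to the Solace last-value queue (LVQ). The real fields of SolaceSparkPartitionCheckpoint are not visible in this diff; the field names in the sketch below are assumed.

    import com.google.gson.Gson;
    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;

    final class CheckpointJsonSketch {
        // Assumed shape; only the class name matches the connector's code.
        static final class SolaceSparkPartitionCheckpoint {
            final String messageIDs;   // assumed: ids of messages processed in the batch
            final String partitionId;  // assumed: the input partition the state belongs to
            SolaceSparkPartitionCheckpoint(String messageIDs, String partitionId) {
                this.messageIDs = messageIDs;
                this.partitionId = partitionId;
            }
        }

        public static void main(String[] args) {
            List<SolaceSparkPartitionCheckpoint> state = new CopyOnWriteArrayList<>();
            state.add(new SolaceSparkPartitionCheckpoint("msg-1,msg-2", "partition-0"));
            // e.g. [{"messageIDs":"msg-1,msg-2","partitionId":"partition-0"}]
            System.out.println(new Gson().toJson(state));
        }
    }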
@@ -285,8 +277,6 @@ private void registerTaskListener() {
                 throw new RuntimeException(e);
             }

-
-            // ack messages
             log.info("SolaceSparkConnector - Total time taken by executor is {} ms for Task {}", context.taskMetrics().executorRunTime(),uniqueId);

             if(closeReceiversOnPartitionClose) {
