Merged
19 changes: 17 additions & 2 deletions src/docs/asciidoc/User-Guide.adoc
@@ -35,7 +35,7 @@ This guide assumes you are familiar with Spark set up and Spark Structured Strea

=== Supported Platforms

The connector is built on the Spark Structured Streaming API and has been tested on Azure Databricks(15.4 LTS (includes Apache Spark 3.5.0, Scala 2.12) with photon acceleration disabled). Since the Databricks runtime is consistent across all supported cloud platforms(AWS & Google Cloud), it is expected to behave similarly in other Databricks environments. Additionally, the connector has been validated on vanilla Apache Spark, ensuring compatibility with any platform that supports standard Spark deployments.
The connector is built on the Spark Structured Streaming API and has been tested on Azure Databricks (15.4 LTS, which includes Apache Spark 3.5.0 and Scala 2.12, with Photon acceleration disabled, and 16.4 LTS, which includes Apache Spark 3.5.2 and Scala 2.12). Since the Databricks runtime is consistent across all supported cloud platforms (AWS and Google Cloud), it is expected to behave similarly in other Databricks environments. Additionally, the connector has been validated on vanilla Apache Spark, ensuring compatibility with any platform that supports standard Spark deployments.

=== Quick Start common steps

@@ -67,12 +67,27 @@ Solace Spark connector relies on Spark Checkpointing mechanism to resume from la

=== Checkpoint Handling

Starting from version 3.1.0 connector, solace connection is now executed on worker node instead of driver node. This give us the ability to utilize cluster resource efficiently and also improves processing performance. The connector uses Solace LVQ to store checkpoint along with Spark Checkpoint.
Starting from connector version 3.1.0, the Solace connection is executed on the worker node instead of the driver node. This gives us the ability to utilize cluster resources efficiently and also improves processing performance. The connector uses a Solace LVQ to store its checkpoint alongside the Spark checkpoint.

NOTE: In case of recovery, the connector uses the offset state from the LVQ to identify the last successfully processed messages. Hence, it is recommended not to delete or modify the offset state in the LVQ.

In some cases there might be checkpoint failures, as Spark may fail to write to the checkpoint during an instance crash, unavailability, or for other reasons. Though the connector handles duplicates in most cases, we recommend keeping your downstream systems idempotent.
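
A minimal sketch of wiring a streaming read against a Spark checkpoint location is shown below, assuming the connector is addressed through the standard Structured Streaming `format`/`option` API. The format name and the `host`, `vpn`, `username`, `password`, and `queue` option keys are illustrative placeholders, not the connector's documented option names; refer to the configuration reference in this guide for the exact keys.

[source,java]
----
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;

public class SolaceCheckpointExample {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .appName("solace-spark-checkpoint-demo")
                .getOrCreate();

        // Option keys below are placeholders; use the names from the
        // configuration reference in this guide.
        Dataset<Row> input = spark.readStream()
                .format("solace")
                .option("host", "tcps://broker.example.com:55443")
                .option("vpn", "default")
                .option("username", "spark-client")
                .option("password", "change-me")
                .option("queue", "spark/input/queue")
                .load();

        // checkpointLocation is standard Spark behaviour; the connector
        // additionally persists its own offset state in the Solace LVQ.
        StreamingQuery query = input.writeStream()
                .format("parquet")
                .option("path", "/data/solace/output")
                .option("checkpointLocation", "/data/solace/checkpoint")
                .trigger(Trigger.ProcessingTime("30 seconds"))
                .start();

        query.awaitTermination();
    }
}
----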

==== Pre-requisites for LVQ creation
The following pre-requisites apply to an LVQ that is provisioned by a Solace administrator (a provisioning sketch follows the lists below).

1. The queue should be of type Exclusive.
2. The spool quota should be set to 0.
3. The owner of the queue should be the client username used by the micro integration.
4. Non-owner access should be set to No Access to prevent unauthorized access.
5. Add a topic subscription to the queue.
6. Ensure the ACL profile applied to the client username has publish access to the topic subscribed to by the LVQ.

The following pre-requisites apply if the micro integration needs to create the LVQ because it does not exist yet.

1. Ensure the client profile applied to the client username has "Allow Client to Create Endpoints" enabled.
2. Ensure the ACL profile applied to the client username has publish and subscribe access to the topic subscribed to by the LVQ. Subscribe access is required to programmatically apply the topic subscription to the LVQ.
3. The micro integration will create an LVQ with the pre-requisites mentioned in the section above.
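
For administrators provisioning the LVQ up front, the following is a minimal sketch of scripting the settings above through the broker's SEMP v2 configuration API with `java.net.http`. The broker URL, admin credentials, Message VPN, queue name, owner, and topic are placeholders, and the exact SEMP field names should be confirmed against the SEMP v2 reference for your broker version.

[source,java]
----
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Base64;

public class LvqProvisioner {
    public static void main(String[] args) throws Exception {
        // Placeholder broker and VPN details -- adjust to your environment.
        String sempBase = "http://broker.example.com:8080/SEMP/v2/config/msgVpns/default";
        String auth = "Basic " + Base64.getEncoder().encodeToString("admin:admin".getBytes());
        HttpClient client = HttpClient.newHttpClient();

        // Exclusive queue, zero spool quota, owned by the connector's client
        // username, with no access for non-owners (items 1-4 above).
        String queueBody = "{"
                + "\"queueName\":\"spark-connector-lvq\","
                + "\"accessType\":\"exclusive\","
                + "\"maxMsgSpoolUsage\":0,"
                + "\"owner\":\"spark-client\","
                + "\"permission\":\"no-access\","
                + "\"ingressEnabled\":true,"
                + "\"egressEnabled\":true"
                + "}";
        post(client, sempBase + "/queues", auth, queueBody);

        // Topic subscription on the LVQ (item 5 above); the topic is a placeholder.
        String subscriptionBody = "{\"subscriptionTopic\":\"spark/checkpoint/state\"}";
        post(client, sempBase + "/queues/spark-connector-lvq/subscriptions", auth, subscriptionBody);
    }

    private static void post(HttpClient client, String url, String auth, String body) throws Exception {
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .header("Authorization", auth)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(url + " -> " + response.statusCode());
    }
}
----

The publish access required by item 6 still has to be granted on the client username's ACL profile; it is not covered by the queue creation call above.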

=== User Authentication

Solace Spark Connector supports Basic, Client Certificate, and OAuth authentication to Solace. The Client Credentials flow is supported when connecting using OAuth.
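
Since the connector supports the OAuth Client Credentials flow, the sketch below shows how a client-credentials access token is typically obtained from an identity provider's token endpoint. The endpoint URL, client ID, client secret, and scope are placeholders; whether the connector fetches the token itself or expects a pre-fetched token depends on its OAuth configuration options, so consult the connector's configuration reference.

[source,java]
----
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ClientCredentialsTokenFetch {
    public static void main(String[] args) throws Exception {
        // Placeholder identity-provider details.
        String tokenEndpoint = "https://idp.example.com/oauth2/token";
        String form = "grant_type=client_credentials"
                + "&client_id=spark-connector"
                + "&client_secret=change-me"
                + "&scope=solace";

        HttpRequest request = HttpRequest.newBuilder(URI.create(tokenEndpoint))
                .header("Content-Type", "application/x-www-form-urlencoded")
                .POST(HttpRequest.BodyPublishers.ofString(form))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());

        // The JSON response contains the access_token that is presented to
        // the Solace broker when OAuth authentication is configured.
        System.out.println(response.body());
    }
}
----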
@@ -204,20 +204,6 @@ a| Set this value to true if connector needs to identify and acknowledge process

NOTE: This property will be void if replay strategy is enabled.

| offsetIndicator
| string
| any
| MESSAGE_ID, CORRELATION_ID, APPLICATION_MESSAGE_ID, <CUSTOM_USER_PROPERTY>
a| Set this value if your Solace Message has unique ID in message header. Supported Values are

* MESSAGE_ID
* CORRELATION_ID
* APPLICATION_MESSAGE_ID
* <CUSTOM_USER_PROPERTY> - refers to one of headers in user properties
Header.

Note: Default value uses replication group message ID property as offset indicator. ReplicationGroupMessageId is a unique message id across a broker cluster.

| includeHeaders
| boolean
| true or false
@@ -192,6 +192,9 @@ private Optional<String> getExecutorLocation(List<String> executorLocations, int
@Override
public PartitionReaderFactory createReaderFactory() {
log.info("SolaceSparkConnector - Create reader factory with includeHeaders :: {}", this.includeHeaders);
if(currentCheckpoint != null && currentCheckpoint.isEmpty()) {
currentCheckpoint = this.getCheckpoint();
}
return new SolaceDataSourceReaderFactory(this.includeHeaders, this.properties, currentCheckpoint, this.checkpointLocation);
}

@@ -201,20 +204,24 @@ public Offset initialOffset() {
if(existingCheckpoints != null && !existingCheckpoints.isEmpty()) {
currentCheckpoint = existingCheckpoints;
existingCheckpoints.forEach(checkpoint -> lastKnownMessageIds = String.join(",", lastKnownMessageIds, checkpoint.getMessageIDs()));

log.info("SolaceSparkConnector - Checkpoint available from LVQ {}", new Gson().toJson(existingCheckpoints));
return new SolaceSourceOffset(lastKnownOffsetId, existingCheckpoints);
}

log.info("SolaceSparkConnector - Initial Offset from LVQ is not available, the micro integration will use the available offset in checkpoint else a new checkpoint state will be created");
return new SolaceSourceOffset(lastKnownOffsetId, new CopyOnWriteArrayList<>());
}

@Override
public Offset deserializeOffset(String json) {
SolaceSourceOffset solaceSourceOffset = getDeserializedOffset(json);
if(solaceSourceOffset != null) {
lastKnownOffsetId = solaceSourceOffset.getOffset();
solaceSourceOffset.getCheckpoints().forEach(checkpoint -> lastKnownMessageIds = String.join(",", lastKnownMessageIds, checkpoint.getMessageIDs()));
if(solaceSourceOffset.getCheckpoints() != null && solaceSourceOffset.getCheckpoints().isEmpty()) {
log.info("SolaceSparkConnector - No offset is available in spark checkpoint location. New checkpoint state will be created");
} else {
log.info("SolaceSparkConnector - Deserialized offset {}", new Gson().toJson(solaceSourceOffset));
}
lastKnownOffsetId = solaceSourceOffset.getOffset();
currentCheckpoint = solaceSourceOffset.getCheckpoints();
solaceSourceOffset.getCheckpoints().forEach(checkpoint -> lastKnownMessageIds = String.join(",", lastKnownMessageIds, checkpoint.getMessageIDs()));

return solaceSourceOffset;
}
@@ -229,6 +236,8 @@ private SolaceSourceOffset getDeserializedOffset(String json) {
} else {
return migrate(solaceSourceOffset.getOffset(), "");
}
} else {
return solaceSourceOffset;
}
} catch (Exception e) {
log.warn("SolaceSparkConnector - Exception when deserializing offset. May be due incompatible formats. Connector will try to migrate to latest offset format.");
@@ -241,11 +250,9 @@
}
} catch (Exception e2) {
log.error("SolaceSparkConnector - Exception when migrating offset to latest format.");
throw new RuntimeException("SolaceSparkConnector - Exception when migrating offset to latest format.", e);
throw new RuntimeException("SolaceSparkConnector - Exception when migrating offset to latest format. Please delete the checkpoint and restart the micro integration", e);
}
}

return null;
}

private SolaceSourceOffset migrate(int offset, String messageIds) {
@@ -11,7 +11,7 @@
import java.util.concurrent.CopyOnWriteArrayList;

public final class SolaceMessageTracker implements Serializable {
private static String lastBatchId = "";
private static ConcurrentHashMap<String, String> lastBatchId = new ConcurrentHashMap<>();
private static final Logger logger = LogManager.getLogger(SolaceMessageTracker.class);
private static ConcurrentHashMap<String, CopyOnWriteArrayList<SolaceMessage>> messages = new ConcurrentHashMap<>();
private static ConcurrentHashMap<String, String> lastProcessedMessageId = new ConcurrentHashMap<>();
@@ -35,7 +35,7 @@ public static void addMessage(String uniqueId, SolaceMessage message) {
if(messages.containsKey(uniqueId)) {
messageList = messages.get(uniqueId);
}
messageList.add(message);
messageList.addIfAbsent(message);
messages.put(uniqueId, messageList);
}

@@ -77,11 +77,11 @@ public static void resetId(String uniqueId) {
logger.info("SolaceSparkConnector - Cleared all messages from Offset Manager for {}", uniqueId);
}

public static String getLastBatchId() {
return lastBatchId;
public static String getLastBatchId(String uniqueId) {
return lastBatchId.get(uniqueId);
}

public static void setLastBatchId(String lastBatchId) {
SolaceMessageTracker.lastBatchId = lastBatchId;
public static void setLastBatchId(String uniqueId, String batchId) {
lastBatchId.put(uniqueId, batchId);
}
}
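
// ---------------------------------------------------------------------------
// Illustrative usage sketch (not part of the connector source): lastBatchId is
// now tracked per uniqueId (for example, per partition reader) instead of in a
// single shared field. The identifiers and values below are placeholders.
// ---------------------------------------------------------------------------
class BatchIdUsageSketch {
    public static void main(String[] args) {
        String partitionA = "query-1/partition-0";
        String partitionB = "query-1/partition-1";

        SolaceMessageTracker.setLastBatchId(partitionA, "42");
        SolaceMessageTracker.setLastBatchId(partitionB, "17");

        // Each reader sees only the batch id it last recorded for its own key.
        System.out.println(SolaceMessageTracker.getLastBatchId(partitionA)); // 42
        System.out.println(SolaceMessageTracker.getLastBatchId(partitionB)); // 17
    }
}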