Commit c7cbb0b

DATAGO-118237: Set non-owner permission to default when creating LVQ by default & other enhancements (#45)
1 parent 0ed13bd commit c7cbb0b

File tree

10 files changed (+404 additions, -105 deletions)


src/docs/asciidoc/User-Guide.adoc

Lines changed: 17 additions & 2 deletions

@@ -35,7 +35,7 @@ This guide assumes you are familiar with Spark set up and Spark Structured Strea
 === Supported Platforms

-The connector is built on the Spark Structured Streaming API and has been tested on Azure Databricks (15.4 LTS (includes Apache Spark 3.5.0, Scala 2.12) with Photon acceleration disabled). Since the Databricks runtime is consistent across all supported cloud platforms (AWS & Google Cloud), it is expected to behave similarly in other Databricks environments. Additionally, the connector has been validated on vanilla Apache Spark, ensuring compatibility with any platform that supports standard Spark deployments.
+The connector is built on the Spark Structured Streaming API and has been tested on Azure Databricks (15.4 LTS (includes Apache Spark 3.5.0, Scala 2.12) with Photon acceleration disabled, and 16.4 LTS (includes Apache Spark 3.5.2, Scala 2.12)). Since the Databricks runtime is consistent across all supported cloud platforms (AWS & Google Cloud), it is expected to behave similarly in other Databricks environments. Additionally, the connector has been validated on vanilla Apache Spark, ensuring compatibility with any platform that supports standard Spark deployments.

 === Quick Start common steps

@@ -67,12 +67,27 @@ Solace Spark connector relies on Spark Checkpointing mechanism to resume from la
 === Checkpoint Handling

-Starting from version 3.1.0 of the connector, the Solace connection is executed on a worker node instead of the driver node. This give us the ability to utilize cluster resources efficiently and also improves processing performance. The connector uses a Solace LVQ to store the checkpoint along with the Spark checkpoint.
+Starting from version 3.1.0 of the connector, the Solace connection is executed on a worker node instead of the driver node. This gives us the ability to utilize cluster resources efficiently and also improves processing performance. The connector uses a Solace LVQ to store the checkpoint along with the Spark checkpoint.

 NOTE: In case of recovery, the connector uses the offset state from the LVQ to identify the last successfully processed messages. Hence, it is recommended not to delete or modify the offset state in the LVQ.

 In some cases, there might be checkpoint failures, as Spark may fail to write to the checkpoint during an instance crash, unavailability, or for other reasons. Though the connector will handle duplicates in most cases, we recommend keeping your downstream systems idempotent.

+==== Pre-requisites for LVQ creation
+The following pre-requisites are applicable for an LVQ that is provisioned by a Solace administrator.
+1. The Queue should be of type Exclusive
+2. Spool Quota should be set to 0
+3. Owner of the Queue should be the client username used by the micro integration
+4. Non-Owner access should be set to No Access to prevent unauthorized access
+5. Add a topic subscription
+6. Ensure the ACL applied to the user has publish access to the topic subscribed by the LVQ
+
+The following pre-requisites are applicable if the micro integration needs to create the LVQ when it doesn't exist.
+
+1. Ensure the Client Profile applied to the user has "Allow Client to Create Endpoints" enabled.
+2. Ensure the ACL applied to the user has publish and subscribe access to the topic subscribed by the LVQ. The subscribe access is required to programmatically apply the topic subscription to the LVQ.
+3. The micro integration will create an LVQ with the pre-requisites mentioned in the section above.

 === User Authentication

 Solace Spark Connector supports Basic, Client Certificate and OAuth authentication to Solace. Client Credentials flow is supported when connecting using OAuth.
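The administrator-provisioning steps in the "Pre-requisites for LVQ creation" hunk above can be expressed against the broker's SEMP v2 config API. The following is a sketch only: the Message VPN name (`default`), queue name, owner client username, and topic subscription are placeholder assumptions, not values taken from this commit.

```http
POST /SEMP/v2/config/msgVpns/default/queues
Content-Type: application/json

{
  "queueName": "spark-connector-lvq",
  "accessType": "exclusive",
  "maxMsgSpoolUsage": 0,
  "owner": "spark-client-username",
  "permission": "no-access",
  "ingressEnabled": true,
  "egressEnabled": true
}

POST /SEMP/v2/config/msgVpns/default/queues/spark-connector-lvq/subscriptions
Content-Type: application/json

{ "subscriptionTopic": "spark/checkpoint/example" }
```

The fields map one-to-one onto the listed prerequisites: exclusive access type, zero spool quota, owner set to the integration's client username, and non-owner permission `no-access`.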

src/docs/sections/general/configuration/solace-spark-source-config.adoc

Lines changed: 0 additions & 14 deletions

@@ -204,20 +204,6 @@ a| Set this value to true if connector needs to identify and acknowledge process
 NOTE: This property will be void if replay strategy is enabled.

-| offsetIndicator
-| string
-| any
-| MESSAGE_ID, CORRELATION_ID, APPLICATION_MESSAGE_ID, <CUSTOM_USER_PROPERTY>
-a| Set this value if your Solace Message has unique ID in message header. Supported Values are
-
-* MESSAGE_ID
-* CORRELATION_ID
-* APPLICATION_MESSAGE_ID
-* <CUSTOM_USER_PROPERTY> - refers to one of the headers in the user properties header.
-
-Note: Default value uses replication group message ID property as offset indicator. ReplicationGroupMessageId is a unique message id across a broker cluster.

 | includeHeaders
 | boolean
 | true or false

src/main/java/com/solacecoe/connectors/spark/streaming/SolaceMicroBatch.java

Lines changed: 15 additions & 8 deletions

@@ -192,6 +192,9 @@ private Optional<String> getExecutorLocation(List<String> executorLocations, int
     @Override
     public PartitionReaderFactory createReaderFactory() {
         log.info("SolaceSparkConnector - Create reader factory with includeHeaders :: {}", this.includeHeaders);
+        if(currentCheckpoint != null && currentCheckpoint.isEmpty()) {
+            currentCheckpoint = this.getCheckpoint();
+        }
         return new SolaceDataSourceReaderFactory(this.includeHeaders, this.properties, currentCheckpoint, this.checkpointLocation);
     }

@@ -201,20 +204,24 @@ public Offset initialOffset() {
         if(existingCheckpoints != null && !existingCheckpoints.isEmpty()) {
             currentCheckpoint = existingCheckpoints;
             existingCheckpoints.forEach(checkpoint -> lastKnownMessageIds = String.join(",", lastKnownMessageIds, checkpoint.getMessageIDs()));
-
+            log.info("SolaceSparkConnector - Checkpoint available from LVQ {}", new Gson().toJson(existingCheckpoints));
             return new SolaceSourceOffset(lastKnownOffsetId, existingCheckpoints);
         }
-
+        log.info("SolaceSparkConnector - Initial Offset from LVQ is not available, the micro integration will use the available offset in checkpoint else a new checkpoint state will be created");
         return new SolaceSourceOffset(lastKnownOffsetId, new CopyOnWriteArrayList<>());
     }

     @Override
     public Offset deserializeOffset(String json) {
         SolaceSourceOffset solaceSourceOffset = getDeserializedOffset(json);
-        if(solaceSourceOffset != null) {
-            lastKnownOffsetId = solaceSourceOffset.getOffset();
-            solaceSourceOffset.getCheckpoints().forEach(checkpoint -> lastKnownMessageIds = String.join(",", lastKnownMessageIds, checkpoint.getMessageIDs()));
+        if(solaceSourceOffset.getCheckpoints() != null && solaceSourceOffset.getCheckpoints().isEmpty()) {
+            log.info("SolaceSparkConnector - No offset is available in spark checkpoint location. New checkpoint state will be created");
+        } else {
+            log.info("SolaceSparkConnector - Deserialized offset {}", new Gson().toJson(solaceSourceOffset));
         }
+        lastKnownOffsetId = solaceSourceOffset.getOffset();
+        currentCheckpoint = solaceSourceOffset.getCheckpoints();
+        solaceSourceOffset.getCheckpoints().forEach(checkpoint -> lastKnownMessageIds = String.join(",", lastKnownMessageIds, checkpoint.getMessageIDs()));

         return solaceSourceOffset;
     }

@@ -229,6 +236,8 @@ private SolaceSourceOffset getDeserializedOffset(String json) {
             } else {
                 return migrate(solaceSourceOffset.getOffset(), "");
             }
+        } else {
+            return solaceSourceOffset;
         }
     } catch (Exception e) {
         log.warn("SolaceSparkConnector - Exception when deserializing offset. May be due incompatible formats. Connector will try to migrate to latest offset format.");

@@ -241,11 +250,9 @@ private SolaceSourceOffset getDeserializedOffset(String json) {
         }
     } catch (Exception e2) {
         log.error("SolaceSparkConnector - Exception when migrating offset to latest format.");
-        throw new RuntimeException("SolaceSparkConnector - Exception when migrating offset to latest format.", e);
+        throw new RuntimeException("SolaceSparkConnector - Exception when migrating offset to latest format. Please delete the checkpoint and restart the micro integration", e);
     }
 }
-
-return null;
 }

 private SolaceSourceOffset migrate(int offset, String messageIds) {
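The new guard in `createReaderFactory()` above re-reads the checkpoint from the LVQ when the driver-side list exists but is empty. A minimal standalone sketch of that fallback rule follows; the class and method names are illustrative, not the connector's real types, and `stored` stands in for `this.getCheckpoint()`.

```java
import java.util.List;

// Illustrative sketch only: mirrors the empty-checkpoint fallback added in
// createReaderFactory(). "stored" represents the durable copy read back
// from the LVQ via this.getCheckpoint().
final class CheckpointFallback {
    static List<String> resolveCheckpoint(List<String> current, List<String> stored) {
        // A non-null but empty in-memory checkpoint means state was lost or
        // not yet loaded, so fall back to the durable (LVQ) copy.
        if (current != null && current.isEmpty()) {
            return stored;
        }
        return current;
    }
}
```

With this rule, a non-empty in-memory checkpoint always wins, and only the "non-null but empty" case triggers a re-read from the durable store.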

src/main/java/com/solacecoe/connectors/spark/streaming/offset/SolaceMessageTracker.java

Lines changed: 6 additions & 6 deletions

@@ -11,7 +11,7 @@
 import java.util.concurrent.CopyOnWriteArrayList;

 public final class SolaceMessageTracker implements Serializable {
-    private static String lastBatchId = "";
+    private static ConcurrentHashMap<String, String> lastBatchId = new ConcurrentHashMap<>();
     private static final Logger logger = LogManager.getLogger(SolaceMessageTracker.class);
     private static ConcurrentHashMap<String, CopyOnWriteArrayList<SolaceMessage>> messages = new ConcurrentHashMap<>();
     private static ConcurrentHashMap<String, String> lastProcessedMessageId = new ConcurrentHashMap<>();

@@ -35,7 +35,7 @@ public static void addMessage(String uniqueId, SolaceMessage message) {
     if(messages.containsKey(uniqueId)) {
         messageList = messages.get(uniqueId);
     }
-    messageList.add(message);
+    messageList.addIfAbsent(message);
     messages.put(uniqueId, messageList);
 }

@@ -77,11 +77,11 @@ public static void resetId(String uniqueId) {
     logger.info("SolaceSparkConnector - Cleared all messages from Offset Manager for {}", uniqueId);
 }

-public static String getLastBatchId() {
-    return lastBatchId;
+public static String getLastBatchId(String uniqueId) {
+    return lastBatchId.get(uniqueId);
 }

-public static void setLastBatchId(String lastBatchId) {
-    SolaceMessageTracker.lastBatchId = lastBatchId;
+public static void setLastBatchId(String uniqueId, String batchId) {
+    lastBatchId.put(uniqueId, batchId);
 }
 }
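The `SolaceMessageTracker` change above replaces the single shared `lastBatchId` string with a per-id map, so concurrent consumers no longer overwrite each other's batch id. A minimal sketch of the same pattern; the class name and ids are illustrative, not the connector's real types.

```java
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of the per-key batch-id tracking introduced in
// SolaceMessageTracker: each uniqueId keeps its own last batch id
// instead of all of them sharing one static String.
final class BatchIdTracker {
    private static final ConcurrentHashMap<String, String> lastBatchId = new ConcurrentHashMap<>();

    static void setLastBatchId(String uniqueId, String batchId) {
        lastBatchId.put(uniqueId, batchId);
    }

    static String getLastBatchId(String uniqueId) {
        // null if this id has never recorded a batch
        return lastBatchId.get(uniqueId);
    }
}
```

`ConcurrentHashMap` keeps both operations thread-safe without explicit locking, which matters because multiple reader tasks may update their batch ids concurrently.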
