Skip to content

Commit b1b0261

Browse files
DATAGO-120936: Hotfix - Disable partition key handling for checkpointing (#46)
1 parent 6453f56 commit b1b0261

File tree

4 files changed

+21
-15
lines changed

4 files changed

+21
-15
lines changed

src/docs/asciidoc/User-Guide.adoc

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ NOTE: Above sample code used parquet as example data source. You can configure y
4545

4646
NOTE: In case of a Databricks deployment, it is recommended to store and retrieve sensitive credentials from Databricks secrets. Please refer to <<Using Databricks Secret Management>> on how to configure secrets and use them in a notebook.
4747

48+
NOTE: Solace Partitioned Queues are not supported.
49+
4850
=== Databricks Considerations
4951

5052
If you are using a Shared compute cluster, make sure your cluster has https://docs.databricks.com/en/data-governance/unity-catalog/manage-privileges/allowlist.html[appropriate permissions] to install the connector from Maven Central and access the jars. Please contact your Databricks administrator for the required permissions.
@@ -73,14 +75,15 @@ NOTE: In case of recovery, connector uses offset state from LVQ to identify last
7375

7476
In some cases, there might be checkpoint failures as spark may fail to write to checkpoint during instance crash or unavailability or other reasons. Though the connector will handle duplicates in most cases, we recommend to keep your downstream systems idempotent.
7577

76-
==== Pre-requisites for LVQ creation
77-
The following pre-requisites are applicable for LVQ that are provisioned by Solace Administrator.
78+
==== Prerequisites for LVQ creation
79+
The following prerequisites are applicable for LVQ that are provisioned by Solace Administrator.
80+
7881
1. The Queue should be of type Exclusive
7982
2. Spool Quota should be set to 0
8083
3. Owner of the Queue should be the client username used by the micro integration
8184
4. Non-Owner access should be set to No Access to prevent unauthorized access
8285
5. Add a topic subscription
83-
6. Ensure the ACL applied to the user has publish access to the topic subscribed by LVQ
86+
6. Ensure the ACL applied to the user has publish and subscribe access to the topic subscribed by LVQ
8487

8588
The following prerequisites are applicable if the micro integration needs to create the LVQ when it doesn't exist.
8689

src/main/java/com/solacecoe/connectors/spark/streaming/SolaceMicroBatch.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ public Offset deserializeOffset(String json) {
217217
if(solaceSourceOffset.getCheckpoints() != null && solaceSourceOffset.getCheckpoints().isEmpty()) {
218218
log.info("SolaceSparkConnector - No offset is available in spark checkpoint location. New checkpoint state will be created");
219219
} else {
220-
log.info("SolaceSparkConnector - Deserialized offset {}", new Gson().toJson(solaceSourceOffset));
220+
log.trace("SolaceSparkConnector - Deserialized offset {}", new Gson().toJson(solaceSourceOffset));
221221
}
222222
lastKnownOffsetId = solaceSourceOffset.getOffset();
223223
currentCheckpoint = solaceSourceOffset.getCheckpoints();
@@ -289,7 +289,8 @@ public void commit(Offset end) {
289289

290290
if(!offsetToCommit.isEmpty()) {
291291
currentCheckpoint = offsetToCommit;
292-
log.trace("SolaceSparkConnector - Final checkpoint publishing to LVQ {}", new Gson().toJson(offsetToCommit));
292+
log.info("SolaceSparkConnector - Final checkpoint published to LVQ on topic {}", properties.getOrDefault(SolaceSparkStreamingProperties.SOLACE_SPARK_CONNECTOR_LVQ_TOPIC, SolaceSparkStreamingProperties.SOLACE_SPARK_CONNECTOR_LVQ_DEFAULT_TOPIC));
293+
log.trace("SolaceSparkConnector - Final checkpoint publishing to LVQ {} on topic {}", new Gson().toJson(offsetToCommit), properties.getOrDefault(SolaceSparkStreamingProperties.SOLACE_SPARK_CONNECTOR_LVQ_TOPIC, SolaceSparkStreamingProperties.SOLACE_SPARK_CONNECTOR_LVQ_DEFAULT_TOPIC));
293294
this.solaceBroker.publishMessage(properties.getOrDefault(SolaceSparkStreamingProperties.SOLACE_SPARK_CONNECTOR_LVQ_TOPIC, SolaceSparkStreamingProperties.SOLACE_SPARK_CONNECTOR_LVQ_DEFAULT_TOPIC), new Gson().toJson(offsetToCommit));
294295
checkException();
295296
offsetToCommit.clear();

src/main/java/com/solacecoe/connectors/spark/streaming/partitions/SolaceInputPartitionReader.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public class SolaceInputPartitionReader implements PartitionReader<InternalRow>,
5959
private int messages = 0;
6060
private Iterator<SolaceMessage> iterator;
6161
private boolean shouldTrackMessage = true;
62-
62+
private boolean isPartitionQueue = false;
6363
public SolaceInputPartitionReader(SolaceInputPartition inputPartition, boolean includeHeaders, Map<String, String> properties,
6464
TaskContext taskContext, CopyOnWriteArrayList<SolaceSparkPartitionCheckpoint> checkpoints, String checkpointLocation) {
6565

@@ -96,7 +96,7 @@ public SolaceInputPartitionReader(SolaceInputPartition inputPartition, boolean i
9696
log.info("SolaceSparkConnector - Acknowledging any processed messages to Solace as commit is successful");
9797
long startTime = System.currentTimeMillis();
9898
SolaceMessageTracker.ackMessages(uniqueId);
99-
log.trace("SolaceSparkConnector - Total time taken to acknowledge messages {} ms", (System.currentTimeMillis() - startTime));
99+
log.info("SolaceSparkConnector - Total time taken to acknowledge messages {} ms", (System.currentTimeMillis() - startTime));
100100
} else {
101101
log.info("SolaceSparkConnector - Spark Batch with id {} is requesting data again. It may be because of multiple operations on same dataframe.", currentBatchId);
102102
isCommitTriggered = false;
@@ -185,7 +185,7 @@ public InternalRow get() {
185185
}
186186
// No need to add message to tracker as the call is from same dataframe operation.
187187
if (shouldTrackMessage) {
188-
if (solaceRecord.getPartitionKey() != null && !solaceRecord.getPartitionKey().isEmpty()) {
188+
if (solaceRecord.getPartitionKey() != null && !solaceRecord.getPartitionKey().isEmpty() && isPartitionQueue) {
189189
SolaceMessageTracker.addMessageID(solaceRecord.getPartitionKey(), solaceRecord.getMessageId());
190190
} else {
191191
SolaceMessageTracker.addMessageID(this.uniqueId, solaceRecord.getMessageId());
@@ -339,15 +339,15 @@ private void registerTaskListener() {
339339
logShutdownMessage(context);
340340
} else if (context.isCompleted()) {
341341
String processedMessageIDs = SolaceMessageTracker.getProcessedMessagesIDs(this.solaceInputPartition.getId());
342+
Path path = Paths.get(this.checkpointLocation + "/" + this.solaceInputPartition.getId() + ".txt");
343+
log.info("SolaceSparkConnector - File path {} to store checkpoint processed in worker node {}", path.toString(), this.solaceInputPartition.getPreferredLocation());
342344
if(processedMessageIDs != null && !processedMessageIDs.isEmpty()) {
343345
try {
344-
Path path = Paths.get(this.checkpointLocation + "/" + this.solaceInputPartition.getId() + ".txt");
345-
log.trace("SolaceSparkConnector - File path {} to store checkpoint processed in worker node {}", path.toString(), this.solaceInputPartition.getPreferredLocation());
346346
Path parentDir = path.getParent();
347347
if (parentDir != null) {
348348
// Create the directory and all nonexistent parent directories
349349
Files.createDirectories(parentDir);
350-
log.trace("SolaceSparkConnector - Created parent directory {} for file path {}", parentDir.toString(), path.toString());
350+
log.info("SolaceSparkConnector - Created parent directory {} for file path {}", parentDir.toString(), path.toString());
351351
}
352352
// overwrite checkpoint to preserve latest value
353353
try (BufferedWriter writer = Files.newBufferedWriter(path, StandardOpenOption.CREATE,
@@ -359,7 +359,7 @@ private void registerTaskListener() {
359359
// Publish state to checkpoint. On commit the state is published to Solace LVQ.
360360
writer.write(new Gson().toJson(solaceSparkPartitionCheckpoints));
361361
writer.newLine();
362-
log.trace("SolaceSparkConnector - Checkpoint {} stored in file path {}", new Gson().toJson(solaceSparkPartitionCheckpoints), path.toString());
362+
log.info("SolaceSparkConnector - Checkpoint {} stored in file path {}", new Gson().toJson(solaceSparkPartitionCheckpoints), path.toString());
363363
SolaceMessageTracker.removeProcessedMessagesIDs(this.solaceInputPartition.getId());
364364
// }
365365
}
@@ -368,9 +368,10 @@ private void registerTaskListener() {
368368
this.solaceBroker.close();
369369
throw new RuntimeException(e);
370370
}
371+
} else {
372+
log.info("SolaceSparkConnector - No processed message id's available for input partition {} and nothing is written to checkpoint {}", this.solaceInputPartition.getId(), this.checkpointLocation);
371373
}
372374

373-
374375
log.info("SolaceSparkConnector - Total time taken by executor is {} ms for Task {}", context.taskMetrics().executorRunTime(), uniqueId);
375376

376377
if (closeReceiversOnPartitionClose) {

src/main/java/com/solacecoe/connectors/spark/streaming/solace/SolaceBroker.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -206,8 +206,8 @@ private void setReceiver(EventListener eventListener) {
206206
}
207207

208208
public void createLVQIfNotExist() {
209+
log.info("SolaceSparkConnector - Configured LVQ name {} and topic {}", this.lvqName, this.lvqTopic);
209210
lvq = JCSMPFactory.onlyInstance().createQueue(this.lvqName);
210-
211211
EndpointProperties endpoint_props = new EndpointProperties();
212212
endpoint_props.setAccessType(EndpointProperties.ACCESSTYPE_EXCLUSIVE);
213213
endpoint_props.setQuota(0);
@@ -226,7 +226,7 @@ public void createLVQIfNotExist() {
226226
if(e instanceof JCSMPErrorResponseException) {
227227
JCSMPErrorResponseException jce = (JCSMPErrorResponseException) e;
228228
if(jce.getResponsePhrase().contains("Subscription Already Exists")) {
229-
log.warn("SolaceSparkConnector - Subscription Already Exists on LVQ {}", this.lvqName);
229+
log.warn("SolaceSparkConnector - Subscription {} Already Exists on LVQ {}", this.lvqTopic, this.lvqName);
230230
} else {
231231
close();
232232
this.isException = true;
@@ -377,6 +377,7 @@ public void publishMessage(String topic, Object msg) {
377377
Destination destination = JCSMPFactory.onlyInstance().createTopic(topic);
378378
try {
379379
this.producer.send(xmlMessage, destination);
380+
log.info("SolaceSparkConnector - Published checkpoint to LVQ topic {}", topic);
380381
} catch (JCSMPException e) {
381382
log.error("SolaceSparkConnector - Exception publishing lvq message to Solace", e);
382383
handleException("SolaceSparkConnector - Exception publishing lvq message to Solace ", e);

0 commit comments

Comments
 (0)