Skip to content

Commit 176e535

Browse files
hotfix
1 parent 711af81 commit 176e535

File tree

4 files changed

+16
-13
lines changed

4 files changed

+16
-13
lines changed

src/docs/asciidoc/User-Guide.adoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ The following pre-requisites are applicable for LVQ that are provisioned by Sola
8080
3. Owner of the Queue should be the client username used by the micro integration
8181
4. Non-Owner access should be set to No Access to prevent unauthorized access
8282
5. Add a topic subscription
83-
6. Ensure the ACL applied to the user has publish access to the topic subscribed by LVQ
83+
6. Ensure the ACL applied to the user has publish/subscribe access to the topic subscribed by LVQ
8484

8585
The following pre-requisites are applicable if the micro integration need to create LVQ if it doesn't exist.
8686

src/main/java/com/solacecoe/connectors/spark/streaming/SolaceMicroBatch.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ public Offset deserializeOffset(String json) {
217217
if(solaceSourceOffset.getCheckpoints() != null && solaceSourceOffset.getCheckpoints().isEmpty()) {
218218
log.info("SolaceSparkConnector - No offset is available in spark checkpoint location. New checkpoint state will be created");
219219
} else {
220-
log.info("SolaceSparkConnector - Deserialized offset {}", new Gson().toJson(solaceSourceOffset));
220+
log.trace("SolaceSparkConnector - Deserialized offset {}", new Gson().toJson(solaceSourceOffset));
221221
}
222222
lastKnownOffsetId = solaceSourceOffset.getOffset();
223223
currentCheckpoint = solaceSourceOffset.getCheckpoints();
@@ -289,7 +289,8 @@ public void commit(Offset end) {
289289

290290
if(!offsetToCommit.isEmpty()) {
291291
currentCheckpoint = offsetToCommit;
292-
log.trace("SolaceSparkConnector - Final checkpoint publishing to LVQ {}", new Gson().toJson(offsetToCommit));
292+
log.info("SolaceSparkConnector - Final checkpoint published to LVQ on topic {}", properties.getOrDefault(SolaceSparkStreamingProperties.SOLACE_SPARK_CONNECTOR_LVQ_TOPIC, SolaceSparkStreamingProperties.SOLACE_SPARK_CONNECTOR_LVQ_DEFAULT_TOPIC));
293+
log.trace("SolaceSparkConnector - Final checkpoint publishing to LVQ {} on topic {}", new Gson().toJson(offsetToCommit), properties.getOrDefault(SolaceSparkStreamingProperties.SOLACE_SPARK_CONNECTOR_LVQ_TOPIC, SolaceSparkStreamingProperties.SOLACE_SPARK_CONNECTOR_LVQ_DEFAULT_TOPIC));
293294
this.solaceBroker.publishMessage(properties.getOrDefault(SolaceSparkStreamingProperties.SOLACE_SPARK_CONNECTOR_LVQ_TOPIC, SolaceSparkStreamingProperties.SOLACE_SPARK_CONNECTOR_LVQ_DEFAULT_TOPIC), new Gson().toJson(offsetToCommit));
294295
checkException();
295296
offsetToCommit.clear();

src/main/java/com/solacecoe/connectors/spark/streaming/partitions/SolaceInputPartitionReader.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public class SolaceInputPartitionReader implements PartitionReader<InternalRow>,
5959
private int messages = 0;
6060
private Iterator<SolaceMessage> iterator;
6161
private boolean shouldTrackMessage = true;
62-
62+
private boolean isPartitionQueue = false;
6363
public SolaceInputPartitionReader(SolaceInputPartition inputPartition, boolean includeHeaders, Map<String, String> properties,
6464
TaskContext taskContext, CopyOnWriteArrayList<SolaceSparkPartitionCheckpoint> checkpoints, String checkpointLocation) {
6565

@@ -96,7 +96,7 @@ public SolaceInputPartitionReader(SolaceInputPartition inputPartition, boolean i
9696
log.info("SolaceSparkConnector - Acknowledging any processed messages to Solace as commit is successful");
9797
long startTime = System.currentTimeMillis();
9898
SolaceMessageTracker.ackMessages(uniqueId);
99-
log.trace("SolaceSparkConnector - Total time taken to acknowledge messages {} ms", (System.currentTimeMillis() - startTime));
99+
log.info("SolaceSparkConnector - Total time taken to acknowledge messages {} ms", (System.currentTimeMillis() - startTime));
100100
} else {
101101
log.info("SolaceSparkConnector - Spark Batch with id {} is requesting data again. It may be because of multiple operations on same dataframe.", currentBatchId);
102102
isCommitTriggered = false;
@@ -185,7 +185,7 @@ public InternalRow get() {
185185
}
186186
// No need to add message to tracker as the call is from same dataframe operation.
187187
if (shouldTrackMessage) {
188-
if (solaceRecord.getPartitionKey() != null && !solaceRecord.getPartitionKey().isEmpty()) {
188+
if (solaceRecord.getPartitionKey() != null && !solaceRecord.getPartitionKey().isEmpty() && isPartitionQueue) {
189189
SolaceMessageTracker.addMessageID(solaceRecord.getPartitionKey(), solaceRecord.getMessageId());
190190
} else {
191191
SolaceMessageTracker.addMessageID(this.uniqueId, solaceRecord.getMessageId());
@@ -339,15 +339,15 @@ private void registerTaskListener() {
339339
logShutdownMessage(context);
340340
} else if (context.isCompleted()) {
341341
String processedMessageIDs = SolaceMessageTracker.getProcessedMessagesIDs(this.solaceInputPartition.getId());
342+
Path path = Paths.get(this.checkpointLocation + "/" + this.solaceInputPartition.getId() + ".txt");
343+
log.info("SolaceSparkConnector - File path {} to store checkpoint processed in worker node {}", path.toString(), this.solaceInputPartition.getPreferredLocation());
342344
if(processedMessageIDs != null && !processedMessageIDs.isEmpty()) {
343345
try {
344-
Path path = Paths.get(this.checkpointLocation + "/" + this.solaceInputPartition.getId() + ".txt");
345-
log.trace("SolaceSparkConnector - File path {} to store checkpoint processed in worker node {}", path.toString(), this.solaceInputPartition.getPreferredLocation());
346346
Path parentDir = path.getParent();
347347
if (parentDir != null) {
348348
// Create the directory and all nonexistent parent directories
349349
Files.createDirectories(parentDir);
350-
log.trace("SolaceSparkConnector - Created parent directory {} for file path {}", parentDir.toString(), path.toString());
350+
log.info("SolaceSparkConnector - Created parent directory {} for file path {}", parentDir.toString(), path.toString());
351351
}
352352
// overwrite checkpoint to preserve latest value
353353
try (BufferedWriter writer = Files.newBufferedWriter(path, StandardOpenOption.CREATE,
@@ -359,7 +359,7 @@ private void registerTaskListener() {
359359
// Publish state to checkpoint. On commit the state is published to Solace LVQ.
360360
writer.write(new Gson().toJson(solaceSparkPartitionCheckpoints));
361361
writer.newLine();
362-
log.trace("SolaceSparkConnector - Checkpoint {} stored in file path {}", new Gson().toJson(solaceSparkPartitionCheckpoints), path.toString());
362+
log.info("SolaceSparkConnector - Checkpoint {} stored in file path {}", new Gson().toJson(solaceSparkPartitionCheckpoints), path.toString());
363363
SolaceMessageTracker.removeProcessedMessagesIDs(this.solaceInputPartition.getId());
364364
// }
365365
}
@@ -368,9 +368,10 @@ private void registerTaskListener() {
368368
this.solaceBroker.close();
369369
throw new RuntimeException(e);
370370
}
371+
} else {
372+
log.info("SolaceSparkConnector - No processed message id's available for input partition {} and nothing is written to checkpoint {}", this.solaceInputPartition.getId(), this.checkpointLocation);
371373
}
372374

373-
374375
log.info("SolaceSparkConnector - Total time taken by executor is {} ms for Task {}", context.taskMetrics().executorRunTime(), uniqueId);
375376

376377
if (closeReceiversOnPartitionClose) {

src/main/java/com/solacecoe/connectors/spark/streaming/solace/SolaceBroker.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -206,8 +206,8 @@ private void setReceiver(EventListener eventListener) {
206206
}
207207

208208
public void createLVQIfNotExist() {
209+
log.info("SolaceSparkConnector - Configured LVQ name {} and topic {}", this.lvqName, this.lvqTopic);
209210
lvq = JCSMPFactory.onlyInstance().createQueue(this.lvqName);
210-
211211
EndpointProperties endpoint_props = new EndpointProperties();
212212
endpoint_props.setAccessType(EndpointProperties.ACCESSTYPE_EXCLUSIVE);
213213
endpoint_props.setQuota(0);
@@ -226,7 +226,7 @@ public void createLVQIfNotExist() {
226226
if(e instanceof JCSMPErrorResponseException) {
227227
JCSMPErrorResponseException jce = (JCSMPErrorResponseException) e;
228228
if(jce.getResponsePhrase().contains("Subscription Already Exists")) {
229-
log.warn("SolaceSparkConnector - Subscription Already Exists on LVQ {}", this.lvqName);
229+
log.warn("SolaceSparkConnector - Subscription {} Already Exists on LVQ {}", this.lvqTopic, this.lvqName);
230230
} else {
231231
close();
232232
this.isException = true;
@@ -377,6 +377,7 @@ public void publishMessage(String topic, Object msg) {
377377
Destination destination = JCSMPFactory.onlyInstance().createTopic(topic);
378378
try {
379379
this.producer.send(xmlMessage, destination);
380+
log.info("SolaceSparkConnector - Published checkpoint to LVQ topic {}", topic);
380381
} catch (JCSMPException e) {
381382
log.error("SolaceSparkConnector - Exception publishing lvq message to Solace", e);
382383
handleException("SolaceSparkConnector - Exception publishing lvq message to Solace ", e);

0 commit comments

Comments
 (0)