Skip to content

Commit 4434ce5

Browse files
Check for exception before starting input partition
1 parent 597beca commit 4434ce5

File tree

2 files changed

+9
-1
lines changed

2 files changed

+9
-1
lines changed

src/main/java/com/solacecoe/connectors/spark/streaming/SolaceMicroBatch.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ public Offset latestOffset() {
106106

107107
@Override
108108
public InputPartition[] planInputPartitions(Offset start, Offset end) {
109+
checkException();
109110
if(partitions == 0) {
110111
partitions = getTotalExecutors();
111112
}
@@ -275,6 +276,7 @@ public void commit(Offset end) {
275276
if(!lastKnownOffset.isEmpty()) {
276277
log.trace("SolaceSparkConnector - Final checkpoint publishing to LVQ {}", new Gson().toJson(this.lastKnownOffset));
277278
this.solaceBroker.publishMessage(properties.getOrDefault(SolaceSparkStreamingProperties.SOLACE_SPARK_CONNECTOR_LVQ_TOPIC, SolaceSparkStreamingProperties.SOLACE_SPARK_CONNECTOR_LVQ_DEFAULT_TOPIC), new Gson().toJson(this.lastKnownOffset));
279+
checkException();
278280
lastKnownOffset.clear();
279281
}
280282
}
@@ -289,6 +291,12 @@ private CopyOnWriteArrayList<SolaceSparkPartitionCheckpoint> getCheckpoint() {
289291
return this.solaceBroker.getOffsetFromLvq();
290292
}
291293

294+
private void checkException() {
295+
if(this.solaceBroker.isException()) {
296+
throw new RuntimeException(this.solaceBroker.getException());
297+
}
298+
}
299+
292300
private String convertCheckpointURIToStringPath(String checkpointLocation) {
293301
if (checkpointLocation.startsWith("dbfs:/")) {
294302
// Strip "dbfs:/" and prepend "/dbfs/"

src/main/java/com/solacecoe/connectors/spark/streaming/offset/SolaceMessageTracker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@ public static void addMessage(String uniqueId, SolaceMessage message) {
4040

4141
public static void ackMessages(String uniqueId) {
4242
if(messages.containsKey(uniqueId)) {
43-
logger.trace("SolaceSparkConnector - Acknowledging {} messages ", messages.get(uniqueId).size());
4443
messages.get(uniqueId).forEach(message -> message.bytesXMLMessage.ackMessage());
44+
logger.trace("SolaceSparkConnector - Acknowledged {} messages ", messages.get(uniqueId).size());
4545
messages.remove(uniqueId);
4646
}
4747
}

0 commit comments

Comments
 (0)