Skip to content

Commit cec2fc0

Browse files
added listeners to exit connector on user termination
1 parent ec94aa9 commit cec2fc0

File tree

2 files changed

+28
-7
lines changed

2 files changed

+28
-7
lines changed

src/main/java/com/solacecoe/connectors/spark/streaming/partitions/SolaceInputPartitionReader.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.spark.sql.execution.streaming.MicroBatchExecution;
2626
import org.apache.spark.sql.execution.streaming.StreamExecution;
2727
import org.apache.spark.unsafe.types.UTF8String;
28+
import org.apache.spark.util.TaskFailureListener;
2829

2930
import java.io.BufferedWriter;
3031
import java.io.File;
@@ -119,6 +120,13 @@ public boolean next() {
119120
this.solaceBroker.close();
120121
throw new SolaceSessionException(this.solaceBroker.getException());
121122
}
123+
124+
if (TaskContext.get() != null && TaskContext.get().isInterrupted()) {
125+
log.info("SolaceSparkConnector - Interrupted while waiting for next message");
126+
SolaceConnectionManager.close(this.solaceInputPartition.getId());
127+
SolaceMessageTracker.resetId(uniqueId);
128+
throw new RuntimeException("Task was interrupted.");
129+
}
122130
solaceMessage = getNextMessage();
123131
return solaceMessage != null;
124132
}
@@ -225,16 +233,24 @@ public void close() {
225233
}
226234
}
227235

236+
private void logShutdownMessage(TaskContext context) {
237+
log.info("SolaceSparkConnector - Closing connections to Solace as task {} is interrupted or failed", String.join(",", context.getLocalProperty(StreamExecution.QUERY_ID_KEY()),
238+
context.getLocalProperty(MicroBatchExecution.BATCH_ID_KEY()),
239+
Integer.toString(context.stageId()),
240+
Integer.toString(context.partitionId())));
241+
SolaceConnectionManager.close(this.solaceInputPartition.getId());
242+
SolaceMessageTracker.resetId(uniqueId);
243+
}
244+
228245
private void registerTaskListener() {
246+
this.taskContext.addTaskFailureListener((context, error) -> {
247+
log.error("SolaceSparkConnector - Input Partition {} failed with error", this.solaceInputPartition.getId(), error);
248+
logShutdownMessage(context);
249+
});
229250
this.taskContext.addTaskCompletionListener(context -> {
230251
log.info("SolaceSparkConnector - Task {} state is completed :: {}, failed :: {}, interrupted :: {}", uniqueId, context.isCompleted(), context.isFailed(), context.isInterrupted());
231252
if(context.isInterrupted() || context.isFailed()) {
232-
log.info("SolaceSparkConnector - Closing connections to Solace as task {} is interrupted or failed", String.join(",", context.getLocalProperty(StreamExecution.QUERY_ID_KEY()),
233-
context.getLocalProperty(MicroBatchExecution.BATCH_ID_KEY()),
234-
Integer.toString(context.stageId()),
235-
Integer.toString(context.partitionId())));
236-
SolaceConnectionManager.close(this.solaceInputPartition.getId());
237-
SolaceMessageTracker.resetId(uniqueId);
253+
logShutdownMessage(context);
238254
} else if (context.isCompleted()) {
239255
List<String> ids = SolaceMessageTracker.getIds();
240256
try {

src/main/java/com/solacecoe/connectors/spark/streaming/solace/EventListener.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,12 @@ public void onReceive(BytesXMLMessage msg) {
7676
SolaceSparkPartitionCheckpoint solaceSparkPartitionCheckpoint = this.checkpoints.stream().filter(checkpoint -> checkpoint.getPartitionId().equals(partitionKey)).findFirst().orElse(null);
7777
if(solaceSparkPartitionCheckpoint != null) {
7878
lastKnownMessageIDs = Arrays.stream(solaceSparkPartitionCheckpoint.getMessageIDs().split(",")).collect(Collectors.toList());
79-
compareMessageIds(lastKnownMessageIDs, messageID, msg);
79+
if(lastKnownMessageIDs.isEmpty()) {
80+
log.warn("SolaceSparkConnector - No checkpoint found. Message will be reprocessed to ensure reliability—any duplicates must be handled by the downstream system.");
81+
this.messages.add(new SolaceMessage(msg));
82+
} else {
83+
compareMessageIds(lastKnownMessageIDs, messageID, msg);
84+
}
8085
} else {
8186
log.warn("SolaceSparkConnector - No checkpoint found for partition key {}. Message will be reprocessed to ensure reliability—any duplicates must be handled by the downstream system.", partitionKey);
8287
this.messages.add(new SolaceMessage(msg));

0 commit comments

Comments
 (0)