
Commit 0edc510

Handled isEmpty operation on dataframe, logging enhancements and fixes to checkpoint handling
1 parent 0ed13bd commit 0edc510

File tree

5 files changed: +43, -26 lines


src/main/java/com/solacecoe/connectors/spark/streaming/SolaceMicroBatch.java

Lines changed: 10 additions & 7 deletions
```diff
@@ -201,20 +201,23 @@ public Offset initialOffset() {
         if(existingCheckpoints != null && !existingCheckpoints.isEmpty()) {
             currentCheckpoint = existingCheckpoints;
             existingCheckpoints.forEach(checkpoint -> lastKnownMessageIds = String.join(",", lastKnownMessageIds, checkpoint.getMessageIDs()));
-
+            log.info("SolaceSparkConnector - Checkpoint available from LVQ {}", new Gson().toJson(existingCheckpoints));
             return new SolaceSourceOffset(lastKnownOffsetId, existingCheckpoints);
         }
-
+        log.info("SolaceSparkConnector - Initial Offset from LVQ is not available, the micro integration will use the available offset in checkpoint else a new checkpoint state will be created");
         return new SolaceSourceOffset(lastKnownOffsetId, new CopyOnWriteArrayList<>());
     }
 
     @Override
     public Offset deserializeOffset(String json) {
         SolaceSourceOffset solaceSourceOffset = getDeserializedOffset(json);
-        if(solaceSourceOffset != null) {
-            lastKnownOffsetId = solaceSourceOffset.getOffset();
-            solaceSourceOffset.getCheckpoints().forEach(checkpoint -> lastKnownMessageIds = String.join(",", lastKnownMessageIds, checkpoint.getMessageIDs()));
+        if(solaceSourceOffset.getCheckpoints() != null && solaceSourceOffset.getCheckpoints().isEmpty()) {
+            log.info("SolaceSparkConnector - No offset is available in spark checkpoint location. New checkpoint state will be created");
+        } else {
+            log.info("SolaceSparkConnector - Deserialized offset {}", new Gson().toJson(solaceSourceOffset));
         }
+        lastKnownOffsetId = solaceSourceOffset.getOffset();
+        solaceSourceOffset.getCheckpoints().forEach(checkpoint -> lastKnownMessageIds = String.join(",", lastKnownMessageIds, checkpoint.getMessageIDs()));
 
         return solaceSourceOffset;
     }
@@ -229,6 +232,8 @@ private SolaceSourceOffset getDeserializedOffset(String json) {
             } else {
                 return migrate(solaceSourceOffset.getOffset(), "");
             }
+        } else {
+            return solaceSourceOffset;
         }
     } catch (Exception e) {
         log.warn("SolaceSparkConnector - Exception when deserializing offset. May be due incompatible formats. Connector will try to migrate to latest offset format.");
@@ -244,8 +249,6 @@ private SolaceSourceOffset getDeserializedOffset(String json) {
             throw new RuntimeException("SolaceSparkConnector - Exception when migrating offset to latest format.", e);
         }
     }
-
-    return null;
 }
 
 private SolaceSourceOffset migrate(int offset, String messageIds) {
```
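With the trailing `return null;` removed from `getDeserializedOffset`, `deserializeOffset` no longer null-checks its result; instead, an offset whose checkpoint list exists but is empty is treated as a fresh start. A minimal sketch of that branching, with `OffsetSketch` as an illustrative stand-in for the connector's offset type (not the real class):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Illustrative stand-in for the connector's offset type; not the real class.
class OffsetSketch {
    final int offset;
    final List<String> checkpoints;
    OffsetSketch(int offset, List<String> checkpoints) {
        this.offset = offset;
        this.checkpoints = checkpoints;
    }
}

public class DeserializeOffsetSketch {
    public static void main(String[] args) {
        OffsetSketch fresh = new OffsetSketch(0, new CopyOnWriteArrayList<>());
        OffsetSketch resumed = new OffsetSketch(42, new CopyOnWriteArrayList<>(List.of("msg-1,msg-2")));

        describe(fresh);   // empty checkpoint list -> new checkpoint state
        describe(resumed); // populated list -> resume from deserialized offset
    }

    static void describe(OffsetSketch o) {
        // Mirrors the new branch: an empty (but non-null) checkpoint list means
        // no usable offset exists in the Spark checkpoint location.
        if (o.checkpoints != null && o.checkpoints.isEmpty()) {
            System.out.println("No offset in checkpoint location; new checkpoint state will be created");
        } else {
            System.out.println("Resuming from offset " + o.offset + " with checkpoints " + o.checkpoints);
        }
    }
}
```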

src/main/java/com/solacecoe/connectors/spark/streaming/offset/SolaceMessageTracker.java

Lines changed: 5 additions & 5 deletions
```diff
@@ -11,7 +11,7 @@
 import java.util.concurrent.CopyOnWriteArrayList;
 
 public final class SolaceMessageTracker implements Serializable {
-    private static String lastBatchId = "";
+    private static ConcurrentHashMap<String, String> lastBatchId = new ConcurrentHashMap<>();
     private static final Logger logger = LogManager.getLogger(SolaceMessageTracker.class);
     private static ConcurrentHashMap<String, CopyOnWriteArrayList<SolaceMessage>> messages = new ConcurrentHashMap<>();
     private static ConcurrentHashMap<String, String> lastProcessedMessageId = new ConcurrentHashMap<>();
@@ -77,11 +77,11 @@ public static void resetId(String uniqueId) {
         logger.info("SolaceSparkConnector - Cleared all messages from Offset Manager for {}", uniqueId);
     }
 
-    public static String getLastBatchId() {
-        return lastBatchId;
+    public static String getLastBatchId(String uniqueId) {
+        return lastBatchId.get(uniqueId);
     }
 
-    public static void setLastBatchId(String lastBatchId) {
-        SolaceMessageTracker.lastBatchId = lastBatchId;
+    public static void setLastBatchId(String uniqueId, String batchId) {
+        lastBatchId.put(uniqueId, batchId);
    }
 }
```
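Replacing the single static `lastBatchId` string with a `ConcurrentHashMap` keyed by `uniqueId` lets concurrent input partitions track their batches independently instead of overwriting one shared value. A minimal sketch of the keyed behavior (simplified; the real tracker holds additional state):

```java
import java.util.concurrent.ConcurrentHashMap;

public class BatchIdTrackerSketch {
    // One entry per input partition, keyed by its unique id, as in the new tracker.
    private static final ConcurrentHashMap<String, String> lastBatchId = new ConcurrentHashMap<>();

    public static void setLastBatchId(String uniqueId, String batchId) {
        lastBatchId.put(uniqueId, batchId);
    }

    public static String getLastBatchId(String uniqueId) {
        return lastBatchId.get(uniqueId); // null until the partition records its first batch
    }

    public static void main(String[] args) {
        // Two partitions track their batches independently; with the old single
        // static String, partition-1's write would have clobbered partition-0's.
        setLastBatchId("partition-0", "batch-7");
        setLastBatchId("partition-1", "batch-9");
        System.out.println(getLastBatchId("partition-0")); // batch-7
        System.out.println(getLastBatchId("partition-1")); // batch-9
    }
}
```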

src/main/java/com/solacecoe/connectors/spark/streaming/partitions/SolaceInputPartitionReader.java

Lines changed: 3 additions & 3 deletions
```diff
@@ -72,7 +72,7 @@ public SolaceInputPartitionReader(SolaceInputPartition inputPartition, boolean i
      * In case when multiple operations are performed on dataframe, input partition will be called as part of Spark scan.
      * We need to acknowledge messages only if new batch is started. In case of same batch we will return the same messages.
      */
-    if (!currentBatchId.equals(SolaceMessageTracker.getLastBatchId())) {
+    if (!currentBatchId.equals(SolaceMessageTracker.getLastBatchId(this.uniqueId))) {
         /* Currently solace can ack messages on consumer flow. So ack previous messages before starting to process new ones.
          * If Spark starts new input partition it indicates previous batch of data is successful. So we can acknowledge messages here.
          * Solace connection is always active and acknowledgements should be successful. It might throw exception if connection is lost
@@ -90,7 +90,7 @@ public SolaceInputPartitionReader(SolaceInputPartition inputPartition, boolean i
         }
     }
 
-    SolaceMessageTracker.setLastBatchId(currentBatchId);
+    SolaceMessageTracker.setLastBatchId(this.uniqueId, currentBatchId);
 
     this.includeHeaders = includeHeaders;
     this.properties = properties;
@@ -239,7 +239,7 @@ private SolaceMessage getNextMessage() {
         while (shouldProcessMoreMessages(batchSize, messages)) {
             try {
                 if (iterator.hasNext()) {
-                    shouldTrackMessage = false;
+                    // shouldTrackMessage = false;
                     solaceMessage = iterator.next();
                     if (solaceMessage == null) {
                         return null;
```
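One consequence of the tracker change: `getLastBatchId(uniqueId)` now returns null before a partition records its first batch (a map lookup, where the old static string defaulted to ""), and `currentBatchId.equals(null)` is simply false, so a fresh partition still takes the acknowledge path. A condensed sketch of the gate; `ackPreviousMessages` is a hypothetical stand-in for the reader's acknowledgement logic:

```java
public class BatchGateSketch {
    private static String lastBatchId; // per-partition value; null before the first batch

    public static void main(String[] args) {
        process("batch-1"); // new batch -> acknowledge previous messages
        process("batch-1"); // same batch (e.g. isEmpty() then count() on the dataframe) -> no re-ack
        process("batch-2"); // next batch -> acknowledge again
    }

    static void process(String currentBatchId) {
        // equals() against a null lastBatchId is false, so a fresh partition
        // always enters the acknowledge branch, matching the reader's gate.
        if (!currentBatchId.equals(lastBatchId)) {
            ackPreviousMessages();
        } else {
            System.out.println("Same batch " + currentBatchId + ": returning same messages, no ack");
        }
        lastBatchId = currentBatchId;
    }

    // Hypothetical stand-in for acknowledging the prior batch's messages.
    static void ackPreviousMessages() {
        System.out.println("New batch: acknowledging previous batch's messages");
    }
}
```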

src/main/java/com/solacecoe/connectors/spark/streaming/solace/EventListener.java

Lines changed: 3 additions & 6 deletions
```diff
@@ -48,10 +48,7 @@ public void onReceive(BytesXMLMessage msg) {
         if(!this.checkpoints.isEmpty()) {
             List<String> lastKnownMessageIDs = new ArrayList<>();
             String messageID = SolaceUtils.getMessageID(msg, this.offsetIndicator);
-            boolean hasPartitionKey = false;
-            if(msg.getProperties() != null && msg.getProperties().containsKey(XMLMessage.MessageUserPropertyConstants.QUEUE_PARTITION_KEY)) {
-                hasPartitionKey = true;
-            }
+            boolean hasPartitionKey = msg.getProperties() != null && msg.getProperties().containsKey(XMLMessage.MessageUserPropertyConstants.QUEUE_PARTITION_KEY);
             if(!hasPartitionKey) {
                 for(SolaceSparkPartitionCheckpoint checkpoint : this.checkpoints) {
                     lastKnownMessageIDs.addAll(Arrays.stream(checkpoint.getMessageIDs().split(",")).collect(Collectors.toList()));
@@ -87,7 +84,7 @@ public void onReceive(BytesXMLMessage msg) {
                 this.messages.add(new SolaceMessage(msg));
             }
         } else {
-            log.warn("SolaceSparkConnector - Incoming message has no partition key but messages in checkpoint has partition id's. Message will be reprocessed to ensure reliability—any duplicates must be handled by the downstream system.");
+            log.warn("SolaceSparkConnector - Incoming message partition key is either null or empty. Message will be reprocessed to ensure reliability—any duplicates must be handled by the downstream system.");
             this.messages.add(new SolaceMessage(msg));
         }
     }
@@ -112,7 +109,7 @@ private void compareMessageIds(List<String> lastKnownMessageIDs, String messageI
         } else {
             this.messages.add(new SolaceMessage(msg));
             if (lastKnownMessageIDs.size() > 1) {
-                log.warn("SolaceSparkConnector - Message ID {} not acknowledged as it may be a previously failed message received before newer checkpointed ones(parallel consumers). The connector will not check against checkpointed message IDs. It will be reprocessed to ensure reliability—any duplicates must be handled by the downstream system.", messageID);
+                log.warn("SolaceSparkConnector - Message ID {} was not acknowledged because it may be an older, previously failed message received ahead of more recent checkpointed messages (due to parallel consumers). The connector does not validate message IDs against checkpoints, so this message will be reprocessed to ensure reliability. Any resulting duplicates must be handled by the downstream system.", messageID);
                 break;
             }
         }
```
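The `hasPartitionKey` refactor folds the null guard and the `containsKey` test into a single boolean expression with identical semantics. The same pattern, shown with a plain `Map` standing in for the JCSMP message properties (the connector itself reads `msg.getProperties()` and `XMLMessage.MessageUserPropertyConstants.QUEUE_PARTITION_KEY`; the key name below is illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class PartitionKeyCheckSketch {
    // Illustrative property name; the connector uses
    // XMLMessage.MessageUserPropertyConstants.QUEUE_PARTITION_KEY.
    private static final String PARTITION_KEY = "queue-partition-key";

    static boolean hasPartitionKey(Map<String, Object> properties) {
        // Single expression replacing the old null-check + if + flag assignment:
        // false when properties are absent, true only when the key is present.
        return properties != null && properties.containsKey(PARTITION_KEY);
    }

    public static void main(String[] args) {
        Map<String, Object> withKey = new HashMap<>();
        withKey.put(PARTITION_KEY, "partition-3");

        System.out.println(hasPartitionKey(null));            // false (no properties at all)
        System.out.println(hasPartitionKey(new HashMap<>())); // false (no partition key)
        System.out.println(hasPartitionKey(withKey));         // true
    }
}
```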

src/main/java/com/solacecoe/connectors/spark/streaming/solace/SolaceBroker.java

Lines changed: 22 additions & 5 deletions
```diff
@@ -210,18 +210,35 @@ public void createLVQIfNotExist() {
 
         EndpointProperties endpoint_props = new EndpointProperties();
         endpoint_props.setAccessType(EndpointProperties.ACCESSTYPE_EXCLUSIVE);
-        endpoint_props.setPermission(EndpointProperties.PERMISSION_CONSUME);
         endpoint_props.setQuota(0);
         try {
             this.session.provision(lvq, endpoint_props, JCSMPSession.FLAG_IGNORE_ALREADY_EXISTS | JCSMPSession.WAIT_FOR_CONFIRM);
         } catch (JCSMPException e) {
             log.error("SolaceSparkConnector - Exception creating LVQ. Shutting down consumer ", e);
             close();
+            this.isException = true;
+            this.exception = e;
+            throw new RuntimeException(e);
         }
         try {
             this.session.addSubscription(lvq, JCSMPFactory.onlyInstance().createTopic(this.lvqTopic), JCSMPSession.WAIT_FOR_CONFIRM);
         } catch (JCSMPException e) {
-            log.warn("SolaceSparkConnector - Subscription already exists on LVQ. Ignoring error");
+            if(e instanceof JCSMPErrorResponseException) {
+                JCSMPErrorResponseException jce = (JCSMPErrorResponseException) e;
+                if(jce.getResponsePhrase().contains("Subscription Already Exists")) {
+                    log.warn("SolaceSparkConnector - Subscription Already Exists on LVQ {}", this.lvqName);
+                } else {
+                    close();
+                    this.isException = true;
+                    this.exception = e;
+                    throw new RuntimeException(e);
+                }
+            } else {
+                close();
+                this.isException = true;
+                this.exception = e;
+                throw new RuntimeException(e);
+            }
         }
     }
 
@@ -233,10 +250,10 @@ public CopyOnWriteArrayList<SolaceSparkPartitionCheckpoint> browseLVQ() {
         try {
             Browser myBrowser = session.createBrowser(br_prop);
             BytesXMLMessage rx_msg = null;
-            log.info("SolaceSparkConnector - Browsing message from LVQ {}", this.lvqName);
+            log.info("SolaceSparkConnector - Browsing checkpoint from LVQ {}", this.lvqName);
             rx_msg = myBrowser.getNext();
             if (rx_msg != null) {
-                log.info("SolaceSparkConnector - Browsed message from LVQ {}", this.lvqName);
+                log.info("SolaceSparkConnector - Browsed checkpoint from LVQ {}", this.lvqName);
                 byte[] msgData = new byte[0];
                 if (rx_msg.getContentLength() != 0) {
                     msgData = rx_msg.getBytes();
@@ -247,7 +264,7 @@ public CopyOnWriteArrayList<SolaceSparkPartitionCheckpoint> browseLVQ() {
                 lastKnownCheckpoint = new Gson().fromJson(new String(msgData, StandardCharsets.UTF_8), new TypeToken<CopyOnWriteArrayList<SolaceSparkPartitionCheckpoint>>(){}.getType());
                 log.info("SolaceSparkConnector - Checkpoint from LVQ {}", new String(msgData, StandardCharsets.UTF_8));
             } else {
-                log.info("SolaceSparkConnector - No message available from LVQ {}", this.lvqName);
+                log.info("SolaceSparkConnector - No checkpoint available in LVQ {}", this.lvqName);
             }
             myBrowser.close();
             return lastKnownCheckpoint;
```
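The subscription handling no longer swallows every JCSMPException: only an error response whose phrase contains "Subscription Already Exists" is tolerated, and anything else closes the broker and rethrows. The same classify-or-rethrow shape, sketched with stand-in exception types (`JCSMPErrorResponseException` and `getResponsePhrase()` are the real JCSMP names used in the diff above; the classes below are illustrative):

```java
public class SubscriptionErrorSketch {
    // Stand-in for JCSMPException.
    static class BrokerException extends Exception {
        BrokerException(String msg) { super(msg); }
    }

    // Stand-in for JCSMPErrorResponseException, which carries a response phrase.
    static class BrokerErrorResponseException extends BrokerException {
        BrokerErrorResponseException(String phrase) { super(phrase); }
        String getResponsePhrase() { return getMessage(); }
    }

    static void handleSubscriptionError(BrokerException e) {
        // Benign case: the subscription is already on the LVQ, so log and continue.
        if (e instanceof BrokerErrorResponseException
                && ((BrokerErrorResponseException) e).getResponsePhrase().contains("Subscription Already Exists")) {
            System.out.println("Subscription already exists; ignoring");
            return;
        }
        // Anything else is a real failure: release resources and surface it.
        close();
        throw new RuntimeException(e);
    }

    static void close() {
        System.out.println("Closing broker resources");
    }

    public static void main(String[] args) {
        handleSubscriptionError(new BrokerErrorResponseException("Subscription Already Exists"));
        try {
            handleSubscriptionError(new BrokerErrorResponseException("Permission Denied"));
        } catch (RuntimeException ex) {
            System.out.println("Rethrown: " + ex.getCause().getMessage());
        }
    }
}
```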
