Skip to content

Commit 8bf40f2

Browse files
Fix timestamp when publishing to Solace
Spark returns timestamp in micro seconds, which will cause timestamp parsing issues. Hence fixing it when publishing rather at consumption.
1 parent b685944 commit 8bf40f2

File tree

2 files changed

+2
-2
lines changed

2 files changed

+2
-2
lines changed

src/main/java/com/solacecoe/connectors/spark/streaming/partitions/SolaceInputPartitionReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ public InternalRow get() {
167167
SolaceRecord solaceRecord = SolaceRecord.getMapper(this.properties.getOrDefault(SolaceSparkStreamingProperties.OFFSET_INDICATOR, SolaceSparkStreamingProperties.OFFSET_INDICATOR_DEFAULT)).map(solaceMessage.bytesXMLMessage);
168168
long timestamp = solaceRecord.getSenderTimestamp();
169169
if (solaceRecord.getSenderTimestamp() == 0) {
170-
timestamp = System.currentTimeMillis() / 1000;
170+
timestamp = System.currentTimeMillis();
171171
}
172172
InternalRow row;
173173
if (this.includeHeaders) {

src/main/java/com/solacecoe/connectors/spark/streaming/solace/SolaceBroker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -420,7 +420,7 @@ public XMLMessage createMessage(String applicationMessageId, String partitionKey
420420
xmlMessage.setCorrelationId(applicationMessageId);
421421
xmlMessage.setApplicationMessageId(applicationMessageId);
422422
if (timestamp > 0L) {
423-
xmlMessage.setSenderTimestamp(timestamp);
423+
xmlMessage.setSenderTimestamp(timestamp / 1000);
424424
}
425425
xmlMessage.setDeliveryMode(DeliveryMode.PERSISTENT);
426426

0 commit comments

Comments
 (0)