Skip to content

Commit 489b56a

Browse files
added comments
1 parent 7c9e38d commit 489b56a

File tree

2 files changed

+3
-0
lines changed

2 files changed

+3
-0
lines changed

src/main/java/com/solacecoe/connectors/spark/streaming/solace/SolaceBroker.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,8 @@ public XMLMessage createMessage(String applicationMessageId, String partitionKey
420420
xmlMessage.setCorrelationId(applicationMessageId);
421421
xmlMessage.setApplicationMessageId(applicationMessageId);
422422
if (timestamp > 0L) {
423+
// Spark TimestampType by default return's in microseconds(https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/types/TimestampType.html) whether it is millis or seconds. So division by 1000 is required
424+
// as microseconds format will be too long to parse
423425
long senderTimestamp = timestamp/1000;
424426
xmlMessage.setSenderTimestamp(senderTimestamp);
425427
}

src/main/java/com/solacecoe/connectors/spark/streaming/write/SolaceDataWriter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ private void publishMessages(UnsafeRow projectedRow) {
8383
}
8484
long timestamp = 0L;
8585
if(projectedRow.get(4, DataTypes.TimestampType) != null) {
86+
// TimestampType always returns long. So safe to use getLong
8687
timestamp = projectedRow.getLong(4);
8788
}
8889
UnsafeMapData headersMap = new UnsafeMapData();

0 commit comments

Comments
 (0)