Commit 5abfd9a

DATAGO-92181: Solace message replay support (#23)
1 parent 263a4c3 commit 5abfd9a

7 files changed, +396 -77 lines changed

pubsubplus-connector-spark_3.x/src/docs/asciidoc/User-Guide.adoc

Lines changed: 4 additions & 0 deletions
@@ -85,6 +85,10 @@ If rotating access token is present in file accessible by connector use below op

 NOTE: When access token is read from file, it may lose some of it's expiry time by the time it is accessed by connector. It is recommended to have minimal time difference between writing to file and access by the connector so that a valid new token is updated in solace session before expiry of old token.

+=== Message Replay
+
+Solace Spark Connector can replay messages using Solace Replay Log. Connector can replay all messages or after specific replication group message id or after specific timestamp. Please refer to https://docs.solace.com/Features/Replay/Msg-Replay-Concepts-Config.htm[Message Replay Configuration] to enable replay log in Solace PubSub+ broker.
+
 === Solace Spark Streaming Schema Structure

 Solace Spark Connector transforms the incoming message to Spark row with below schema definition.

pubsubplus-connector-spark_3.x/src/docs/sections/general/configuration/spark-config.adoc

Lines changed: 27 additions & 7 deletions
@@ -55,7 +55,7 @@
 | int
 | 0 - 60000
 | 3000
-| How much time in (MS) to wait between each attempt to connect or reconnect to a host. If a connect or reconnect attempt to host is not successful, the API waits for the amount of time set for reconnectRetryWaitInMillis, and then makes another connect or reconnect attempt.
+| How much time in (MS) to wait between each attempt to connect or reconnect to a host. If connect or reconnect attempt to host is not successful, the API waits for the amount of time set for reconnectRetryWaitInMillis, and then makes another connect or reconnect attempt.

 | solace.apiProperties.<Property>
 | any
@@ -154,17 +154,37 @@ solace.apiProperties.client_channel_properties.keepAliveIntervalInMillis=3000
 | 1
 | Set number of messages to be processed in batch. The connector can stream data in batches to Spark based on configured size.

+| replayStrategy
+| string
+| BEGINNING \| TIMEBASED \| REPLICATION-GROUP-MESSAGE-ID
+| empty
+| Set the replay strategy if messages need to be replayed from broker to connector. For more information refer to https://docs.solace.com/Features/Replay/Msg-Replay-Concepts-Config.htm#Types[SolaceReplayConfiguration]
+
+| replayStrategyReplicationGroupMessageId
+| string
+| valid-replication-group-message-id
+| empty
+| Set the property if replay strategy is REPLICATION-GROUP-MESSAGE-ID. Message playback is started after this replication group message id.
+
+| replayStrategyStartTime
+| string
+| datetime string<yyyy-MM-dd'T'HH:mm:ss>
+| empty
+| Set the property if replay strategy is TIMEBASED. Any messages in the replay log equal to, or newer than, the specified date and time that match the endpoint’s subscriptions are replayed to the connector. The date can't be earlier than the date the replay log was created, otherwise replay will fail.
+
+| replayStrategyTimeZone
+| string
+| valid timezone
+| UTC
+| Set the property if replay strategy is TIMEBASED.
+
 | ackLastProcessedMessages
 | boolean
 | true or false
 | false
-| Set this value to true if connector needs to identify and acknowledge processed messages in last run during restarts. The connector purely depends on checkpoint generated during Spark commit. We recommended enabling this configuration only when your downstream system has processed data in previous run.
+a| Set this value to true if connector needs to identify and acknowledge processed messages in last run during restarts. The connector purely depends on checkpoint generated during Spark commit. We recommended enabling this configuration only when your downstream system has processed data in previous run.

-| skipDuplicates
-| boolean
-| true or false
-| false
-| Set this value to true if connector needs check for duplicates before adding to Spark row. This scenario occurs when a task is running late and new task is started. The new task may have duplicate message as messages from earlier are not acknowledged by the time it is start.
+NOTE: This property will be void if replay strategy is enabled.

 | offsetIndicator
 | string
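
The replay options documented in this table can be illustrated with a minimal sketch that builds the option map for each strategy. Only the option keys, the strategy values, and the `yyyy-MM-dd'T'HH:mm:ss` pattern come from the documentation above; the class `ReplayOptionsSketch`, its method names, and the up-front validation with `java.time` are hypothetical and are not part of the connector, which may parse these values differently.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not part of the connector): assembles the replay
// options described in the configuration table for each replay strategy.
class ReplayOptionsSketch {
    // Option keys as documented in the table.
    static final String REPLAY_STRATEGY = "replayStrategy";
    static final String REPLAY_STRATEGY_REPLICATION_GROUP_MESSAGE_ID = "replayStrategyReplicationGroupMessageId";
    static final String REPLAY_STRATEGY_START_TIME = "replayStrategyStartTime";
    static final String REPLAY_STRATEGY_TIMEZONE = "replayStrategyTimeZone";

    // Documented pattern for replayStrategyStartTime.
    private static final DateTimeFormatter START_TIME_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss");

    // Replay every message held in the replay log.
    static Map<String, String> fromBeginning() {
        Map<String, String> options = new LinkedHashMap<>();
        options.put(REPLAY_STRATEGY, "BEGINNING");
        return options;
    }

    // Replay messages equal to or newer than startTime in the given time zone.
    static Map<String, String> fromTime(String startTime, String timeZone) {
        // Assumed validation: both calls throw a DateTimeException on bad input.
        LocalDateTime.parse(startTime, START_TIME_FORMAT);
        ZoneId.of(timeZone);
        Map<String, String> options = new LinkedHashMap<>();
        options.put(REPLAY_STRATEGY, "TIMEBASED");
        options.put(REPLAY_STRATEGY_START_TIME, startTime);
        options.put(REPLAY_STRATEGY_TIMEZONE, timeZone);
        return options;
    }

    // Replay messages after the given replication group message id.
    static Map<String, String> afterMessageId(String replicationGroupMessageId) {
        if (replicationGroupMessageId == null || replicationGroupMessageId.isEmpty()) {
            throw new IllegalArgumentException("replication group message id is required");
        }
        Map<String, String> options = new LinkedHashMap<>();
        options.put(REPLAY_STRATEGY, "REPLICATION-GROUP-MESSAGE-ID");
        options.put(REPLAY_STRATEGY_REPLICATION_GROUP_MESSAGE_ID, replicationGroupMessageId);
        return options;
    }
}
```

For example, `ReplayOptionsSketch.fromTime("2024-01-15T10:30:00", "UTC")` yields a map that could be passed as options to the Spark stream reader. The timestamp here is an arbitrary illustration.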

pubsubplus-connector-spark_3.x/src/main/java/com/solacecoe/connectors/spark/streaming/SolaceMicroBatch.java

Lines changed: 5 additions & 5 deletions
@@ -40,9 +40,9 @@ public class SolaceMicroBatch implements MicroBatchStream, SupportsAdmissionCont
     private final int batchSize;
     private final int partitions;
     private final boolean ackLastProcessedMessages;
-    private final boolean skipMessageReprocessingIfTasksAreRunningLate;
     private final boolean createFlowsOnSameSession;
     private final boolean includeHeaders;
+    private final boolean initiateReplay;

     public SolaceMicroBatch(StructType schema, Map<String, String> properties, CaseInsensitiveStringMap options) {
         log.info("SolaceSparkConnector - Initializing Solace Spark Connector");
@@ -112,7 +112,6 @@ public SolaceMicroBatch(StructType schema, Map<String, String> properties, CaseI
         }

         ackLastProcessedMessages = Boolean.parseBoolean(properties.getOrDefault(SolaceSparkStreamingProperties.ACK_LAST_PROCESSED_MESSAGES, SolaceSparkStreamingProperties.ACK_LAST_PROCESSED_MESSAGES_DEFAULT));
-        skipMessageReprocessingIfTasksAreRunningLate = Boolean.parseBoolean(properties.getOrDefault(SolaceSparkStreamingProperties.SKIP_DUPLICATES, SolaceSparkStreamingProperties.SKIP_DUPLICATES_DEFAULT));
         log.info("SolaceSparkConnector - Ack Last processed messages is set to {}", ackLastProcessedMessages);

         includeHeaders = Boolean.parseBoolean(properties.getOrDefault(SolaceSparkStreamingProperties.INCLUDE_HEADERS, SolaceSparkStreamingProperties.INCLUDE_HEADERS_DEFAULT));
@@ -133,11 +132,12 @@ public SolaceMicroBatch(StructType schema, Map<String, String> properties, CaseI

         solaceConnectionManager = new SolaceConnectionManager();
         log.info("SolaceSparkConnector - Solace Connection Details Host : {}, VPN : {}, Username : {}", properties.get(SolaceSparkStreamingProperties.HOST), properties.get(SolaceSparkStreamingProperties.VPN), properties.get(SolaceSparkStreamingProperties.USERNAME));
-        SolaceBroker solaceBroker = new SolaceBroker(properties.get(SolaceSparkStreamingProperties.HOST), properties.get(SolaceSparkStreamingProperties.VPN), properties.get(SolaceSparkStreamingProperties.USERNAME), properties.get(SolaceSparkStreamingProperties.PASSWORD), properties.get(SolaceSparkStreamingProperties.QUEUE), properties);
+        this.initiateReplay = properties.getOrDefault(SolaceSparkStreamingProperties.REPLAY_STRATEGY ,null) != null;
+        SolaceBroker solaceBroker = new SolaceBroker(properties);
         solaceConnectionManager.addConnection(solaceBroker);
         for (int i = 0; i < partitions; i++) {
             if(!createFlowsOnSameSession && i > 0) {
-                solaceBroker = new SolaceBroker(properties.get(SolaceSparkStreamingProperties.HOST), properties.get(SolaceSparkStreamingProperties.VPN), properties.get(SolaceSparkStreamingProperties.USERNAME), properties.get(SolaceSparkStreamingProperties.PASSWORD), properties.get(SolaceSparkStreamingProperties.QUEUE), properties);
+                solaceBroker = new SolaceBroker(properties);
                 solaceConnectionManager.addConnection(solaceBroker);
             }
             EventListener eventListener = new EventListener((i + 1));
@@ -198,7 +198,7 @@ private InputPartition[] splitDataOnPartitions() {
         this.messages.put(solaceRecord.getMessageId(), solaceMessage);
     } else {
         this.messages.put(solaceRecord.getMessageId(), solaceMessage);
-        if (ackLastProcessedMessages) {
+        if (ackLastProcessedMessages && !this.initiateReplay) {
             log.info("SolaceSparkConnector - Ack last processed messages is enabled. Checking if message is already processed based on available offsets.");
             // based on last successful offset, extract the message ID and see if same message is received, if so ack the message
             if (offsetJson != null && offsetJson.has("messageIDs")) {
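
The interaction between `ackLastProcessedMessages` and the new replay flag in the hunks above can be sketched in isolation. This is a simplified illustration, not the connector's code: the class `ReplayGateSketch` and its method names are hypothetical, and only the two property keys and the null check mirror the diff. One consequence of testing only for `null` is that an empty-string `replayStrategy` value still counts as replay being requested.

```java
import java.util.Map;

// Hypothetical stand-alone version of the gating logic shown in the diff.
class ReplayGateSketch {
    // Replay is considered requested whenever the replayStrategy key maps to
    // a non-null value, mirroring getOrDefault(key, null) != null in the diff.
    static boolean initiateReplay(Map<String, String> properties) {
        return properties.get("replayStrategy") != null;
    }

    // Last-run acknowledgement only applies when replay is not requested.
    static boolean shouldAckLastProcessed(Map<String, String> properties) {
        boolean ackLastProcessedMessages = Boolean.parseBoolean(
                properties.getOrDefault("ackLastProcessedMessages", "false"));
        return ackLastProcessedMessages && !initiateReplay(properties);
    }
}
```

With `ackLastProcessedMessages=true` and no `replayStrategy`, `shouldAckLastProcessed` returns `true`; adding any `replayStrategy` value turns it off, which matches the documentation note that the property is void when a replay strategy is enabled.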

pubsubplus-connector-spark_3.x/src/main/java/com/solacecoe/connectors/spark/streaming/properties/SolaceSparkStreamingProperties.java

Lines changed: 4 additions & 0 deletions
@@ -8,6 +8,10 @@ public class SolaceSparkStreamingProperties {
     public static final String QUEUE = "queue";
     public static final String BATCH_SIZE = "batchSize";
     public static final String BATCH_SIZE_DEFAULT = "1";
+    public static final String REPLAY_STRATEGY = "replayStrategy";
+    public static final String REPLAY_STRATEGY_REPLICATION_GROUP_MESSAGE_ID = "replayStrategyReplicationGroupMessageId";
+    public static final String REPLAY_STRATEGY_START_TIME = "replayStrategyStartTime";
+    public static final String REPLAY_STRATEGY_TIMEZONE = "replayStrategyTimeZone";
     public static final String ACK_LAST_PROCESSED_MESSAGES = "ackLastProcessedMessages";
     public static final String ACK_LAST_PROCESSED_MESSAGES_DEFAULT = "false";
     public static final String SKIP_DUPLICATES = "skipDuplicates";
