Skip to content

Commit 6c4f92e

Browse files
Removed unnecessary files and added integration tests
1 parent 764e896 commit 6c4f92e

File tree

5 files changed

+127
-170
lines changed

5 files changed

+127
-170
lines changed

pubsubplus-connector-spark_3.x/src/main/java/com/solacecoe/connectors/spark/streaming/properties/SolaceSparkHeaders.java

Lines changed: 0 additions & 85 deletions
This file was deleted.

pubsubplus-connector-spark_3.x/src/main/java/com/solacecoe/connectors/spark/streaming/properties/SolaceSparkHeadersMeta.java

Lines changed: 0 additions & 58 deletions
This file was deleted.

pubsubplus-connector-spark_3.x/src/main/java/com/solacecoe/connectors/spark/streaming/solace/utils/SolaceUtils.java

Lines changed: 20 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
package com.solacecoe.connectors.spark.streaming.solace.utils;
22

33
import com.solacecoe.connectors.spark.streaming.properties.SolaceHeaderMeta;
4-
import com.solacecoe.connectors.spark.streaming.properties.SolaceSparkHeaders;
5-
import com.solacecoe.connectors.spark.streaming.properties.SolaceSparkHeadersMeta;
64
import com.solacesystems.jcsmp.*;
7-
import org.apache.commons.lang3.SerializationUtils;
85

96
import java.io.Serializable;
107
import java.nio.charset.StandardCharsets;
@@ -55,24 +52,26 @@ public static XMLMessage map(Object payload,
5552
BytesMessage bytesMessage = JCSMPFactory.onlyInstance().createMessage(BytesMessage.class);
5653
bytesMessage.setData((byte[]) payload);
5754
xmlMessage = bytesMessage;
58-
} else if (payload instanceof String) {
59-
TextMessage textMessage = JCSMPFactory.onlyInstance().createMessage(TextMessage.class);
60-
textMessage.setText((String) payload);
61-
xmlMessage = textMessage;
62-
} else if (payload instanceof SDTStream) {
63-
StreamMessage streamMessage = JCSMPFactory.onlyInstance().createMessage(StreamMessage.class);
64-
streamMessage.setStream((SDTStream) payload);
65-
xmlMessage = streamMessage;
66-
} else if (payload instanceof SDTMap) {
67-
MapMessage mapMessage = JCSMPFactory.onlyInstance().createMessage(MapMessage.class);
68-
mapMessage.setMap((SDTMap) payload);
69-
xmlMessage = mapMessage;
70-
} else if (payload instanceof Serializable) {
71-
BytesMessage bytesMessage = JCSMPFactory.onlyInstance().createMessage(BytesMessage.class);
72-
bytesMessage.setData(SerializationUtils.serialize((Serializable) payload));
73-
metadata.putBoolean(SolaceSparkHeaders.SERIALIZED_PAYLOAD, true);
74-
xmlMessage = bytesMessage;
75-
} else {
55+
}
56+
// else if (payload instanceof String) {
57+
// TextMessage textMessage = JCSMPFactory.onlyInstance().createMessage(TextMessage.class);
58+
// textMessage.setText((String) payload);
59+
// xmlMessage = textMessage;
60+
// } else if (payload instanceof SDTStream) {
61+
// StreamMessage streamMessage = JCSMPFactory.onlyInstance().createMessage(StreamMessage.class);
62+
// streamMessage.setStream((SDTStream) payload);
63+
// xmlMessage = streamMessage;
64+
// } else if (payload instanceof SDTMap) {
65+
// MapMessage mapMessage = JCSMPFactory.onlyInstance().createMessage(MapMessage.class);
66+
// mapMessage.setMap((SDTMap) payload);
67+
// xmlMessage = mapMessage;
68+
// } else if (payload instanceof Serializable) {
69+
// BytesMessage bytesMessage = JCSMPFactory.onlyInstance().createMessage(BytesMessage.class);
70+
// bytesMessage.setData(SerializationUtils.serialize((Serializable) payload));
71+
// metadata.putBoolean(SolaceSparkHeaders.SERIALIZED_PAYLOAD, true);
72+
// xmlMessage = bytesMessage;
73+
// }
74+
else {
7675
String msg = String.format(
7776
"Invalid payload received. Expected %s. Received: %s",
7877
String.join(", ",

pubsubplus-connector-spark_3.x/src/main/java/com/solacecoe/connectors/spark/streaming/write/SolaceDataWriter.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,13 +122,15 @@ public void write(InternalRow row) throws IOException {
122122
@Override
123123
public WriterCommitMessage commit() {
124124
checkForException();
125-
if(this.commitMessages.size() < Integer.parseInt(this.properties.getOrDefault(SolaceSparkStreamingProperties.BATCH_SIZE, SolaceSparkStreamingProperties.BATCH_SIZE_DEFAULT))) {
125+
int batchSize = Integer.parseInt(this.properties.getOrDefault(SolaceSparkStreamingProperties.BATCH_SIZE, SolaceSparkStreamingProperties.BATCH_SIZE_DEFAULT));
126+
if(batchSize == 0 || (batchSize > 0 && this.commitMessages.size() < Integer.parseInt(this.properties.getOrDefault(SolaceSparkStreamingProperties.BATCH_SIZE, SolaceSparkStreamingProperties.BATCH_SIZE_DEFAULT)))) {
126127
try {
127128
log.info("SolaceSparkConnector - Expected acknowledgements {}, Actual acknowledgements {}", this.properties.getOrDefault(SolaceSparkStreamingProperties.BATCH_SIZE, SolaceSparkStreamingProperties.BATCH_SIZE_DEFAULT), this.commitMessages.size());
128129
log.info("SolaceSparkConnector - Sleeping for 3000ms to check for pending acknowledgments");
129130
Thread.sleep(3000);
130131
} catch (InterruptedException e) {
131132
log.error("SolaceSparkConnector - Interrupted while waiting for pending acknowledgments", e);
133+
Thread.currentThread().interrupt();
132134
throw new RuntimeException(e);
133135
}
134136
}

pubsubplus-connector-spark_3.x/src/test/java/com/solacecoe/connectors/spark/SolaceSparkStreamingIT.java

Lines changed: 104 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,19 @@
88
import com.solacecoe.connectors.spark.streaming.properties.SolaceSparkStreamingProperties;
99
import com.solacesystems.jcsmp.*;
1010
import org.apache.spark.api.java.function.VoidFunction2;
11-
import org.apache.spark.sql.Dataset;
12-
import org.apache.spark.sql.Row;
13-
import org.apache.spark.sql.SaveMode;
14-
import org.apache.spark.sql.SparkSession;
11+
import org.apache.spark.sql.*;
1512
import org.apache.spark.sql.streaming.DataStreamReader;
1613
import org.apache.spark.sql.streaming.StreamingQuery;
1714
import org.apache.spark.sql.streaming.StreamingQueryException;
1815
import org.junit.jupiter.api.*;
1916
import org.testcontainers.junit.jupiter.Testcontainers;
17+
import org.testcontainers.shaded.org.apache.commons.io.FileUtils;
2018
import org.testcontainers.solace.Service;
2119
import org.testcontainers.solace.SolaceContainer;
2220

21+
import java.io.File;
22+
import java.io.IOException;
23+
import java.nio.file.Files;
2324
import java.nio.file.Path;
2425
import java.nio.file.Paths;
2526
import java.util.concurrent.ExecutorService;
@@ -88,6 +89,8 @@ public void handleErrorEx(Object o, JCSMPException e, long l) {
8889
TextMessage textMessage = JCSMPFactory.onlyInstance().createMessage(TextMessage.class);
8990
textMessage.setText("Hello Spark!");
9091
textMessage.setPriority(1);
92+
textMessage.setCorrelationId("test-correlation-id");
93+
textMessage.setDMQEligible(true);
9194
Topic topic = JCSMPFactory.onlyInstance().createTopic("solace/spark/streaming");
9295
messageProducer.send(textMessage, topic);
9396
}
@@ -99,6 +102,22 @@ public void handleErrorEx(Object o, JCSMPException e, long l) {
99102
}
100103
}
101104

105+
@AfterEach
106+
public void afterEach() throws IOException {
107+
Path path = Paths.get("src", "test", "resources", "spark-checkpoint-1");
108+
Path path1 = Paths.get("src", "test", "resources", "spark-checkpoint-2");
109+
Path path2 = Paths.get("src", "test", "resources", "spark-checkpoint-3");
110+
if(Files.exists(path)) {
111+
FileUtils.deleteDirectory(path.toAbsolutePath().toFile());
112+
}
113+
if(Files.exists(path1)) {
114+
FileUtils.deleteDirectory(path1.toAbsolutePath().toFile());
115+
}
116+
if(Files.exists(path2)) {
117+
FileUtils.deleteDirectory(path2.toAbsolutePath().toFile());
118+
}
119+
}
120+
102121
@Test
103122
public void Should_ProcessData() throws TimeoutException, StreamingQueryException {
104123
Path path = Paths.get("src", "test", "resources", "spark-checkpoint-1");
@@ -277,7 +296,7 @@ public void Should_CreateMultipleConsumersOnSameSession_And_ProcessData() throws
277296

278297
@Test
279298
public void Should_CreateMultipleConsumersOnDifferentSessions_And_ProcessData() throws TimeoutException, StreamingQueryException {
280-
Path path = Paths.get("src", "test", "resources", "spark-checkpoint-3");
299+
Path path = Paths.get("src", "test", "resources", "spark-checkpoint-2");
281300
// SparkSession sparkSession = SparkSession.builder()
282301
// .appName("data_source_test")
283302
// .master("local[*]")
@@ -662,6 +681,86 @@ public void Should_Fail_IfBatchSizeLessThan1() {
662681
});
663682
}
664683

684+
@Test
685+
public void Should_ProcessData_And_Publish_As_Stream_To_Solace() throws TimeoutException, StreamingQueryException {
686+
Path path = Paths.get("src", "test", "resources", "spark-checkpoint-1");
687+
Path writePath = Paths.get("src", "test", "resources", "spark-checkpoint-3");
688+
// SparkSession sparkSession = SparkSession.builder()
689+
// .appName("data_source_test")
690+
// .master("local[*]")
691+
// .getOrCreate();
692+
DataStreamReader reader = sparkSession.readStream()
693+
.option(SolaceSparkStreamingProperties.HOST, solaceContainer.getOrigin(Service.SMF))
694+
.option(SolaceSparkStreamingProperties.VPN, solaceContainer.getVpn())
695+
.option(SolaceSparkStreamingProperties.USERNAME, solaceContainer.getUsername())
696+
.option(SolaceSparkStreamingProperties.PASSWORD, solaceContainer.getPassword())
697+
.option(SolaceSparkStreamingProperties.QUEUE, "Solace/Queue/0")
698+
.option(SolaceSparkStreamingProperties.BATCH_SIZE, "10")
699+
.option("checkpointLocation", path.toAbsolutePath().toString())
700+
.format("solace");
701+
final long[] count = {0};
702+
final boolean[] runProcess = {true};
703+
final String[] messageId = {""};
704+
Dataset<Row> dataset = reader.load();
705+
706+
StreamingQuery streamingQuery = dataset.writeStream().option(SolaceSparkStreamingProperties.HOST, solaceContainer.getOrigin(Service.SMF))
707+
.option(SolaceSparkStreamingProperties.VPN, solaceContainer.getVpn())
708+
.option(SolaceSparkStreamingProperties.USERNAME, solaceContainer.getUsername())
709+
.option(SolaceSparkStreamingProperties.PASSWORD, solaceContainer.getPassword())
710+
// .option(SolaceSparkStreamingProperties.BATCH_SIZE, dataset.count())
711+
.option(SolaceSparkStreamingProperties.MESSAGE_ID, "my-default-id")
712+
.option("checkpointLocation", writePath.toAbsolutePath().toString())
713+
// .mode(SaveMode.Append)
714+
.format("solace").start();
715+
716+
SolaceSession session = new SolaceSession(solaceContainer.getOrigin(Service.SMF), solaceContainer.getVpn(), solaceContainer.getUsername(), solaceContainer.getPassword());
717+
Topic topic = JCSMPFactory.onlyInstance().createTopic("solace/spark/streaming");
718+
XMLMessageConsumer messageConsumer = null;
719+
try {
720+
messageConsumer = session.getSession().getMessageConsumer(new XMLMessageListener() {
721+
@Override
722+
public void onReceive(BytesXMLMessage bytesXMLMessage) {
723+
count[0] = count[0] + 1;
724+
if(count[0] == 100) {
725+
messageId[0] = bytesXMLMessage.getApplicationMessageId();
726+
}
727+
}
728+
729+
@Override
730+
public void onException(JCSMPException e) {
731+
732+
}
733+
});
734+
session.getSession().addSubscription(topic);
735+
messageConsumer.start();
736+
} catch (JCSMPException e) {
737+
throw new RuntimeException(e);
738+
}
739+
740+
ExecutorService executorService = Executors.newFixedThreadPool(1);
741+
executorService.execute(() -> {
742+
do {
743+
if(count[0] == 100L) {
744+
runProcess[0] = false;
745+
try {
746+
Assertions.assertEquals("my-default-id", messageId[0], "MessageId mismatch");
747+
streamingQuery.stop();
748+
// sparkSession.close();
749+
executorService.shutdown();
750+
} catch (TimeoutException e) {
751+
throw new RuntimeException(e);
752+
}
753+
}
754+
try {
755+
Thread.sleep(100);
756+
} catch (InterruptedException e) {
757+
throw new RuntimeException(e);
758+
}
759+
} while (runProcess[0]);
760+
});
761+
streamingQuery.awaitTermination();
762+
}
763+
665764
@Test
666765
public void Should_ProcessData_And_Publish_With_CustomId_To_Solace() throws TimeoutException, StreamingQueryException {
667766
Path path = Paths.get("src", "test", "resources", "spark-checkpoint-1");

0 commit comments

Comments
 (0)