Skip to content

Commit 85d1b03

Browse files
Added integration tests for handling multiple operations on a dataframe with single and parallel consumers
1 parent 5adf76f commit 85d1b03

File tree

1 file changed

+126
-0
lines changed

1 file changed

+126
-0
lines changed

src/test/java/com/solacecoe/connectors/spark/SolaceSparkStreamingSinkIT.java

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -680,6 +680,132 @@ public void onException(JCSMPException e) {
680680

681681
@Test
682682
@Order(10)
683+
void Should_ProcessData_WithSingleConsumer_And_Publish_To_Solace_With_MultipleOperations_On_Dataframe() throws TimeoutException, InterruptedException {
684+
Path path = Paths.get("src", "test", "resources", "spark-checkpoint-1");
685+
final long[] dataFrameCount = {0};
686+
DataStreamReader reader = sparkSession.readStream()
687+
.option(SolaceSparkStreamingProperties.HOST, solaceContainer.getOrigin(Service.SMF))
688+
.option(SolaceSparkStreamingProperties.VPN, solaceContainer.getVpn())
689+
.option(SolaceSparkStreamingProperties.USERNAME, solaceContainer.getUsername())
690+
.option(SolaceSparkStreamingProperties.PASSWORD, solaceContainer.getPassword())
691+
.option(SolaceSparkStreamingProperties.QUEUE, "Solace/Queue/0")
692+
.option(SolaceSparkStreamingProperties.BATCH_SIZE, "50")
693+
.option(SolaceSparkStreamingProperties.INCLUDE_HEADERS, true)
694+
.option("checkpointLocation", path.toAbsolutePath().toString())
695+
.format("solace");
696+
final long[] count = {0};
697+
Dataset<Row> dataset = reader.load();
698+
699+
StreamingQuery streamingQuery = dataset.writeStream().foreachBatch((VoidFunction2<Dataset<Row>, Long>) (dataset1, batchId) -> {
700+
if(!dataset1.isEmpty()) {
701+
dataFrameCount[0] = dataFrameCount[0] + dataset1.count();
702+
dataset1 = dataset1.drop("TimeStamp", "PartitionKey", "Headers");
703+
dataset1.write()
704+
.option(SolaceSparkStreamingProperties.HOST, solaceContainer.getOrigin(Service.SMF))
705+
.option(SolaceSparkStreamingProperties.VPN, solaceContainer.getVpn())
706+
.option(SolaceSparkStreamingProperties.USERNAME, solaceContainer.getUsername())
707+
.option(SolaceSparkStreamingProperties.PASSWORD, solaceContainer.getPassword())
708+
.option(SolaceSparkStreamingProperties.BATCH_SIZE, 0)
709+
.option(SolaceSparkStreamingProperties.TOPIC, "random/topic")
710+
.mode(SaveMode.Append)
711+
.format("solace").save();
712+
}
713+
}).start();
714+
715+
SolaceSession session = new SolaceSession(solaceContainer.getOrigin(Service.SMF), solaceContainer.getVpn(), solaceContainer.getUsername(), solaceContainer.getPassword());
716+
Topic topic = JCSMPFactory.onlyInstance().createTopic("random/topic");
717+
XMLMessageConsumer messageConsumer = null;
718+
try {
719+
messageConsumer = session.getSession().getMessageConsumer(new XMLMessageListener() {
720+
@Override
721+
public void onReceive(BytesXMLMessage bytesXMLMessage) {
722+
count[0] = count[0] + 1;
723+
}
724+
725+
@Override
726+
public void onException(JCSMPException e) {
727+
// Not required for test
728+
729+
}
730+
});
731+
session.getSession().addSubscription(topic);
732+
messageConsumer.start();
733+
} catch (JCSMPException e) {
734+
throw new RuntimeException(e);
735+
}
736+
737+
Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> Assertions.assertEquals(100, dataFrameCount[0]));
738+
Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> Assertions.assertEquals(100, count[0]));
739+
Thread.sleep(3000); // add timeout to ack messages on queue
740+
streamingQuery.stop();
741+
}
742+
743+
@Test
744+
@Order(11)
745+
void Should_ProcessData_WithMultipleConsumer_And_Publish_To_Solace_With_MultipleOperations_On_Dataframe() throws TimeoutException, InterruptedException {
746+
Path path = Paths.get("src", "test", "resources", "spark-checkpoint-1");
747+
final long[] dataFrameCount = {0};
748+
DataStreamReader reader = sparkSession.readStream()
749+
.option(SolaceSparkStreamingProperties.HOST, solaceContainer.getOrigin(Service.SMF))
750+
.option(SolaceSparkStreamingProperties.VPN, solaceContainer.getVpn())
751+
.option(SolaceSparkStreamingProperties.USERNAME, solaceContainer.getUsername())
752+
.option(SolaceSparkStreamingProperties.PASSWORD, solaceContainer.getPassword())
753+
.option(SolaceSparkStreamingProperties.QUEUE, "Solace/Queue/0")
754+
.option(SolaceSparkStreamingProperties.BATCH_SIZE, "50")
755+
.option(SolaceSparkStreamingProperties.INCLUDE_HEADERS, true)
756+
.option(SolaceSparkStreamingProperties.PARTITIONS, "3")
757+
.option(SolaceSparkStreamingProperties.QUEUE_RECEIVE_WAIT_TIMEOUT, 1000)
758+
.option("checkpointLocation", path.toAbsolutePath().toString())
759+
.format("solace");
760+
final long[] count = {0};
761+
Dataset<Row> dataset = reader.load();
762+
763+
StreamingQuery streamingQuery = dataset.writeStream().foreachBatch((VoidFunction2<Dataset<Row>, Long>) (dataset1, batchId) -> {
764+
if(!dataset1.isEmpty()) {
765+
dataFrameCount[0] = dataFrameCount[0] + dataset1.count();
766+
dataset1 = dataset1.drop("TimeStamp", "PartitionKey", "Headers");
767+
dataset1.write()
768+
.option(SolaceSparkStreamingProperties.HOST, solaceContainer.getOrigin(Service.SMF))
769+
.option(SolaceSparkStreamingProperties.VPN, solaceContainer.getVpn())
770+
.option(SolaceSparkStreamingProperties.USERNAME, solaceContainer.getUsername())
771+
.option(SolaceSparkStreamingProperties.PASSWORD, solaceContainer.getPassword())
772+
.option(SolaceSparkStreamingProperties.BATCH_SIZE, 0)
773+
.option(SolaceSparkStreamingProperties.TOPIC, "random/topic")
774+
.mode(SaveMode.Append)
775+
.format("solace").save();
776+
}
777+
}).start();
778+
779+
SolaceSession session = new SolaceSession(solaceContainer.getOrigin(Service.SMF), solaceContainer.getVpn(), solaceContainer.getUsername(), solaceContainer.getPassword());
780+
Topic topic = JCSMPFactory.onlyInstance().createTopic("random/topic");
781+
XMLMessageConsumer messageConsumer = null;
782+
try {
783+
messageConsumer = session.getSession().getMessageConsumer(new XMLMessageListener() {
784+
@Override
785+
public void onReceive(BytesXMLMessage bytesXMLMessage) {
786+
count[0] = count[0] + 1;
787+
}
788+
789+
@Override
790+
public void onException(JCSMPException e) {
791+
// Not required for test
792+
793+
}
794+
});
795+
session.getSession().addSubscription(topic);
796+
messageConsumer.start();
797+
} catch (JCSMPException e) {
798+
throw new RuntimeException(e);
799+
}
800+
801+
Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> Assertions.assertEquals(100, dataFrameCount[0]));
802+
Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> Assertions.assertEquals(100, count[0]));
803+
Thread.sleep(3000); // add timeout to ack messages on queue
804+
streamingQuery.stop();
805+
}
806+
807+
@Test
808+
@Order(12)
683809
void Should_Not_ProcessData_When_QueueIsEmpty() throws TimeoutException, InterruptedException {
684810
Path path = Paths.get("src", "test", "resources", "spark-checkpoint-1");
685811
final long[] batchTriggerCount = {0};

0 commit comments

Comments
 (0)