Skip to content

Commit 2cc973f

Browse files
Fix failing test
1 parent 5a1cf51 commit 2cc973f

File tree

1 file changed

+8
-0
lines changed

1 file changed

+8
-0
lines changed

src/test/java/com/solacecoe/connectors/spark/SolaceSparkStreamingSourceIT.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ public void beforeAll() throws ApiException {
6767
queue.permission(MsgVpnQueue.PermissionEnum.DELETE);
6868
queue.ingressEnabled(true);
6969
queue.egressEnabled(true);
70+
queue.setMaxDeliveredUnackedMsgsPerFlow(50L);
7071

7172
MsgVpnQueueSubscription subscription = new MsgVpnQueueSubscription();
7273
subscription.setSubscriptionTopic("solace/spark/streaming");
@@ -248,13 +249,20 @@ void Should_ProcessSolaceTextMessage() throws TimeoutException, InterruptedExcep
248249
Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> Assertions.assertEquals("Hello Spark!", payload.get()));
249250
Thread.sleep(3000); // add timeout to ack messages on queue
250251
streamingQuery.stop();
252+
253+
sparkSession.stop();
254+
sparkSession.close();
251255
}
252256

253257

254258

255259
@Test
256260
@Order(4)
257261
void Should_CreateMultipleConsumersOnDifferentSessions_And_ProcessData() throws TimeoutException, InterruptedException, com.solace.semp.v2.monitor.ApiException {
262+
sparkSession = SparkSession.builder()
263+
.appName("data_source_test_1")
264+
.master("local[*]")
265+
.getOrCreate();
258266
Path path = Paths.get("src", "test", "resources", "spark-checkpoint-2");
259267
DataStreamReader reader = sparkSession.readStream()
260268
.option(SolaceSparkStreamingProperties.HOST, solaceContainer.getOrigin(Service.SMF))

0 commit comments

Comments
 (0)