Skip to content

Commit 02a5a21

Browse files
Updated integration test
1 parent fad0957 commit 02a5a21

File tree

1 file changed

+59
-0
lines changed

1 file changed

+59
-0
lines changed

pubsubplus-connector-spark_3.x/src/test/java/com/solacecoe/connectors/spark/SolaceSparkStreamingOAuthIT.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.junit.jupiter.api.*;
1717
import org.testcontainers.junit.jupiter.Testcontainers;
1818
import org.testcontainers.shaded.org.awaitility.Awaitility;
19+
import org.testcontainers.solace.Service;
1920

2021
import java.io.IOException;
2122
import java.nio.charset.StandardCharsets;
@@ -261,6 +262,64 @@ void Should_ReadAccessTokenFromFile_And_ProcessData() throws TimeoutException, I
261262
streamingQuery.stop();
262263
}
263264

265+
@Test
266+
@Order(6)
267+
void Should_ConnectToInSecureOAuthServer_And_ProcessData_And_PublishToSolace() throws TimeoutException, InterruptedException {
268+
Path path = Paths.get("src", "test", "resources", "spark-checkpoint-1");
269+
Path writePath = Paths.get("src", "test", "resources", "spark-checkpoint-3");
270+
DataStreamReader reader = sparkSession.readStream()
271+
.option(SolaceSparkStreamingProperties.HOST, containerResource.getSolaceOAuthContainer().getOrigin(SolaceOAuthContainer.Service.SMF_SSL))
272+
.option(SolaceSparkStreamingProperties.VPN, containerResource.getSolaceOAuthContainer().getVpn())
273+
.option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.AUTHENTICATION_SCHEME, JCSMPProperties.AUTHENTICATION_SCHEME_OAUTH2)
274+
.option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_VALIDATE_CERTIFICATE, false)
275+
.option(SolaceSparkStreamingProperties.OAUTH_CLIENT_AUTHSERVER_URL, "http://localhost:7777/realms/solace/protocol/openid-connect/token")
276+
.option(SolaceSparkStreamingProperties.OAUTH_CLIENT_CLIENT_ID, "solace")
277+
.option(SolaceSparkStreamingProperties.OAUTH_CLIENT_CREDENTIALS_CLIENTSECRET, "solace-secret")
278+
.option(SolaceSparkStreamingProperties.OAUTH_CLIENT_TOKEN_REFRESH_INTERVAL, "5")
279+
.option(SolaceSparkStreamingProperties.QUEUE, SolaceOAuthContainer.INTEGRATION_TEST_QUEUE_NAME)
280+
.option(SolaceSparkStreamingProperties.OAUTH_CLIENT_AUTHSERVER_SSL_VALIDATE_CERTIFICATE, false)
281+
.option(SolaceSparkStreamingProperties.BATCH_SIZE, "50")
282+
.option("checkpointLocation", path.toAbsolutePath().toString())
283+
.format("solace");
284+
final long[] count = {0};
285+
final Object lock = new Object();
286+
Dataset<Row> dataset = reader.load();
287+
288+
SolaceSession session = new SolaceSession(containerResource.getSolaceOAuthContainer().getOrigin(SolaceOAuthContainer.Service.SMF_SSL), containerResource.getSolaceOAuthContainer().getVpn(), containerResource.getSolaceOAuthContainer().getUsername(), containerResource.getSolaceOAuthContainer().getPassword());
289+
Topic topic = JCSMPFactory.onlyInstance().createTopic("random/topic");
290+
XMLMessageConsumer messageConsumer = null;
291+
try {
292+
messageConsumer = session.getSession().getMessageConsumer(new XMLMessageListener() {
293+
@Override
294+
public void onReceive(BytesXMLMessage bytesXMLMessage) {
295+
count[0] = count[0] + 1;
296+
}
297+
298+
@Override
299+
public void onException(JCSMPException e) {
300+
// Not required for test
301+
}
302+
});
303+
session.getSession().addSubscription(topic);
304+
messageConsumer.start();
305+
} catch (JCSMPException e) {
306+
throw new RuntimeException(e);
307+
}
308+
309+
StreamingQuery streamingQuery = dataset.writeStream().option(SolaceSparkStreamingProperties.HOST, containerResource.getSolaceOAuthContainer().getOrigin(SolaceOAuthContainer.Service.SMF_SSL))
310+
.option(SolaceSparkStreamingProperties.VPN, containerResource.getSolaceOAuthContainer().getVpn())
311+
.option(SolaceSparkStreamingProperties.USERNAME, containerResource.getSolaceOAuthContainer().getUsername())
312+
.option(SolaceSparkStreamingProperties.PASSWORD, containerResource.getSolaceOAuthContainer().getPassword())
313+
.option(SolaceSparkStreamingProperties.MESSAGE_ID, "my-default-id")
314+
.option(SolaceSparkStreamingProperties.TOPIC, "random/topic")
315+
.option("checkpointLocation", writePath.toAbsolutePath().toString())
316+
.format("solace").start();
317+
318+
Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> Assertions.assertEquals(100, count[0]));
319+
Thread.sleep(3000); // add timeout to ack messages on queue
320+
streamingQuery.stop();
321+
}
322+
264323
@Test
265324
void Should_Fail_When_InvalidOAuthUrlIsProvided() {
266325
Path path = Paths.get("src", "test", "resources", "spark-checkpoint-1");

0 commit comments

Comments
 (0)