diff --git a/pom.xml b/pom.xml
index c01844c..e262275 100644
--- a/pom.xml
+++ b/pom.xml
@@ -10,7 +10,7 @@
pom
PubSubPlus Connector Spark - Parent
Solace PubSub+ Connector for Spark streams data from Solace PubSub+ broker to Spark Data Sources.
- https://solace.com
+ https://solace.com/integration-hub/apache-spark/
pubsubplus-connector-spark_3.x
diff --git a/pubsubplus-connector-spark_3.x/src/main/java/com/solacecoe/connectors/spark/streaming/SolaceMicroBatch.java b/pubsubplus-connector-spark_3.x/src/main/java/com/solacecoe/connectors/spark/streaming/SolaceMicroBatch.java
index bbb06f0..5d1a681 100644
--- a/pubsubplus-connector-spark_3.x/src/main/java/com/solacecoe/connectors/spark/streaming/SolaceMicroBatch.java
+++ b/pubsubplus-connector-spark_3.x/src/main/java/com/solacecoe/connectors/spark/streaming/SolaceMicroBatch.java
@@ -9,7 +9,7 @@
import com.solacecoe.connectors.spark.streaming.solace.LVQEventListener;
import com.solacecoe.connectors.spark.streaming.solace.SolaceBroker;
import com.solacecoe.connectors.spark.streaming.solace.exceptions.SolaceInvalidPropertyException;
-import com.solacesystems.jcsmp.JCSMPProperties;
+import com.solacecoe.connectors.spark.streaming.solace.utils.SolaceUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.spark.SparkEnv;
@@ -48,48 +48,7 @@ public SolaceMicroBatch(Map properties) {
log.info("SolaceSparkConnector - Initializing Solace Spark Connector");
// Initialize classes required for Solace connectivity
- // User configuration validation
- if(!properties.containsKey(SolaceSparkStreamingProperties.HOST) || properties.get(SolaceSparkStreamingProperties.HOST) == null || properties.get(SolaceSparkStreamingProperties.HOST).isEmpty()) {
- throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide Solace Host name in configuration options");
- }
- if(!properties.containsKey(SolaceSparkStreamingProperties.VPN) || properties.get(SolaceSparkStreamingProperties.VPN) == null || properties.get(SolaceSparkStreamingProperties.VPN).isEmpty()) {
- throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide Solace VPN name in configuration options");
- }
-
- if(properties.containsKey(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX+ JCSMPProperties.AUTHENTICATION_SCHEME) &&
- properties.get(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX+ JCSMPProperties.AUTHENTICATION_SCHEME).equals(JCSMPProperties.AUTHENTICATION_SCHEME_OAUTH2)) {
- if(!properties.containsKey(SolaceSparkStreamingProperties.OAUTH_CLIENT_ACCESSTOKEN)) {
- if(!properties.containsKey(SolaceSparkStreamingProperties.OAUTH_CLIENT_AUTHSERVER_URL) || properties.get(SolaceSparkStreamingProperties.OAUTH_CLIENT_AUTHSERVER_URL) == null || properties.get(SolaceSparkStreamingProperties.OAUTH_CLIENT_AUTHSERVER_URL).isEmpty()) {
- throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide OAuth Client Authentication Server URL");
- }
-
- if(!properties.containsKey(SolaceSparkStreamingProperties.OAUTH_CLIENT_CLIENT_ID) || properties.get(SolaceSparkStreamingProperties.OAUTH_CLIENT_CLIENT_ID) == null || properties.get(SolaceSparkStreamingProperties.OAUTH_CLIENT_CLIENT_ID).isEmpty()) {
- throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide OAuth Client ID");
- }
-
- if(!properties.containsKey(SolaceSparkStreamingProperties.OAUTH_CLIENT_CREDENTIALS_CLIENTSECRET) || properties.get(SolaceSparkStreamingProperties.OAUTH_CLIENT_CREDENTIALS_CLIENTSECRET) == null || properties.get(SolaceSparkStreamingProperties.OAUTH_CLIENT_CREDENTIALS_CLIENTSECRET).isEmpty()) {
- throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide OAuth Client Credentials Secret");
- }
-
- String clientCertificate = properties.getOrDefault(SolaceSparkStreamingProperties.OAUTH_CLIENT_AUTHSERVER_CLIENT_CERTIFICATE, null);
- if(clientCertificate != null) {
- String trustStoreFilePassword = properties.getOrDefault(SolaceSparkStreamingProperties.OAUTH_CLIENT_AUTHSERVER_TRUSTSTORE_PASSWORD, null);
- if (trustStoreFilePassword == null || trustStoreFilePassword.isEmpty()) {
- throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide OAuth Client TrustStore Password. If TrustStore file path is not configured, please provide password for default java truststore");
- }
- }
- } else if(properties.getOrDefault(SolaceSparkStreamingProperties.OAUTH_CLIENT_ACCESSTOKEN, null) == null || properties.getOrDefault(SolaceSparkStreamingProperties.OAUTH_CLIENT_ACCESSTOKEN, null).isEmpty()) {
- throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide valid access token input");
- }
- } else {
- if (!properties.containsKey(SolaceSparkStreamingProperties.USERNAME) || properties.get(SolaceSparkStreamingProperties.USERNAME) == null || properties.get(SolaceSparkStreamingProperties.USERNAME).isEmpty()) {
- throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide Solace Username in configuration options");
- }
-
- if (!properties.containsKey(SolaceSparkStreamingProperties.PASSWORD) || properties.get(SolaceSparkStreamingProperties.PASSWORD) == null || properties.get(SolaceSparkStreamingProperties.PASSWORD).isEmpty()) {
- throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide Solace Password in configuration options");
- }
- }
+ SolaceUtils.validateCommonProperties(properties);
if(!properties.containsKey(SolaceSparkStreamingProperties.QUEUE) || properties.get(SolaceSparkStreamingProperties.QUEUE) == null || properties.get(SolaceSparkStreamingProperties.QUEUE).isEmpty()) {
throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide Solace Queue in configuration options");
diff --git a/pubsubplus-connector-spark_3.x/src/main/java/com/solacecoe/connectors/spark/streaming/partitions/SolaceDataSourceReaderFactory.java b/pubsubplus-connector-spark_3.x/src/main/java/com/solacecoe/connectors/spark/streaming/partitions/SolaceDataSourceReaderFactory.java
index 35db5e1..c45fecd 100644
--- a/pubsubplus-connector-spark_3.x/src/main/java/com/solacecoe/connectors/spark/streaming/partitions/SolaceDataSourceReaderFactory.java
+++ b/pubsubplus-connector-spark_3.x/src/main/java/com/solacecoe/connectors/spark/streaming/partitions/SolaceDataSourceReaderFactory.java
@@ -31,11 +31,15 @@ public SolaceDataSourceReaderFactory(boolean includeHeaders, String lastKnownOff
@Override
public PartitionReader createReader(InputPartition partition) {
- TaskContext taskCtx = TaskContext.get();
- String queryId = taskCtx.getLocalProperty(StreamExecution.QUERY_ID_KEY());
- String batchId = taskCtx.getLocalProperty(MicroBatchExecution.BATCH_ID_KEY());
- SolaceInputPartition solaceInputPartition = (SolaceInputPartition) partition;
- log.info("SolaceSparkConnector - Creating reader for input partition reader factory with query id {}, batch id {}, task id {} and partition id {}", queryId, batchId, taskCtx.taskAttemptId(), taskCtx.partitionId());
- return new SolaceInputPartitionReader(solaceInputPartition, includeHeaders, lastKnownOffset, properties, taskCtx, checkpoints);
+ try {
+ TaskContext taskCtx = TaskContext.get();
+ String queryId = taskCtx.getLocalProperty(StreamExecution.QUERY_ID_KEY());
+ String batchId = taskCtx.getLocalProperty(MicroBatchExecution.BATCH_ID_KEY());
+ SolaceInputPartition solaceInputPartition = (SolaceInputPartition) partition;
+ log.info("SolaceSparkConnector - Creating reader for input partition reader factory with query id {}, batch id {}, task id {} and partition id {}", queryId, batchId, taskCtx.taskAttemptId(), taskCtx.partitionId());
+ return new SolaceInputPartitionReader(solaceInputPartition, includeHeaders, lastKnownOffset, properties, taskCtx, checkpoints);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
}
diff --git a/pubsubplus-connector-spark_3.x/src/main/java/com/solacecoe/connectors/spark/streaming/partitions/SolaceInputPartitionReader.java b/pubsubplus-connector-spark_3.x/src/main/java/com/solacecoe/connectors/spark/streaming/partitions/SolaceInputPartitionReader.java
index 8756265..0a62a39 100644
--- a/pubsubplus-connector-spark_3.x/src/main/java/com/solacecoe/connectors/spark/streaming/partitions/SolaceInputPartitionReader.java
+++ b/pubsubplus-connector-spark_3.x/src/main/java/com/solacecoe/connectors/spark/streaming/partitions/SolaceInputPartitionReader.java
@@ -93,6 +93,9 @@ public SolaceInputPartitionReader(SolaceInputPartition inputPartition, boolean i
@Override
public boolean next() {
+ if(this.solaceBroker != null && this.solaceBroker.isException()) {
+ throw new SolaceSessionException(this.solaceBroker.getException());
+ }
solaceMessage = getNextMessage();
return solaceMessage != null;
}
diff --git a/pubsubplus-connector-spark_3.x/src/main/java/com/solacecoe/connectors/spark/streaming/solace/SolaceBroker.java b/pubsubplus-connector-spark_3.x/src/main/java/com/solacecoe/connectors/spark/streaming/solace/SolaceBroker.java
index 9c3ae9c..3352dfd 100644
--- a/pubsubplus-connector-spark_3.x/src/main/java/com/solacecoe/connectors/spark/streaming/solace/SolaceBroker.java
+++ b/pubsubplus-connector-spark_3.x/src/main/java/com/solacecoe/connectors/spark/streaming/solace/SolaceBroker.java
@@ -91,8 +91,8 @@ public SolaceBroker(Map properties, String clientType) {
scheduleOAuthRefresh(interval);
}
} else {
- jcsmpProperties.setProperty(JCSMPProperties.USERNAME, properties.get(SolaceSparkStreamingProperties.USERNAME)); // client-username
- jcsmpProperties.setProperty(JCSMPProperties.PASSWORD, properties.get(SolaceSparkStreamingProperties.PASSWORD)); // client-password
+ jcsmpProperties.setProperty(JCSMPProperties.USERNAME, properties.getOrDefault(SolaceSparkStreamingProperties.USERNAME, "")); // client-username
+ jcsmpProperties.setProperty(JCSMPProperties.PASSWORD, properties.getOrDefault(SolaceSparkStreamingProperties.PASSWORD, "")); // client-password
}
this.uniqueName = JCSMPFactory.onlyInstance().createUniqueName("solace/spark/connector/"+clientType);
diff --git a/pubsubplus-connector-spark_3.x/src/main/java/com/solacecoe/connectors/spark/streaming/solace/utils/SolaceUtils.java b/pubsubplus-connector-spark_3.x/src/main/java/com/solacecoe/connectors/spark/streaming/solace/utils/SolaceUtils.java
index c03edc9..ef724c9 100644
--- a/pubsubplus-connector-spark_3.x/src/main/java/com/solacecoe/connectors/spark/streaming/solace/utils/SolaceUtils.java
+++ b/pubsubplus-connector-spark_3.x/src/main/java/com/solacecoe/connectors/spark/streaming/solace/utils/SolaceUtils.java
@@ -2,6 +2,7 @@
import com.solacecoe.connectors.spark.streaming.properties.SolaceHeaderMeta;
import com.solacecoe.connectors.spark.streaming.properties.SolaceSparkStreamingProperties;
+import com.solacecoe.connectors.spark.streaming.solace.exceptions.SolaceInvalidPropertyException;
import com.solacesystems.jcsmp.*;
import java.io.Serializable;
@@ -13,6 +14,51 @@
public class SolaceUtils {
+ public static void validateCommonProperties(Map properties) {
+ // User configuration validation
+ if(!properties.containsKey(SolaceSparkStreamingProperties.HOST) || properties.get(SolaceSparkStreamingProperties.HOST) == null || properties.get(SolaceSparkStreamingProperties.HOST).isEmpty()) {
+ throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide Solace Host name in configuration options");
+ }
+ if(!properties.containsKey(SolaceSparkStreamingProperties.VPN) || properties.get(SolaceSparkStreamingProperties.VPN) == null || properties.get(SolaceSparkStreamingProperties.VPN).isEmpty()) {
+ throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide Solace VPN name in configuration options");
+ }
+
+ if(properties.containsKey(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX+ JCSMPProperties.AUTHENTICATION_SCHEME) &&
+ properties.get(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX+ JCSMPProperties.AUTHENTICATION_SCHEME).equals(JCSMPProperties.AUTHENTICATION_SCHEME_OAUTH2)) {
+ if(!properties.containsKey(SolaceSparkStreamingProperties.OAUTH_CLIENT_ACCESSTOKEN)) {
+ if(!properties.containsKey(SolaceSparkStreamingProperties.OAUTH_CLIENT_AUTHSERVER_URL) || properties.get(SolaceSparkStreamingProperties.OAUTH_CLIENT_AUTHSERVER_URL) == null || properties.get(SolaceSparkStreamingProperties.OAUTH_CLIENT_AUTHSERVER_URL).isEmpty()) {
+ throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide OAuth Client Authentication Server URL");
+ }
+
+ if(!properties.containsKey(SolaceSparkStreamingProperties.OAUTH_CLIENT_CLIENT_ID) || properties.get(SolaceSparkStreamingProperties.OAUTH_CLIENT_CLIENT_ID) == null || properties.get(SolaceSparkStreamingProperties.OAUTH_CLIENT_CLIENT_ID).isEmpty()) {
+ throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide OAuth Client ID");
+ }
+
+ if(!properties.containsKey(SolaceSparkStreamingProperties.OAUTH_CLIENT_CREDENTIALS_CLIENTSECRET) || properties.get(SolaceSparkStreamingProperties.OAUTH_CLIENT_CREDENTIALS_CLIENTSECRET) == null || properties.get(SolaceSparkStreamingProperties.OAUTH_CLIENT_CREDENTIALS_CLIENTSECRET).isEmpty()) {
+ throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide OAuth Client Credentials Secret");
+ }
+
+ String clientCertificate = properties.getOrDefault(SolaceSparkStreamingProperties.OAUTH_CLIENT_AUTHSERVER_CLIENT_CERTIFICATE, null);
+ if(clientCertificate != null) {
+ String trustStoreFilePassword = properties.getOrDefault(SolaceSparkStreamingProperties.OAUTH_CLIENT_AUTHSERVER_TRUSTSTORE_PASSWORD, null);
+ if (trustStoreFilePassword == null || trustStoreFilePassword.isEmpty()) {
+ throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide OAuth Client TrustStore Password. If TrustStore file path is not configured, please provide password for default java truststore");
+ }
+ }
+ } else if(properties.getOrDefault(SolaceSparkStreamingProperties.OAUTH_CLIENT_ACCESSTOKEN, null) == null || properties.getOrDefault(SolaceSparkStreamingProperties.OAUTH_CLIENT_ACCESSTOKEN, null).isEmpty()) {
+ throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide valid access token input");
+ }
+ } else if(properties.containsKey(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX+ JCSMPProperties.AUTHENTICATION_SCHEME) &&
+ properties.getOrDefault(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX+ JCSMPProperties.AUTHENTICATION_SCHEME, JCSMPProperties.AUTHENTICATION_SCHEME_BASIC).equals(JCSMPProperties.AUTHENTICATION_SCHEME_BASIC)){
+ if (!properties.containsKey(SolaceSparkStreamingProperties.USERNAME) || properties.get(SolaceSparkStreamingProperties.USERNAME) == null || properties.get(SolaceSparkStreamingProperties.USERNAME).isEmpty()) {
+ throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide Solace Username in configuration options");
+ }
+
+ if (!properties.containsKey(SolaceSparkStreamingProperties.PASSWORD) || properties.get(SolaceSparkStreamingProperties.PASSWORD) == null || properties.get(SolaceSparkStreamingProperties.PASSWORD).isEmpty()) {
+ throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide Solace Password in configuration options");
+ }
+ }
+ }
public static String getMessageID(BytesXMLMessage message, String solaceOffsetIndicator) throws SDTException {
switch (solaceOffsetIndicator) {
case "CORRELATION_ID":
diff --git a/pubsubplus-connector-spark_3.x/src/main/java/com/solacecoe/connectors/spark/streaming/write/SolaceStreamingWrite.java b/pubsubplus-connector-spark_3.x/src/main/java/com/solacecoe/connectors/spark/streaming/write/SolaceStreamingWrite.java
index 2df5845..fcab38f 100644
--- a/pubsubplus-connector-spark_3.x/src/main/java/com/solacecoe/connectors/spark/streaming/write/SolaceStreamingWrite.java
+++ b/pubsubplus-connector-spark_3.x/src/main/java/com/solacecoe/connectors/spark/streaming/write/SolaceStreamingWrite.java
@@ -1,5 +1,6 @@
package com.solacecoe.connectors.spark.streaming.write;
+import com.solacecoe.connectors.spark.streaming.solace.utils.SolaceUtils;
import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
import org.apache.spark.sql.connector.write.WriterCommitMessage;
import org.apache.spark.sql.connector.write.streaming.StreamingDataWriterFactory;
@@ -22,24 +23,7 @@ public SolaceStreamingWrite(StructType schema, Map properties, C
this.properties = properties;
this.options = options;
- if(!properties.containsKey("host") || properties.get("host") == null || properties.get("host").isEmpty()) {
- log.error("SolaceSparkConnector - Please provide Solace Host name in configuration options");
- throw new RuntimeException("SolaceSparkConnector - Please provide Solace Host name in configuration options");
- }
- if(!properties.containsKey("vpn") || properties.get("vpn") == null || properties.get("vpn").isEmpty()) {
- log.error("SolaceSparkConnector - Please provide Solace VPN name in configuration options");
- throw new RuntimeException("SolaceSparkConnector - Please provide Solace VPN name in configuration options");
- }
-
- if(!properties.containsKey("username") || properties.get("username") == null || properties.get("username").isEmpty()) {
- log.error("SolaceSparkConnector - Please provide Solace Username in configuration options");
- throw new RuntimeException("SolaceSparkConnector - Please provide Solace Username in configuration options");
- }
-
- if(!properties.containsKey("password") || properties.get("password") == null || properties.get("password").isEmpty()) {
- log.error("SolaceSparkConnector - Please provide Solace Password in configuration options");
- throw new RuntimeException("SolaceSparkConnector - Please provide Solace Password in configuration options");
- }
+ SolaceUtils.validateCommonProperties(properties);
// if(!properties.containsKey("topic") || properties.get("topic") == null || properties.get("topic").isEmpty()) {
// log.error("SolaceSparkConnector - Please provide Solace Queue name in configuration options");
diff --git a/pubsubplus-connector-spark_3.x/src/test/java/com/solacecoe/connectors/spark/SolaceSparkStreamingAuthenticationIT.java b/pubsubplus-connector-spark_3.x/src/test/java/com/solacecoe/connectors/spark/SolaceSparkStreamingAuthenticationIT.java
new file mode 100644
index 0000000..db646fd
--- /dev/null
+++ b/pubsubplus-connector-spark_3.x/src/test/java/com/solacecoe/connectors/spark/SolaceSparkStreamingAuthenticationIT.java
@@ -0,0 +1,360 @@
+package com.solacecoe.connectors.spark;
+
+import com.solacecoe.connectors.spark.base.SolaceSession;
+import com.solacecoe.connectors.spark.oauth.CertificateContainerResource;
+import com.solacecoe.connectors.spark.oauth.SolaceOAuthContainer;
+import com.solacecoe.connectors.spark.streaming.properties.SolaceSparkStreamingProperties;
+import com.solacesystems.jcsmp.*;
+import org.apache.spark.api.java.function.VoidFunction2;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.streaming.DataStreamReader;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import org.apache.spark.sql.streaming.StreamingQueryException;
+import org.junit.jupiter.api.*;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.shaded.org.apache.commons.io.FileUtils;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+@Testcontainers
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+class SolaceSparkStreamingAuthenticationIT {
+ private SparkSession sparkSession;
+ private final CertificateContainerResource containerResource = new CertificateContainerResource(false);
+ @BeforeAll
+ public void beforeAll() {
+ containerResource.start();
+ if(containerResource.isRunning()) {
+ sparkSession = SparkSession.builder()
+ .appName("data_source_test")
+ .master("local[*]")
+ .getOrCreate();
+ } else {
+ throw new RuntimeException("Solace Container is not started yet");
+ }
+ }
+
+ @AfterAll
+ public void afterAll() throws IOException {
+ Path path1 = Paths.get("src", "test", "resources", "solace.jks");
+ Path path2 = Paths.get("src", "test", "resources", "solace_keystore.jks");
+
+ if(Files.exists(path1)) {
+ FileUtils.delete(path1.toAbsolutePath().toFile());
+ }
+
+ if(Files.exists(path2)) {
+ FileUtils.delete(path2.toAbsolutePath().toFile());
+ }
+
+ containerResource.stop();
+ }
+
+ @BeforeEach
+ public void beforeEach() throws JCSMPException {
+ if(containerResource.getSolaceOAuthContainer().isRunning()) {
+ SolaceSession session = new SolaceSession(containerResource.getSolaceOAuthContainer().getOrigin(SolaceOAuthContainer.Service.SMF), containerResource.getSolaceOAuthContainer().getVpn(), containerResource.getSolaceOAuthContainer().getUsername(), containerResource.getSolaceOAuthContainer().getPassword());
+ XMLMessageProducer messageProducer = session.getSession().getMessageProducer(new JCSMPStreamingPublishCorrelatingEventHandler() {
+ @Override
+ public void responseReceivedEx(Object o) {
+ // not required in test
+ }
+ @Override
+ public void handleErrorEx(Object o, JCSMPException e, long l) {
+ // not required in test
+ }
+ });
+
+ for (int i = 0; i < 100; i++) {
+ TextMessage textMessage = JCSMPFactory.onlyInstance().createMessage(TextMessage.class);
+ textMessage.setText("Hello Spark!");
+ Topic topic = JCSMPFactory.onlyInstance().createTopic(SolaceOAuthContainer.INTEGRATION_TEST_QUEUE_SUBSCRIPTION);
+ messageProducer.send(textMessage, topic);
+ }
+
+ messageProducer.close();
+ session.getSession().closeSession();
+ } else {
+ throw new RuntimeException("Solace Container is not started yet");
+ }
+ }
+
+ @AfterEach
+ public void afterEach() throws IOException {
+ Path path = Paths.get("src", "test", "resources", "spark-checkpoint-1");
+ Path path1 = Paths.get("src", "test", "resources", "spark-checkpoint-2");
+ Path path2 = Paths.get("src", "test", "resources", "spark-checkpoint-3");
+ if(Files.exists(path)) {
+ FileUtils.deleteDirectory(path.toAbsolutePath().toFile());
+ }
+ if(Files.exists(path1)) {
+ FileUtils.deleteDirectory(path1.toAbsolutePath().toFile());
+ }
+ if(Files.exists(path2)) {
+ FileUtils.deleteDirectory(path2.toAbsolutePath().toFile());
+ }
+ }
+
+ @Test
+ @Order(1)
+ void Should_ConnectUsingClientCertificateWithoutPassword() throws TimeoutException, InterruptedException {
+ Path resources = Paths.get("src", "test", "resources");
+ Path path = Paths.get("src", "test", "resources", "spark-checkpoint-1");
+ Path writePath = Paths.get("src", "test", "resources", "spark-checkpoint-3");
+ DataStreamReader reader = sparkSession.readStream()
+ .option(SolaceSparkStreamingProperties.HOST, containerResource.getSolaceOAuthContainer().getOrigin(SolaceOAuthContainer.Service.SMF_SSL))
+ .option(SolaceSparkStreamingProperties.VPN, containerResource.getSolaceOAuthContainer().getVpn())
+ .option(SolaceSparkStreamingProperties.USERNAME, "certificate-user")
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.AUTHENTICATION_SCHEME, JCSMPProperties.AUTHENTICATION_SCHEME_CLIENT_CERTIFICATE)
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_TRUST_STORE, resources.toAbsolutePath() + "/solace.jks")
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_TRUST_STORE_FORMAT, "jks")
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_TRUST_STORE_PASSWORD, "password")
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_KEY_STORE, resources.toAbsolutePath() + "/solace_keystore.jks")
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_KEY_STORE_FORMAT, "jks")
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_KEY_STORE_PASSWORD, "password")
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_VALIDATE_CERTIFICATE, false)
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_VALIDATE_CERTIFICATE_HOST, false)
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_VALIDATE_CERTIFICATE_DATE, false)
+ .option(SolaceSparkStreamingProperties.QUEUE, SolaceOAuthContainer.INTEGRATION_TEST_QUEUE_NAME)
+ .option(SolaceSparkStreamingProperties.BATCH_SIZE, "100")
+ .option("checkpointLocation", path.toAbsolutePath().toString())
+ .format("solace");
+ final long[] count = {0};
+ Dataset dataset = reader.load();
+
+ SolaceSession session = new SolaceSession(containerResource.getSolaceOAuthContainer().getOrigin(SolaceOAuthContainer.Service.SMF), containerResource.getSolaceOAuthContainer().getVpn(), containerResource.getSolaceOAuthContainer().getUsername(), containerResource.getSolaceOAuthContainer().getPassword());
+ Topic topic = JCSMPFactory.onlyInstance().createTopic("random/topic");
+ XMLMessageConsumer messageConsumer = null;
+ try {
+ messageConsumer = session.getSession().getMessageConsumer(new XMLMessageListener() {
+ @Override
+ public void onReceive(BytesXMLMessage bytesXMLMessage) {
+ count[0] = count[0] + 1;
+ }
+
+ @Override
+ public void onException(JCSMPException e) {
+ // Not required for test
+ }
+ });
+ session.getSession().addSubscription(topic);
+ messageConsumer.start();
+ } catch (JCSMPException e) {
+ throw new RuntimeException(e);
+ }
+
+ StreamingQuery streamingQuery = dataset.writeStream().option(SolaceSparkStreamingProperties.HOST, containerResource.getSolaceOAuthContainer().getOrigin(SolaceOAuthContainer.Service.SMF_SSL))
+ .option(SolaceSparkStreamingProperties.VPN, containerResource.getSolaceOAuthContainer().getVpn())
+ .option(SolaceSparkStreamingProperties.USERNAME, "certificate-user")
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.AUTHENTICATION_SCHEME, JCSMPProperties.AUTHENTICATION_SCHEME_CLIENT_CERTIFICATE)
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_TRUST_STORE, resources.toAbsolutePath() + "/solace.jks")
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_TRUST_STORE_FORMAT, "jks")
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_TRUST_STORE_PASSWORD, "password")
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_KEY_STORE, resources.toAbsolutePath() + "/solace_keystore.jks")
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_KEY_STORE_FORMAT, "jks")
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_KEY_STORE_PASSWORD, "password")
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_VALIDATE_CERTIFICATE, false)
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_VALIDATE_CERTIFICATE_HOST, false)
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_VALIDATE_CERTIFICATE_DATE, false)
+ .option(SolaceSparkStreamingProperties.MESSAGE_ID, "my-default-id")
+ .option(SolaceSparkStreamingProperties.TOPIC, "random/topic")
+ .option("checkpointLocation", writePath.toAbsolutePath().toString())
+ .format("solace").start();
+
+ Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> Assertions.assertTrue(count[0] > 0));
+ Thread.sleep(3000); // add timeout to ack messages on queue
+ streamingQuery.stop();
+ }
+
+ @Test
+ @Order(2)
+ void Should_ConnectUsingClientCertificateWithPassword() throws TimeoutException, InterruptedException {
+ Path resources = Paths.get("src", "test", "resources");
+ Path path = Paths.get("src", "test", "resources", "spark-checkpoint-1");
+ Path writePath = Paths.get("src", "test", "resources", "spark-checkpoint-3");
+ DataStreamReader reader = sparkSession.readStream()
+ .option(SolaceSparkStreamingProperties.HOST, containerResource.getSolaceOAuthContainer().getOrigin(SolaceOAuthContainer.Service.SMF_SSL))
+ .option(SolaceSparkStreamingProperties.VPN, containerResource.getSolaceOAuthContainer().getVpn())
+ .option(SolaceSparkStreamingProperties.USERNAME, "certificate-user-with-password")
+ .option(SolaceSparkStreamingProperties.PASSWORD, "certificate-user-with-password")
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.AUTHENTICATION_SCHEME, JCSMPProperties.AUTHENTICATION_SCHEME_CLIENT_CERTIFICATE)
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_TRUST_STORE, resources.toAbsolutePath() + "/solace.jks")
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_TRUST_STORE_FORMAT, "jks")
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_TRUST_STORE_PASSWORD, "password")
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_KEY_STORE, resources.toAbsolutePath() + "/solace_keystore.jks")
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_KEY_STORE_FORMAT, "jks")
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_KEY_STORE_PASSWORD, "password")
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_VALIDATE_CERTIFICATE, false)
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_VALIDATE_CERTIFICATE_HOST, false)
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_VALIDATE_CERTIFICATE_DATE, false)
+ .option(SolaceSparkStreamingProperties.QUEUE, SolaceOAuthContainer.INTEGRATION_TEST_QUEUE_NAME)
+ .option(SolaceSparkStreamingProperties.BATCH_SIZE, "100")
+ .option("checkpointLocation", path.toAbsolutePath().toString())
+ .format("solace");
+ final long[] count = {0};
+ Dataset dataset = reader.load();
+
+ SolaceSession session = new SolaceSession(containerResource.getSolaceOAuthContainer().getOrigin(SolaceOAuthContainer.Service.SMF), containerResource.getSolaceOAuthContainer().getVpn(), containerResource.getSolaceOAuthContainer().getUsername(), containerResource.getSolaceOAuthContainer().getPassword());
+ Topic topic = JCSMPFactory.onlyInstance().createTopic("random/topic");
+ XMLMessageConsumer messageConsumer = null;
+ try {
+ messageConsumer = session.getSession().getMessageConsumer(new XMLMessageListener() {
+ @Override
+ public void onReceive(BytesXMLMessage bytesXMLMessage) {
+ count[0] = count[0] + 1;
+ }
+
+ @Override
+ public void onException(JCSMPException e) {
+ // Not required for test
+ }
+ });
+ session.getSession().addSubscription(topic);
+ messageConsumer.start();
+ } catch (JCSMPException e) {
+ throw new RuntimeException(e);
+ }
+
+ StreamingQuery streamingQuery = dataset.writeStream().option(SolaceSparkStreamingProperties.HOST, containerResource.getSolaceOAuthContainer().getOrigin(SolaceOAuthContainer.Service.SMF_SSL))
+ .option(SolaceSparkStreamingProperties.VPN, containerResource.getSolaceOAuthContainer().getVpn())
+ .option(SolaceSparkStreamingProperties.USERNAME, "certificate-user-with-password")
+ .option(SolaceSparkStreamingProperties.PASSWORD, "certificate-user-with-password")
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.AUTHENTICATION_SCHEME, JCSMPProperties.AUTHENTICATION_SCHEME_CLIENT_CERTIFICATE)
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_TRUST_STORE, resources.toAbsolutePath() + "/solace.jks")
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_TRUST_STORE_FORMAT, "jks")
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_TRUST_STORE_PASSWORD, "password")
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_KEY_STORE, resources.toAbsolutePath() + "/solace_keystore.jks")
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_KEY_STORE_FORMAT, "jks")
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_KEY_STORE_PASSWORD, "password")
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_VALIDATE_CERTIFICATE, false)
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_VALIDATE_CERTIFICATE_HOST, false)
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_VALIDATE_CERTIFICATE_DATE, false)
+ .option(SolaceSparkStreamingProperties.MESSAGE_ID, "my-default-id")
+ .option(SolaceSparkStreamingProperties.TOPIC, "random/topic")
+ .option("checkpointLocation", writePath.toAbsolutePath().toString())
+ .format("solace").start();
+
+ Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> Assertions.assertTrue(count[0] > 0));
+ Thread.sleep(3000); // add timeout to ack messages on queue
+ streamingQuery.stop();
+ }
+
+ @Test
+ void Should_Fail_IfMandatoryUsernameIsMissing() {
+ Path path = Paths.get("src", "test", "resources", "spark-checkpoint-1");
+ assertThrows(StreamingQueryException.class, () -> {
+ DataStreamReader reader = sparkSession.readStream()
+ .option(SolaceSparkStreamingProperties.HOST, containerResource.getSolaceOAuthContainer().getOrigin(SolaceOAuthContainer.Service.SMF))
+ .option(SolaceSparkStreamingProperties.VPN, containerResource.getSolaceOAuthContainer().getVpn())
+// .option(SolaceSparkStreamingProperties.USERNAME, solaceContainer.getUsername())
+ .option(SolaceSparkStreamingProperties.PASSWORD, containerResource.getSolaceOAuthContainer().getPassword())
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.AUTHENTICATION_SCHEME, JCSMPProperties.AUTHENTICATION_SCHEME_BASIC)
+ .option(SolaceSparkStreamingProperties.SOLACE_CONNECT_RETRIES, 1)
+ .option(SolaceSparkStreamingProperties.SOLACE_RECONNECT_RETRIES, 1)
+ .option(SolaceSparkStreamingProperties.SOLACE_CONNECT_RETRIES_PER_HOST, 1)
+ .option(SolaceSparkStreamingProperties.SOLACE_RECONNECT_RETRIES_WAIT_TIME, 100)
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX+"sub_ack_window_threshold", 75)
+ .option(SolaceSparkStreamingProperties.QUEUE, "Solace/Queue/0")
+ .option(SolaceSparkStreamingProperties.BATCH_SIZE, "1")
+ .option("checkpointLocation", path.toAbsolutePath().toString())
+ .format("solace");
+ Dataset dataset = reader.load();
+ StreamingQuery streamingQuery = dataset.writeStream().foreachBatch((VoidFunction2, Long>) (dataset1, batchId) -> {
+ System.out.println(dataset1.count());
+ }).start();
+ streamingQuery.awaitTermination();
+ });
+ }
+
+ @Test
+ void Should_Fail_IfMandatoryUsernameIsEmpty() {
+ Path path = Paths.get("src", "test", "resources", "spark-checkpoint-1");
+ assertThrows(StreamingQueryException.class, () -> {
+ DataStreamReader reader = sparkSession.readStream()
+ .option(SolaceSparkStreamingProperties.HOST, containerResource.getSolaceOAuthContainer().getOrigin(SolaceOAuthContainer.Service.SMF))
+ .option(SolaceSparkStreamingProperties.VPN, containerResource.getSolaceOAuthContainer().getVpn())
+ .option(SolaceSparkStreamingProperties.USERNAME, "")
+ .option(SolaceSparkStreamingProperties.PASSWORD, containerResource.getSolaceOAuthContainer().getPassword())
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.AUTHENTICATION_SCHEME, JCSMPProperties.AUTHENTICATION_SCHEME_BASIC)
+ .option(SolaceSparkStreamingProperties.SOLACE_CONNECT_RETRIES, 1)
+ .option(SolaceSparkStreamingProperties.SOLACE_RECONNECT_RETRIES, 1)
+ .option(SolaceSparkStreamingProperties.SOLACE_CONNECT_RETRIES_PER_HOST, 1)
+ .option(SolaceSparkStreamingProperties.SOLACE_RECONNECT_RETRIES_WAIT_TIME, 100)
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX+"sub_ack_window_threshold", 75)
+ .option(SolaceSparkStreamingProperties.QUEUE, "Solace/Queue/0")
+ .option(SolaceSparkStreamingProperties.BATCH_SIZE, "1")
+ .option("checkpointLocation", path.toAbsolutePath().toString())
+ .format("solace");
+ Dataset dataset = reader.load();
+ StreamingQuery streamingQuery = dataset.writeStream().foreachBatch((VoidFunction2, Long>) (dataset1, batchId) -> {
+ System.out.println(dataset1.count());
+ }).start();
+ streamingQuery.awaitTermination();
+ });
+ }
+
+ @Test
+ void Should_Fail_IfMandatoryPasswordIsMissing() {
+ Path path = Paths.get("src", "test", "resources", "spark-checkpoint-1");
+ assertThrows(StreamingQueryException.class, () -> {
+ DataStreamReader reader = sparkSession.readStream()
+ .option(SolaceSparkStreamingProperties.HOST, containerResource.getSolaceOAuthContainer().getOrigin(SolaceOAuthContainer.Service.SMF))
+ .option(SolaceSparkStreamingProperties.VPN, containerResource.getSolaceOAuthContainer().getVpn())
+ .option(SolaceSparkStreamingProperties.USERNAME, containerResource.getSolaceOAuthContainer().getUsername())
+// .option(SolaceSparkStreamingProperties.PASSWORD, solaceContainer.getPassword())
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.AUTHENTICATION_SCHEME, JCSMPProperties.AUTHENTICATION_SCHEME_BASIC)
+ .option(SolaceSparkStreamingProperties.SOLACE_CONNECT_RETRIES, 1)
+ .option(SolaceSparkStreamingProperties.SOLACE_RECONNECT_RETRIES, 1)
+ .option(SolaceSparkStreamingProperties.SOLACE_CONNECT_RETRIES_PER_HOST, 1)
+ .option(SolaceSparkStreamingProperties.SOLACE_RECONNECT_RETRIES_WAIT_TIME, 100)
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX+"sub_ack_window_threshold", 75)
+ .option(SolaceSparkStreamingProperties.QUEUE, "Solace/Queue/0")
+ .option(SolaceSparkStreamingProperties.BATCH_SIZE, "1")
+ .option("checkpointLocation", path.toAbsolutePath().toString())
+ .format("solace");
+ Dataset dataset = reader.load();
+ StreamingQuery streamingQuery = dataset.writeStream().foreachBatch((VoidFunction2, Long>) (dataset1, batchId) -> {
+ System.out.println(dataset1.count());
+ }).start();
+ streamingQuery.awaitTermination();
+ });
+ }
+
+ @Test
+ void Should_Fail_IfMandatoryPasswordIsEmpty() {
+ Path path = Paths.get("src", "test", "resources", "spark-checkpoint-1");
+ assertThrows(StreamingQueryException.class, () -> {
+ DataStreamReader reader = sparkSession.readStream()
+ .option(SolaceSparkStreamingProperties.HOST, containerResource.getSolaceOAuthContainer().getOrigin(SolaceOAuthContainer.Service.SMF))
+ .option(SolaceSparkStreamingProperties.VPN, containerResource.getSolaceOAuthContainer().getVpn())
+ .option(SolaceSparkStreamingProperties.USERNAME, containerResource.getSolaceOAuthContainer().getUsername())
+ .option(SolaceSparkStreamingProperties.PASSWORD, "")
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.AUTHENTICATION_SCHEME, JCSMPProperties.AUTHENTICATION_SCHEME_BASIC)
+ .option(SolaceSparkStreamingProperties.SOLACE_CONNECT_RETRIES, 1)
+ .option(SolaceSparkStreamingProperties.SOLACE_RECONNECT_RETRIES, 1)
+ .option(SolaceSparkStreamingProperties.SOLACE_CONNECT_RETRIES_PER_HOST, 1)
+ .option(SolaceSparkStreamingProperties.SOLACE_RECONNECT_RETRIES_WAIT_TIME, 100)
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX+"sub_ack_window_threshold", 75)
+ .option(SolaceSparkStreamingProperties.QUEUE, "Solace/Queue/0")
+ .option(SolaceSparkStreamingProperties.BATCH_SIZE, "1")
+ .option("checkpointLocation", path.toAbsolutePath().toString())
+ .format("solace");
+ Dataset dataset = reader.load();
+ StreamingQuery streamingQuery = dataset.writeStream().foreachBatch((VoidFunction2, Long>) (dataset1, batchId) -> {
+ System.out.println(dataset1.count());
+ }).start();
+ streamingQuery.awaitTermination();
+ });
+ }
+}
diff --git a/pubsubplus-connector-spark_3.x/src/test/java/com/solacecoe/connectors/spark/SolaceSparkStreamingClientCertificateCNIT.java b/pubsubplus-connector-spark_3.x/src/test/java/com/solacecoe/connectors/spark/SolaceSparkStreamingClientCertificateCNIT.java
new file mode 100644
index 0000000..44bc1ca
--- /dev/null
+++ b/pubsubplus-connector-spark_3.x/src/test/java/com/solacecoe/connectors/spark/SolaceSparkStreamingClientCertificateCNIT.java
@@ -0,0 +1,173 @@
+package com.solacecoe.connectors.spark;
+
+import com.solacecoe.connectors.spark.base.SolaceSession;
+import com.solacecoe.connectors.spark.oauth.CertificateContainerResource;
+import com.solacecoe.connectors.spark.oauth.SolaceOAuthContainer;
+import com.solacecoe.connectors.spark.streaming.properties.SolaceSparkStreamingProperties;
+import com.solacesystems.jcsmp.*;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.streaming.DataStreamReader;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import org.junit.jupiter.api.*;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.shaded.org.apache.commons.io.FileUtils;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+@Testcontainers
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+class SolaceSparkStreamingClientCertificateCNIT {
+
+ private SparkSession sparkSession;
+ private final CertificateContainerResource containerResource = new CertificateContainerResource(true);
+ @BeforeAll
+ public void beforeAll() {
+ containerResource.start();
+ if(containerResource.isRunning()) {
+ sparkSession = SparkSession.builder()
+ .appName("data_source_test")
+ .master("local[*]")
+ .getOrCreate();
+ } else {
+ throw new RuntimeException("Solace Container is not started yet");
+ }
+ }
+
+ @AfterAll
+ public void afterAll() {
+ containerResource.stop();
+ }
+
+ @BeforeEach
+ public void beforeEach() throws JCSMPException {
+ if(containerResource.getSolaceOAuthContainer().isRunning()) {
+ SolaceSession session = new SolaceSession(containerResource.getSolaceOAuthContainer().getOrigin(SolaceOAuthContainer.Service.SMF), containerResource.getSolaceOAuthContainer().getVpn(), containerResource.getSolaceOAuthContainer().getUsername(), containerResource.getSolaceOAuthContainer().getPassword());
+ XMLMessageProducer messageProducer = session.getSession().getMessageProducer(new JCSMPStreamingPublishCorrelatingEventHandler() {
+ @Override
+ public void responseReceivedEx(Object o) {
+ // not required in test
+ }
+ @Override
+ public void handleErrorEx(Object o, JCSMPException e, long l) {
+ // not required in test
+ }
+ });
+
+ for (int i = 0; i < 100; i++) {
+ TextMessage textMessage = JCSMPFactory.onlyInstance().createMessage(TextMessage.class);
+ textMessage.setText("Hello Spark!");
+ Topic topic = JCSMPFactory.onlyInstance().createTopic(SolaceOAuthContainer.INTEGRATION_TEST_QUEUE_SUBSCRIPTION);
+ messageProducer.send(textMessage, topic);
+ }
+
+ messageProducer.close();
+ session.getSession().closeSession();
+ } else {
+ throw new RuntimeException("Solace Container is not started yet");
+ }
+ }
+
+ @AfterEach
+ public void afterEach() throws IOException {
+ Path path = Paths.get("src", "test", "resources", "spark-checkpoint-1");
+ Path path1 = Paths.get("src", "test", "resources", "spark-checkpoint-2");
+ Path path2 = Paths.get("src", "test", "resources", "spark-checkpoint-3");
+ Path path3 = Paths.get("src", "test", "resources", "solace.jks");
+ Path path4 = Paths.get("src", "test", "resources", "solace_keystore.jks");
+ if(Files.exists(path)) {
+ FileUtils.deleteDirectory(path.toAbsolutePath().toFile());
+ }
+ if(Files.exists(path1)) {
+ FileUtils.deleteDirectory(path1.toAbsolutePath().toFile());
+ }
+ if(Files.exists(path2)) {
+ FileUtils.deleteDirectory(path2.toAbsolutePath().toFile());
+ }
+
+ if(Files.exists(path3)) {
+ FileUtils.delete(path3.toAbsolutePath().toFile());
+ }
+
+ if(Files.exists(path4)) {
+ FileUtils.delete(path4.toAbsolutePath().toFile());
+ }
+ }
+
+ @Test
+ void Should_ConnectUsingClientCertificateCommonName() throws TimeoutException, InterruptedException {
+ Path resources = Paths.get("src", "test", "resources");
+ Path path = Paths.get("src", "test", "resources", "spark-checkpoint-1");
+ Path writePath = Paths.get("src", "test", "resources", "spark-checkpoint-3");
+ DataStreamReader reader = sparkSession.readStream()
+ .option(SolaceSparkStreamingProperties.HOST, containerResource.getSolaceOAuthContainer().getOrigin(SolaceOAuthContainer.Service.SMF_SSL))
+ .option(SolaceSparkStreamingProperties.VPN, containerResource.getSolaceOAuthContainer().getVpn())
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.AUTHENTICATION_SCHEME, JCSMPProperties.AUTHENTICATION_SCHEME_CLIENT_CERTIFICATE)
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_TRUST_STORE, resources.toAbsolutePath() + "/solace.jks")
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_TRUST_STORE_FORMAT, "jks")
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_TRUST_STORE_PASSWORD, "password")
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_KEY_STORE, resources.toAbsolutePath() + "/solace_keystore.jks")
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_KEY_STORE_FORMAT, "jks")
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_KEY_STORE_PASSWORD, "password")
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_VALIDATE_CERTIFICATE, false)
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_VALIDATE_CERTIFICATE_HOST, false)
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_VALIDATE_CERTIFICATE_DATE, false)
+ .option(SolaceSparkStreamingProperties.QUEUE, SolaceOAuthContainer.INTEGRATION_TEST_QUEUE_NAME)
+ .option(SolaceSparkStreamingProperties.BATCH_SIZE, "100")
+ .option("checkpointLocation", path.toAbsolutePath().toString())
+ .format("solace");
+ final long[] count = {0};
+ Dataset dataset = reader.load();
+
+ SolaceSession session = new SolaceSession(containerResource.getSolaceOAuthContainer().getOrigin(SolaceOAuthContainer.Service.SMF), containerResource.getSolaceOAuthContainer().getVpn(), containerResource.getSolaceOAuthContainer().getUsername(), containerResource.getSolaceOAuthContainer().getPassword());
+ Topic topic = JCSMPFactory.onlyInstance().createTopic("random/topic");
+ XMLMessageConsumer messageConsumer = null;
+ try {
+ messageConsumer = session.getSession().getMessageConsumer(new XMLMessageListener() {
+ @Override
+ public void onReceive(BytesXMLMessage bytesXMLMessage) {
+ count[0] = count[0] + 1;
+ }
+
+ @Override
+ public void onException(JCSMPException e) {
+ // Not required for test
+ }
+ });
+ session.getSession().addSubscription(topic);
+ messageConsumer.start();
+ } catch (JCSMPException e) {
+ throw new RuntimeException(e);
+ }
+
+ StreamingQuery streamingQuery = dataset.writeStream().option(SolaceSparkStreamingProperties.HOST, containerResource.getSolaceOAuthContainer().getOrigin(SolaceOAuthContainer.Service.SMF_SSL))
+ .option(SolaceSparkStreamingProperties.VPN, containerResource.getSolaceOAuthContainer().getVpn())
+ .option(SolaceSparkStreamingProperties.USERNAME, "certificate-user")
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.AUTHENTICATION_SCHEME, JCSMPProperties.AUTHENTICATION_SCHEME_CLIENT_CERTIFICATE)
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_TRUST_STORE, resources.toAbsolutePath() + "/solace.jks")
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_TRUST_STORE_FORMAT, "jks")
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_TRUST_STORE_PASSWORD, "password")
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_KEY_STORE, resources.toAbsolutePath() + "/solace_keystore.jks")
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_KEY_STORE_FORMAT, "jks")
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_KEY_STORE_PASSWORD, "password")
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_VALIDATE_CERTIFICATE, false)
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_VALIDATE_CERTIFICATE_HOST, false)
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_VALIDATE_CERTIFICATE_DATE, false)
+ .option(SolaceSparkStreamingProperties.MESSAGE_ID, "my-default-id")
+ .option(SolaceSparkStreamingProperties.TOPIC, "random/topic")
+ .option("checkpointLocation", writePath.toAbsolutePath().toString())
+ .format("solace").start();
+
+ Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> Assertions.assertTrue(count[0] > 0));
+ Thread.sleep(3000); // add timeout to ack messages on queue
+ streamingQuery.stop();
+ }
+}
diff --git a/pubsubplus-connector-spark_3.x/src/test/java/com/solacecoe/connectors/spark/SolaceSparkStreamingOAuthIT.java b/pubsubplus-connector-spark_3.x/src/test/java/com/solacecoe/connectors/spark/SolaceSparkStreamingOAuthIT.java
index 1cdbedb..779dbb5 100644
--- a/pubsubplus-connector-spark_3.x/src/test/java/com/solacecoe/connectors/spark/SolaceSparkStreamingOAuthIT.java
+++ b/pubsubplus-connector-spark_3.x/src/test/java/com/solacecoe/connectors/spark/SolaceSparkStreamingOAuthIT.java
@@ -15,6 +15,7 @@
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.junit.jupiter.api.*;
import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.shaded.org.apache.commons.io.FileUtils;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import java.io.IOException;
@@ -82,6 +83,22 @@ public void handleErrorEx(Object o, JCSMPException e, long l) {
}
}
+ @AfterEach
+ public void afterEach() throws IOException {
+ Path path = Paths.get("src", "test", "resources", "spark-checkpoint-1");
+ Path path1 = Paths.get("src", "test", "resources", "spark-checkpoint-2");
+ Path path2 = Paths.get("src", "test", "resources", "spark-checkpoint-3");
+ if(Files.exists(path)) {
+ FileUtils.deleteDirectory(path.toAbsolutePath().toFile());
+ }
+ if(Files.exists(path1)) {
+ FileUtils.deleteDirectory(path1.toAbsolutePath().toFile());
+ }
+ if(Files.exists(path2)) {
+ FileUtils.deleteDirectory(path2.toAbsolutePath().toFile());
+ }
+ }
+
@Test
@Order(1)
void Should_ConnectToOAuthServer_WithoutValidatingCertificates_And_ProcessData() throws TimeoutException, InterruptedException {
@@ -261,6 +278,69 @@ void Should_ReadAccessTokenFromFile_And_ProcessData() throws TimeoutException, I
streamingQuery.stop();
}
+ @Test
+ @Order(6)
+ void Should_ConnectToInSecureOAuthServer_And_ProcessData_And_PublishToSolace() throws TimeoutException, InterruptedException {
+ Path path = Paths.get("src", "test", "resources", "spark-checkpoint-1");
+ Path writePath = Paths.get("src", "test", "resources", "spark-checkpoint-3");
+ sparkSession.sparkContext().setLogLevel("TRACE");
+ DataStreamReader reader = sparkSession.readStream()
+ .option(SolaceSparkStreamingProperties.HOST, containerResource.getSolaceOAuthContainer().getOrigin(SolaceOAuthContainer.Service.SMF_SSL))
+ .option(SolaceSparkStreamingProperties.VPN, containerResource.getSolaceOAuthContainer().getVpn())
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.AUTHENTICATION_SCHEME, JCSMPProperties.AUTHENTICATION_SCHEME_OAUTH2)
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_VALIDATE_CERTIFICATE, false)
+ .option(SolaceSparkStreamingProperties.OAUTH_CLIENT_AUTHSERVER_URL, "http://localhost:7777/realms/solace/protocol/openid-connect/token")
+ .option(SolaceSparkStreamingProperties.OAUTH_CLIENT_CLIENT_ID, "solace")
+ .option(SolaceSparkStreamingProperties.OAUTH_CLIENT_CREDENTIALS_CLIENTSECRET, "solace-secret")
+ .option(SolaceSparkStreamingProperties.OAUTH_CLIENT_TOKEN_REFRESH_INTERVAL, "50")
+ .option(SolaceSparkStreamingProperties.QUEUE, SolaceOAuthContainer.INTEGRATION_TEST_QUEUE_NAME)
+ .option(SolaceSparkStreamingProperties.OAUTH_CLIENT_AUTHSERVER_SSL_VALIDATE_CERTIFICATE, false)
+ .option(SolaceSparkStreamingProperties.BATCH_SIZE, "100")
+ .option("checkpointLocation", path.toAbsolutePath().toString())
+ .format("solace");
+ final long[] count = {0};
+ Dataset dataset = reader.load();
+
+ SolaceSession session = new SolaceSession(containerResource.getSolaceOAuthContainer().getOrigin(SolaceOAuthContainer.Service.SMF), containerResource.getSolaceOAuthContainer().getVpn(), containerResource.getSolaceOAuthContainer().getUsername(), containerResource.getSolaceOAuthContainer().getPassword());
+ Topic topic = JCSMPFactory.onlyInstance().createTopic("random/topic");
+ XMLMessageConsumer messageConsumer = null;
+ try {
+ messageConsumer = session.getSession().getMessageConsumer(new XMLMessageListener() {
+ @Override
+ public void onReceive(BytesXMLMessage bytesXMLMessage) {
+ count[0] = count[0] + 1;
+ }
+
+ @Override
+ public void onException(JCSMPException e) {
+ // Not required for test
+ }
+ });
+ session.getSession().addSubscription(topic);
+ messageConsumer.start();
+ } catch (JCSMPException e) {
+ throw new RuntimeException(e);
+ }
+
+ StreamingQuery streamingQuery = dataset.writeStream().option(SolaceSparkStreamingProperties.HOST, containerResource.getSolaceOAuthContainer().getOrigin(SolaceOAuthContainer.Service.SMF_SSL))
+ .option(SolaceSparkStreamingProperties.VPN, containerResource.getSolaceOAuthContainer().getVpn())
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.AUTHENTICATION_SCHEME, JCSMPProperties.AUTHENTICATION_SCHEME_OAUTH2)
+ .option(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX + JCSMPProperties.SSL_VALIDATE_CERTIFICATE, false)
+ .option(SolaceSparkStreamingProperties.OAUTH_CLIENT_AUTHSERVER_URL, "http://localhost:7777/realms/solace/protocol/openid-connect/token")
+ .option(SolaceSparkStreamingProperties.OAUTH_CLIENT_CLIENT_ID, "solace")
+ .option(SolaceSparkStreamingProperties.OAUTH_CLIENT_CREDENTIALS_CLIENTSECRET, "solace-secret")
+ .option(SolaceSparkStreamingProperties.OAUTH_CLIENT_TOKEN_REFRESH_INTERVAL, "50")
+ .option(SolaceSparkStreamingProperties.OAUTH_CLIENT_AUTHSERVER_SSL_VALIDATE_CERTIFICATE, false)
+ .option(SolaceSparkStreamingProperties.MESSAGE_ID, "my-default-id")
+ .option(SolaceSparkStreamingProperties.TOPIC, "random/topic")
+ .option("checkpointLocation", writePath.toAbsolutePath().toString())
+ .format("solace").start();
+
+ Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> Assertions.assertTrue(count[0] > 0));
+ Thread.sleep(3000); // add timeout to ack messages on queue
+ streamingQuery.stop();
+ }
+
@Test
void Should_Fail_When_InvalidOAuthUrlIsProvided() {
Path path = Paths.get("src", "test", "resources", "spark-checkpoint-1");
diff --git a/pubsubplus-connector-spark_3.x/src/test/java/com/solacecoe/connectors/spark/oauth/CertificateContainerResource.java b/pubsubplus-connector-spark_3.x/src/test/java/com/solacecoe/connectors/spark/oauth/CertificateContainerResource.java
new file mode 100644
index 0000000..b1e71fb
--- /dev/null
+++ b/pubsubplus-connector-spark_3.x/src/test/java/com/solacecoe/connectors/spark/oauth/CertificateContainerResource.java
@@ -0,0 +1,138 @@
+package com.solacecoe.connectors.spark.oauth;
+
+import org.testcontainers.shaded.org.bouncycastle.asn1.pkcs.PrivateKeyInfo;
+import org.testcontainers.shaded.org.bouncycastle.openssl.PEMParser;
+import org.testcontainers.shaded.org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;
+import org.testcontainers.utility.MountableFile;
+
+import java.io.*;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.PrivateKey;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateException;
+import java.security.cert.CertificateFactory;
+import java.security.cert.X509Certificate;
+
+import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
+
+public class CertificateContainerResource {
+ private SolaceOAuthContainer solaceOAuthContainer;
+ private boolean cnAsUsername;
+ public CertificateContainerResource(boolean cnAsUsername) {
+ this.cnAsUsername = cnAsUsername;
+ }
+ public void start() {
+ writeTrustStore();
+ writeKeyStore();
+
+ if(!cnAsUsername) {
+ solaceOAuthContainer = new SolaceOAuthContainer("solace/solace-pubsub-standard:latest");
+ solaceOAuthContainer.withCredentials("user", "pass")
+ .withClientCert(MountableFile.forClasspathResource("serverCertCombined.pem"),
+ MountableFile.forClasspathResource("MyRootCaCert.pem"), true)
+ .withExposedPorts(SolaceOAuthContainer.Service.SMF.getPort(), SolaceOAuthContainer.Service.SMF_SSL.getPort(), 1943, 8080)
+ .withPublishTopic(SolaceOAuthContainer.INTEGRATION_TEST_QUEUE_SUBSCRIPTION, SolaceOAuthContainer.Service.SMF)
+ .withPublishTopic("random/topic", SolaceOAuthContainer.Service.SMF);
+ } else {
+ solaceOAuthContainer = new SolaceOAuthContainer("solace/solace-pubsub-standard:latest");
+ solaceOAuthContainer.withCredentials("user", "pass")
+ .withClientCert(MountableFile.forClasspathResource("serverCertCombined.pem"),
+ MountableFile.forClasspathResource("MyRootCaCert.pem"), true)
+ .withCNAsUsernameSource()
+ .withExposedPorts(SolaceOAuthContainer.Service.SMF.getPort(), SolaceOAuthContainer.Service.SMF_SSL.getPort(), 1943, 8080)
+ .withPublishTopic(SolaceOAuthContainer.INTEGRATION_TEST_QUEUE_SUBSCRIPTION, SolaceOAuthContainer.Service.SMF)
+ .withPublishTopic("random/topic", SolaceOAuthContainer.Service.SMF);
+ }
+
+ solaceOAuthContainer.start();
+ await().until(() -> solaceOAuthContainer.isRunning());
+
+ }
+
+ private static void writeTrustStore() {
+ Path resources = Paths.get("src", "test", "resources");
+ String absolutePath = resources.toFile().getAbsolutePath();
+ File file = new File(absolutePath + "/MyRootCaCert.pem");
+ File keyFile = new File(absolutePath + "/MyRootCaKey.key");
+ try {
+ String path = absolutePath +"/solace.jks";
+ File yourFile = new File(absolutePath +"/solace.jks");
+ if(!yourFile.exists()) {
+ yourFile.createNewFile();
+ }
+ FileOutputStream fos = new FileOutputStream(path);
+ PrivateKey privateKey = loadPrivateKey(keyFile.getAbsolutePath());
+ createKeyStore(Files.readAllBytes(file.toPath()), privateKey, null, "solace", "jks").store(fos, "password".toCharArray());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static void writeKeyStore() {
+ Path resources = Paths.get("src", "test", "resources");
+ String absolutePath = resources.toFile().getAbsolutePath();
+ File file = new File(absolutePath + "/clientCert1.pem");
+ File keyFile = new File(absolutePath + "/client1.key");
+ try {
+ File yourFile = new File(absolutePath +"/solace_keystore.jks");
+ if(!yourFile.exists()) {
+ yourFile.createNewFile();
+ }
+ FileOutputStream fos = new FileOutputStream(yourFile);
+ PrivateKey privateKey = loadPrivateKey(keyFile.getAbsolutePath());
+ createKeyStore(Files.readAllBytes(file.toPath()), privateKey, null, "solace-client", "jks").store(fos, "password".toCharArray());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ // Method to load the private key from a PEM file
+ private static PrivateKey loadPrivateKey(String privateKeyPath) throws Exception {
+ try (FileReader keyReader = new FileReader(privateKeyPath)) {
+ PEMParser pemParser = new PEMParser(keyReader);
+ PrivateKeyInfo privateKeyInfo = (PrivateKeyInfo) pemParser.readObject();
+ JcaPEMKeyConverter converter = new JcaPEMKeyConverter();
+ return converter.getPrivateKey(privateKeyInfo);
+ }
+ }
+
+ private static KeyStore createKeyStore(byte[] ca, PrivateKey key, byte[] serviceCa, String name, String instance) {
+ try {
+ KeyStore keyStore = KeyStore.getInstance(instance);
+ keyStore.load(null);
+ CertificateFactory cf = CertificateFactory.getInstance("X.509");
+ if (ca != null) {
+ X509Certificate cert = (X509Certificate) cf.generateCertificate(new ByteArrayInputStream(ca));
+ keyStore.setCertificateEntry(name, cert);
+ if(key != null) {
+ keyStore.setKeyEntry(name + "-key", key , "password".toCharArray() ,new Certificate[]{cert});
+ }
+ }
+ if (serviceCa != null) {
+ keyStore.setCertificateEntry("service-ca",
+ cf.generateCertificate(new ByteArrayInputStream(serviceCa)));
+ }
+ return keyStore;
+ } catch (Exception ignored) {
+ return null;
+ }
+ }
+
+ public void stop() {
+ solaceOAuthContainer.stop();
+ }
+
+ public boolean isRunning() {
+ return solaceOAuthContainer.isRunning();
+ }
+
+
+ public SolaceOAuthContainer getSolaceOAuthContainer() {
+ return solaceOAuthContainer;
+ }
+}
\ No newline at end of file
diff --git a/pubsubplus-connector-spark_3.x/src/test/java/com/solacecoe/connectors/spark/oauth/ContainerResource.java b/pubsubplus-connector-spark_3.x/src/test/java/com/solacecoe/connectors/spark/oauth/ContainerResource.java
index 5311cd9..c7fa59e 100644
--- a/pubsubplus-connector-spark_3.x/src/test/java/com/solacecoe/connectors/spark/oauth/ContainerResource.java
+++ b/pubsubplus-connector-spark_3.x/src/test/java/com/solacecoe/connectors/spark/oauth/ContainerResource.java
@@ -32,7 +32,8 @@ public void start() {
MountableFile.forClasspathResource("keycloak.crt"), false)
.withOAuth()
.withExposedPorts(SolaceOAuthContainer.Service.SMF.getPort(), SolaceOAuthContainer.Service.SMF_SSL.getPort(), 1943, 8080)
- .withPublishTopic(SolaceOAuthContainer.INTEGRATION_TEST_QUEUE_SUBSCRIPTION, SolaceOAuthContainer.Service.SMF);
+ .withPublishTopic(SolaceOAuthContainer.INTEGRATION_TEST_QUEUE_SUBSCRIPTION, SolaceOAuthContainer.Service.SMF)
+ .withPublishTopic("random/topic", SolaceOAuthContainer.Service.SMF);
solaceOAuthContainer.start();
await().until(() -> solaceOAuthContainer.isRunning());
diff --git a/pubsubplus-connector-spark_3.x/src/test/java/com/solacecoe/connectors/spark/oauth/SolaceOAuthContainer.java b/pubsubplus-connector-spark_3.x/src/test/java/com/solacecoe/connectors/spark/oauth/SolaceOAuthContainer.java
index b3f8953..e61d0d2 100644
--- a/pubsubplus-connector-spark_3.x/src/test/java/com/solacecoe/connectors/spark/oauth/SolaceOAuthContainer.java
+++ b/pubsubplus-connector-spark_3.x/src/test/java/com/solacecoe/connectors/spark/oauth/SolaceOAuthContainer.java
@@ -45,6 +45,7 @@ public class SolaceOAuthContainer extends GenericContainer
private boolean withClientCert;
private boolean withOAuth;
private boolean clientCertificateAuthority;
+ private boolean withCNAsUsernameSource;
/**
* Create a new solace container with the specified image name.
@@ -145,6 +146,25 @@ private Transferable createConfigurationScript() {
updateConfigScript(scriptBuilder, "no shutdown");
updateConfigScript(scriptBuilder, "exit");
+ updateConfigScript(scriptBuilder, "create client-username certificate-user message-vpn " + vpn);
+ updateConfigScript(scriptBuilder, "acl-profile default");
+ updateConfigScript(scriptBuilder, "client-profile default");
+ updateConfigScript(scriptBuilder, "no shutdown");
+ updateConfigScript(scriptBuilder, "exit");
+
+ updateConfigScript(scriptBuilder, "create client-username certificate-user-with-password message-vpn " + vpn);
+ updateConfigScript(scriptBuilder, "password certificate-user-with-password");
+ updateConfigScript(scriptBuilder, "acl-profile default");
+ updateConfigScript(scriptBuilder, "client-profile default");
+ updateConfigScript(scriptBuilder, "no shutdown");
+ updateConfigScript(scriptBuilder, "exit");
+
+ updateConfigScript(scriptBuilder, "create client-username localhost message-vpn " + vpn);
+ updateConfigScript(scriptBuilder, "acl-profile default");
+ updateConfigScript(scriptBuilder, "client-profile default");
+ updateConfigScript(scriptBuilder, "no shutdown");
+ updateConfigScript(scriptBuilder, "exit");
+
if (withClientCert) {
if (clientCertificateAuthority) {
updateConfigScript(scriptBuilder, "configure");
@@ -152,14 +172,16 @@ private Transferable createConfigurationScript() {
updateConfigScript(scriptBuilder, "authentication");
updateConfigScript(scriptBuilder, "create client-certificate-authority RootCA");
updateConfigScript(scriptBuilder, "certificate file rootCA.crt");
- updateConfigScript(scriptBuilder, "show client-certificate-authority ca-name *");
+ updateConfigScript(scriptBuilder, "show client-certificate-authority RootCA *");
updateConfigScript(scriptBuilder, "end");
updateConfigScript(scriptBuilder, "configure");
updateConfigScript(scriptBuilder, "message-vpn " + vpn);
// Enable client certificate authentication
updateConfigScript(scriptBuilder, "authentication client-certificate");
- updateConfigScript(scriptBuilder, "allow-api-provided-username");
+ if(!withCNAsUsernameSource) {
+ updateConfigScript(scriptBuilder, "allow-api-provided-username");
+ }
updateConfigScript(scriptBuilder, "no shutdown");
updateConfigScript(scriptBuilder, "end");
} else {
@@ -168,7 +190,7 @@ private Transferable createConfigurationScript() {
updateConfigScript(scriptBuilder, "ssl");
updateConfigScript(scriptBuilder, "create domain-certificate-authority RootCA");
updateConfigScript(scriptBuilder, "certificate file rootCA.crt");
- updateConfigScript(scriptBuilder, "show domain-certificate-authority ca-name *");
+ updateConfigScript(scriptBuilder, "show domain-certificate-authority RootCA *");
updateConfigScript(scriptBuilder, "end");
}
@@ -406,6 +428,14 @@ public SolaceOAuthContainer withOAuth() {
return this;
}
+ /**
+ * Sets Common Name as username source
+ */
+ public SolaceOAuthContainer withCNAsUsernameSource() {
+ this.withCNAsUsernameSource = true;
+ return this;
+ }
+
/**
* Configured VPN
*
diff --git a/pubsubplus-connector-spark_3.x/src/test/resources/MyRootCaCert.pem b/pubsubplus-connector-spark_3.x/src/test/resources/MyRootCaCert.pem
new file mode 100644
index 0000000..f7585e9
--- /dev/null
+++ b/pubsubplus-connector-spark_3.x/src/test/resources/MyRootCaCert.pem
@@ -0,0 +1,23 @@
+-----BEGIN CERTIFICATE-----
+MIIDzTCCArWgAwIBAgIUdJosN+VRDHN3/7brWxp+8S1DEd0wDQYJKoZIhvcNAQEL
+BQAwdTELMAkGA1UEBhMCR0IxDzANBgNVBAgMBkxvbmRvbjEPMA0GA1UEBwwGTG9u
+ZG9uMRgwFgYDVQQKDA9HbG9iYWwgU2VjdXJpdHkxFjAUBgNVBAsMDUlUIERlcGFy
+dG1lbnQxEjAQBgNVBAMMCWxvY2FsaG9zdDAgFw0yNTAzMjYwNDE5MzZaGA8yMTI1
+MDMwMjA0MTkzNlowdTELMAkGA1UEBhMCR0IxDzANBgNVBAgMBkxvbmRvbjEPMA0G
+A1UEBwwGTG9uZG9uMRgwFgYDVQQKDA9HbG9iYWwgU2VjdXJpdHkxFjAUBgNVBAsM
+DUlUIERlcGFydG1lbnQxEjAQBgNVBAMMCWxvY2FsaG9zdDCCASIwDQYJKoZIhvcN
+AQEBBQADggEPADCCAQoCggEBAM+9Ov69Yc5fDXbTPnTyxfjFBBhRrMGr3QAPCFvI
+sex9lFD5L0tTkLHXVwYGns3Jyexvg9IV1kbnu8JTjjN8pzZtwC3vJzNPLXZKdVxD
+yI5zm6bEZTkRsyFJwxHAdc+iDh6n4fDC9YnyQPz6TJdMKmLHIH88bvzX2fWbtDjr
+eGyCX2pnRZ5sPpTckmwsEnC5jzpu25JZAYSFiPmaWGAIQ+yR2LxTHMLGw8qDQKt9
+DlnMiHc15skKp7yQZCmcAxUfiNj+E/ETfvKfto0w3eKcKasyk4Iih8I144ob/WkW
+OAdc+zourPz3z7ncYCjLGeej8Njc/tOC11gfPueUesA5Gu0CAwEAAaNTMFEwHQYD
+VR0OBBYEFKHXJWypvjNlkYY1FI7hp4sVMr3HMB8GA1UdIwQYMBaAFKHXJWypvjNl
+kYY1FI7hp4sVMr3HMA8GA1UdEwEB/wQFMAMBAf8wDQYJKoZIhvcNAQELBQADggEB
+AIb30NpkGnMy0GWlXNEj/DJ4fTjSwS2yay/hJcuKpjRD9I6H834aF4fzZPqQHKZl
+fdt/EZmsXnZy7IH2gxKo6kKqYNpYcsUZklIFKJyoDw4TcQCh+jnoWeIsqBZ649Vj
+O2za1ddPc60s4HSSqK1ot0Hy8Fxu8NhQpnYA8ELPE5mEkVmY7nM2CXtN8tfEADUH
+DvtDwSA6qnpQbBSTwmlbJHzr2OSmCDoFFUpP7g5J0m3MsL+jd0hOab12m4WJEEEF
+rq8K4Q8g0TWfmfQqQWc64wI7PLT5N2wfq5OsaSKTwFv35Dt6hztcauB3YGLRvsGA
+mMGyWaPAFGL0x3BIjrdq3l4=
+-----END CERTIFICATE-----
diff --git a/pubsubplus-connector-spark_3.x/src/test/resources/MyRootCaKey.key b/pubsubplus-connector-spark_3.x/src/test/resources/MyRootCaKey.key
new file mode 100644
index 0000000..7efc7b8
--- /dev/null
+++ b/pubsubplus-connector-spark_3.x/src/test/resources/MyRootCaKey.key
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDPvTr+vWHOXw12
+0z508sX4xQQYUazBq90ADwhbyLHsfZRQ+S9LU5Cx11cGBp7Nycnsb4PSFdZG57vC
+U44zfKc2bcAt7yczTy12SnVcQ8iOc5umxGU5EbMhScMRwHXPog4ep+HwwvWJ8kD8
++kyXTCpixyB/PG7819n1m7Q463hsgl9qZ0WebD6U3JJsLBJwuY86btuSWQGEhYj5
+mlhgCEPskdi8UxzCxsPKg0CrfQ5ZzIh3NebJCqe8kGQpnAMVH4jY/hPxE37yn7aN
+MN3inCmrMpOCIofCNeOKG/1pFjgHXPs6Lqz898+53GAoyxnno/DY3P7TgtdYHz7n
+lHrAORrtAgMBAAECggEAAlFCrNrmxfGvNqU6Avbdp4JtML24ZqGBAdb1MwHk1ZQ7
+z+NFzC8Xv6HCOPgbo96gGHtNSMPAwq/5/2ihE7sfmJ/996pnAZaXptkZqHevwnZN
+Zfu2ifU2K7g7im9bs0DY40mNUgIbaYR/5r6rlStdCU4qUKCeRZZ7qOERh4zBBg/5
+6W17/aCyrMt0GPY4rcUAqL+W2Bl88lOZgy2hu7rM8cofbHKn+YwDRy4qfS5Nd44e
+przzeiqD2rKbWnT//6Vy9ew/ll07l6TLdWaV/u0SFrx67rg/DNQ+WDGpp+GMMSn0
+xed9onf4fgT5utnOwclP2BVqvgr5tQ8IrV+fv0eNrwKBgQD4wVDBKNDHB6uEijyZ
+MT/6B8dwWjhOoDXQ3c7i4ogs3i//CF5sjim0U2vXsC/35Smv0rgDP9aXm3p3+h0J
+HxKlmVDYLv4gCdcSDRClm05dTGLDGSC66o+YMcgvt7cvBNK/fC7GjfHLhUy68mY2
+22ymUAD8Y0E+o0Nzo9SgZWdPLwKBgQDVyhsIJYLBUwK4LNMBMY+pwVZsfM7wcWQF
++BqKdpIJ5MUlPK8qgrQZibslKm/+wY0PL03C1d4QL+8IQaMq0GJGas912ZkDUJRW
+mhaeAVfhqkySpByrAbLNKtnyyizxBCKBBzCk1xo3Y49g1MuQdlU41NGmnH5/jdI6
+igCeX71QowKBgQDXbev27NcqlyzmgyWJ8koaymPhnHDJPxx8P6wtd8xBN0dP0jqc
+Pt5FSj3KNM7GHFPm7lHoln/NGpyMjTJaNOvcOJP4NXJl4r+85U22bnMGf+HEXt2m
+ov8q6moZUTy23rF5nh2QXVu7Sz0xZj45YfyNaBoCeh8GFTSUUJ0TL96xhwKBgA/5
+fxWXv/J0InLino2rR2HW2X9XwBrFTkUH6evIulbU6kpDA4+4nn5+BSbcyeuOSN0G
+mtfhJSuueTzzsp6DXaSHXKMgtEdEISFreB8sWIW7NOeMLbw1b0fXWNvEcaE8vttH
+T0Ix9HqJDPCeVeuge/O4K0Y40Vb0oO3Q3AT5gGrjAoGBAKR/YAsnM8T79kFlD8wc
+yVcxYRODSEF14YLebdK0EhZj+krgzoWvObrguMN17elIfNxZR1h7bztcMCkWALBr
+Bzf+qCgxTcWm2bpRsaL60CwBLntU0Tt9QYHd8YkBib54fJGuwGFG521jHIrnkxEL
+rXZZRAe7Rig674vNDiMJwl66
+-----END PRIVATE KEY-----
diff --git a/pubsubplus-connector-spark_3.x/src/test/resources/client1.key b/pubsubplus-connector-spark_3.x/src/test/resources/client1.key
new file mode 100644
index 0000000..0549a94
--- /dev/null
+++ b/pubsubplus-connector-spark_3.x/src/test/resources/client1.key
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDhFXwK8VE6+5dF
+n5fqGdI2XtF53eX3XB3aOnJgt1KLNm22jTRmbOwPr9CYcy3ilsy50Jw/+tNCXRDt
+qicJ0Nwql7Oxp7MwTfHazWgKvBua9vm+26ki976dzruXadGQkqXVokpuAZoCefG2
+d0M5knpZ1S0Fod5yeOck0/N441w/RfUdQG2W/8O7iveJ0Jb8H7dfNn8eVE7UZzGq
+mbznu9NN+HBs3n8Apj5e+9bTEHFc56MCXYim21mktEc7TBwJXoCSIM21wQYbDTIh
+dxhploq21fP1i+wGuzx1QtO7hRUmamzOIDwLFPKxrDLR5MLIpPvxEmTrLJdkUqMP
+NooRHp25AgMBAAECggEAA1MV6fN2h7B/o3+f9s061aLA5VPpuaWaq0aBufv7NCMx
+q6iaRYkGU5T9N8M7wkUH8RaruCY0SJ6nSVtiCDsVhWuIM++OLgB4ugqepsnwfnF3
+buI/tUyDfEE5dBsUBValLGqhPs63P6Idd9x7GbaYKAuGBrdFWdt/ToPy7K4vBqAy
+zRRpDLmrjO1ENcOL+uGnF44htxt4OcDW7kTJzc0/orNEP3lnvdNuaiPQi+xrF/Zx
+Ib2VyvcoIlSbjdj/aw/Z1R7PeaxfVhdAwNHJPQQjUu6KhX3Eau2VTZdp/6xNr4Mo
+UW5sSSVUEVIW6CXiTAOlt6m8J1b5iXTuqhy7//KeoQKBgQD8N9fdNko3bEOWPdCw
+97OZc6eou89VUDAaSOv3HIuTE1IHlppbcTGBQXhJtyh5n+MndWwlOYGvsAHMUYKE
+7FrmM5kfUmeT/XVfHQMa8mtJG+73IhuJ1f5HWanIicRSgYEJpf/hY2pkILQhgaEU
+KYVgfJGrAk82zu3Us4TEwP8GvwKBgQDkdXwZmR0lAuXB0Vppm0NliNqqr23g3MsC
+ddou5NPWdR9IYGhQMYaomcAyM/tE3FjSPwBSWx2awKqTpv80t6dVWiDpf/OgAnZH
+XKQbZu/tORY04iwtHSrpsJCwdeeKoHURi+W33CHF7owbTOG2lRavvJTftUKy+1oy
+DIO5fwixhwKBgQC/dVCq/uxFuCxyH0iNHes7AF10K+VOmMl7GoHtGsElC1SIqB1S
+btGqkd1S7/OMp/+SU4OdmlLE3/HtfK58YsL53o9NBIvV/mAbrHVP/lfMrqSu+zg2
+YLhex0SoE1RxYRdoEJf2KzZ2/ZBfovx4xrAVt1oIm38wVtKZVRDjHrI5FwKBgQCH
+J2XScv2F7s/juKmnPl5BzBOo1H3JgHVtMDFyepdEESf/lmE/x/zk84h7arskwsKd
+Jv5WumaqLX78ONhT5K5O1TIRUUvchnhYwVqoh147VgFoKcF+svm3JGiErEvdfA2l
+4sB04/rq8LPcVsBVVqAEOSlNqxsZbY+vei3XaGrJMwKBgQCELdIsJ9FcR9jP+S9Z
+fhSWFWth4v8Xa3eyhgUeIviKY5rS4h//U4QI3x+YIkxKPtyP2YivsboN1eXM1dct
+xPpuJm7hI3UH32q8oM2NhSjD1Oe6JD8bt8ghfnHWshjvFtpCD2/tpB2cyBM4vCfs
+5HZKPQWebF8V7uPftx7+mz/+pA==
+-----END PRIVATE KEY-----
diff --git a/pubsubplus-connector-spark_3.x/src/test/resources/clientCert1.pem b/pubsubplus-connector-spark_3.x/src/test/resources/clientCert1.pem
new file mode 100644
index 0000000..3430fdf
--- /dev/null
+++ b/pubsubplus-connector-spark_3.x/src/test/resources/clientCert1.pem
@@ -0,0 +1,23 @@
+-----BEGIN CERTIFICATE-----
+MIID3jCCAsagAwIBAgIUWrwXRVQHXds7nOgU0rDqTrIFymQwDQYJKoZIhvcNAQEL
+BQAwdTELMAkGA1UEBhMCR0IxDzANBgNVBAgMBkxvbmRvbjEPMA0GA1UEBwwGTG9u
+ZG9uMRgwFgYDVQQKDA9HbG9iYWwgU2VjdXJpdHkxFjAUBgNVBAsMDUlUIERlcGFy
+dG1lbnQxEjAQBgNVBAMMCWxvY2FsaG9zdDAgFw0yNTAzMjYwNDIwNDJaGA8yMTI1
+MDMwMjA0MjA0MlowdTELMAkGA1UEBhMCR0IxDzANBgNVBAgMBkxvbmRvbjEPMA0G
+A1UEBwwGTG9uZG9uMRgwFgYDVQQKDA9HbG9iYWwgU2VjdXJpdHkxFjAUBgNVBAsM
+DUlUIERlcGFydG1lbnQxEjAQBgNVBAMMCWxvY2FsaG9zdDCCASIwDQYJKoZIhvcN
+AQEBBQADggEPADCCAQoCggEBAOEVfArxUTr7l0Wfl+oZ0jZe0Xnd5fdcHdo6cmC3
+Uos2bbaNNGZs7A+v0JhzLeKWzLnQnD/600JdEO2qJwnQ3CqXs7GnszBN8drNaAq8
+G5r2+b7bqSL3vp3Ou5dp0ZCSpdWiSm4BmgJ58bZ3QzmSelnVLQWh3nJ45yTT83jj
+XD9F9R1AbZb/w7uK94nQlvwft182fx5UTtRnMaqZvOe70034cGzefwCmPl771tMQ
+cVznowJdiKbbWaS0RztMHAlegJIgzbXBBhsNMiF3GGmWirbV8/WL7Aa7PHVC07uF
+FSZqbM4gPAsU8rGsMtHkwsik+/ESZOssl2RSow82ihEenbkCAwEAAaNkMGIwCQYD
+VR0TBAIwADAVBgNVHREEDjAMggpsb2NhbGhvc3Q7MB0GA1UdDgQWBBTszPc7cJMB
+r0cDK7r/O1T3qC844TAfBgNVHSMEGDAWgBSh1yVsqb4zZZGGNRSO4aeLFTK9xzAN
+BgkqhkiG9w0BAQsFAAOCAQEAd3/LAfwvIPajnLWLLDniTVzsY2KCBOvfS2NkUyL0
+LJz4SOpeRC2GgwO94D6w1DpQs8ATBXjHZeTkyQHglqcsjP6zg3xpmnYFr7KniEQC
+w45WBg1IkO7yEVgYs5YdlxOs3mIniaauAMDJh942n+ji1AsIdJPVbP5iaCBmF19W
+bdRLo7K3MSfYgfS+JBgdpoNn7xIsVGvY1bPTP3rv0gk/TlcJ2U7Rnj3XpqbFbw3Q
+EBErp524pc2hFepAU5LZftqRIrNmvhuUlL2lrqOCaDN4LaAyJ39tXMC9kP2TiqSc
+u8cT/tqgzY1v+KXHCjCYcf9ZgHhE105hcG5Q5Qqx49j+Kg==
+-----END CERTIFICATE-----
diff --git a/pubsubplus-connector-spark_3.x/src/test/resources/serverCertCombined.pem b/pubsubplus-connector-spark_3.x/src/test/resources/serverCertCombined.pem
new file mode 100644
index 0000000..9103222
--- /dev/null
+++ b/pubsubplus-connector-spark_3.x/src/test/resources/serverCertCombined.pem
@@ -0,0 +1,51 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDAUf4eAPEgkocO
+Pg3PnLluFFTWH/yS4z8vbZg9taozH0i1P6Hj223XXsQ6zajw7bh8OMYv7L7+jMnB
+SdCsQjLvbS3iZAcXu5/fxjekSsC3CmOXgoEUnCAhJ1ghZo+eCedeXjfbVYIVoIEa
+Y+z1+uYdZNqRosHfV+axxGau6Qvpl8lPKB0wlngHl1Aj+r84irsnnjgKMDrdXYRX
+B2PaxX995QPOmqtpX/MnikSw49ei+x6JDAJpJ38lrMztz7VrwGoof8BEgt/+j2Ii
+M/mhn974O2OTfUeY//evEHYE3RKuikTBdChDC19I0ek0et1FIB+OL5jSj3m5bg0Z
+5Pqnge3fAgMBAAECggEAO47IwvLouX0XHQaDkWlIFXpvYVZLihOc5Cus62NgX2+P
+PS4FDQLNq9Y5r7oz2NTNfo64dWv8I+lmq5iX6lif3Hyzqnnupl9g3dRNM/BAaQ7r
+E6cDIWZOHuZHySQxUrBfed50rtR2sgrgIFOUtQJrk+vBMju0jV1z4FZhtEYQfdiN
+ZXNLUhK2Ir8kES7XvH0F6lc1Lfy0EqZ+QXqmWJ1f3qZjLCQiW9baQciIiN9TGNfz
+Iso80CKAhiM0H2ZDDMzehrTSgY8y+kXAGPUBmTL8a7zoomAxp4zPGOxTnZ78BG9F
+e8VD8XztyH5N8Q4S/jW75YBkAZ961gt3uTY+2a/+vQKBgQDuJXlE+X04HrE0gB8P
+pxnyTRZfNqUh+BfvgbnBf7+L2XhkiIEqI5WNsxrG0tebNTENGABkUUHOf4zZPZ0u
+2j34WtWBIbT0ktvPk/ZiMpiCMpylPLeh9/PqRXveCmTU487001twVPe0aweedEJC
+6myidvO2s8sRR+v5TLzFn6zZswKBgQDOvQU7YQUHAP5kPXSbtCs/MK4iqgIUta+g
+GPhkunLrzXdAWxqQhDtTMEYqV1Ep5BkrIe2O1rmUAgZeQNVC1rV+feK9i4OMleDp
+8LUmL5f+v0I3wrCuyJQjly+dXxMkL/CZfGaCpzI1O4vWHuR9AuuEXaiUx46F5vf/
+swWh5LYtJQKBgE1FqYjA72mTQ+qGHA4HKXsemkDAyWoxRwr26+AmZG+rW847ILYc
+n70/vVOAPGdnJghanl3uo8yuRHSE8oiuVedNCfQBjObC9EHxf8OzKRst8vgGsIsE
+moeMjjmo47jURmXYYtYbK9jF1c4LaSivShqZQFErG5v6202kIah6PCrTAoGAIBgb
+/dHb8/Blja40sg9QP2kRP3RhClICR7pYpVqEiLLG9oiuIQ1GIG34Fo6jnPs6JQYJ
+WQlgmeeOkYdDFDM8zbxBPOexrlt+JRMnLBxrXjLW1s00nI2q4bOQwr+FuzJuCYXU
+bCgElVr/JGh7VCTl5xCbIS2xpHvsuBdUooNc/LkCgYEA30Cq6u/N4/ZwhVyvknwx
+FiHrJGPBJs4S8c+08+i7ibJwIK9FlPG8p2uYN61TIsKJoY6dxhXXvbD7gK/yASRI
+xb6ES165nFWLg64ZFSu1f/+bgTZh6Q72M9G95rhyY9a8sjcqLwVz2HG+Tzz2o2gv
+j3SsjcAObLkiL5/aCsVQoOI=
+-----END PRIVATE KEY-----
+-----BEGIN CERTIFICATE-----
+MIID3jCCAsagAwIBAgIUWrwXRVQHXds7nOgU0rDqTrIFymUwDQYJKoZIhvcNAQEL
+BQAwdTELMAkGA1UEBhMCR0IxDzANBgNVBAgMBkxvbmRvbjEPMA0GA1UEBwwGTG9u
+ZG9uMRgwFgYDVQQKDA9HbG9iYWwgU2VjdXJpdHkxFjAUBgNVBAsMDUlUIERlcGFy
+dG1lbnQxEjAQBgNVBAMMCWxvY2FsaG9zdDAgFw0yNTAzMjYwNDIxMzlaGA8yMTI1
+MDMwMjA0MjEzOVowdTELMAkGA1UEBhMCR0IxDzANBgNVBAgMBkxvbmRvbjEPMA0G
+A1UEBwwGTG9uZG9uMRgwFgYDVQQKDA9HbG9iYWwgU2VjdXJpdHkxFjAUBgNVBAsM
+DUlUIERlcGFydG1lbnQxEjAQBgNVBAMMCWxvY2FsaG9zdDCCASIwDQYJKoZIhvcN
+AQEBBQADggEPADCCAQoCggEBAMBR/h4A8SCShw4+Dc+cuW4UVNYf/JLjPy9tmD21
+qjMfSLU/oePbbddexDrNqPDtuHw4xi/svv6MycFJ0KxCMu9tLeJkBxe7n9/GN6RK
+wLcKY5eCgRScICEnWCFmj54J515eN9tVghWggRpj7PX65h1k2pGiwd9X5rHEZq7p
+C+mXyU8oHTCWeAeXUCP6vziKuyeeOAowOt1dhFcHY9rFf33lA86aq2lf8yeKRLDj
+16L7HokMAmknfyWszO3PtWvAaih/wESC3/6PYiIz+aGf3vg7Y5N9R5j/968QdgTd
+Eq6KRMF0KEMLX0jR6TR63UUgH44vmNKPebluDRnk+qeB7d8CAwEAAaNkMGIwCQYD
+VR0TBAIwADAVBgNVHREEDjAMggpsb2NhbGhvc3Q7MB0GA1UdDgQWBBRFNCOzxJvV
+lMa9GouDEccWhw2lEDAfBgNVHSMEGDAWgBSh1yVsqb4zZZGGNRSO4aeLFTK9xzAN
+BgkqhkiG9w0BAQsFAAOCAQEAwRI51dZcUpkb+uWzF/a4ZNVHOZIFroGba4u0kBKD
+1k0NZSRoEgeFOoPjfwuUVCE/ssfXnPemhxIKadG23e2JHlsseKa2GNrded5QGMqW
+vS0CZCvS1pk8QmBg/1sQxNuhailificBL8Bxr3Ii3Tu3J7splxHJlbYvE/esEbs4
+LMHcX2hyGYCrRkWRxU3DFrrnK6ooi/D1YdPDAkLQTBoDiQmxTxzuJ7n9vON8510U
+9jdjdWy9TcJKbMitmwxSQS7gzfmjXtwtXK77vSOpVQexfiHxODxnb5WBPrLQNqVu
+sfFE0zWpYDC8vMn9KqwkvTbcoolm3nqhVeA/MdLiAfO8sA==
+-----END CERTIFICATE-----