Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
fb3ee7b
Check username & password only if authentication is basic
SravanThotakura05 Mar 24, 2025
37faafd
Update authentication mechanism for Solace Write
SravanThotakura05 Mar 24, 2025
fad0957
Update SolaceBroker.java
SravanThotakura05 Mar 24, 2025
02a5a21
Updated integration test
SravanThotakura05 Mar 24, 2025
a458546
Remove duplicated code
SravanThotakura05 Mar 24, 2025
be8d22c
Update SolaceSparkStreamingOAuthIT.java
SravanThotakura05 Mar 24, 2025
d62753c
Update SolaceSparkStreamingOAuthIT.java
SravanThotakura05 Mar 24, 2025
ff48842
Update ContainerResource.java
SravanThotakura05 Mar 24, 2025
d6e6146
Update SolaceSparkStreamingOAuthIT.java
SravanThotakura05 Mar 24, 2025
c01bcc0
Update SolaceDataSourceReaderFactory.java
SravanThotakura05 Mar 24, 2025
18ea8a6
Update SolaceSparkStreamingOAuthIT.java
SravanThotakura05 Mar 24, 2025
7da5adb
Added tests for client certificate authentication without using passw…
SravanThotakura05 Mar 27, 2025
a3ebeb4
Update SolaceSparkStreamingAuthenticationIT.java
SravanThotakura05 Mar 27, 2025
24b03e9
Update SolaceSparkStreamingAuthenticationIT.java
SravanThotakura05 Mar 27, 2025
0078350
Fix SonarQube Issues
SravanThotakura05 Mar 27, 2025
387bda5
Update SolaceSparkStreamingOAuthIT.java
SravanThotakura05 Mar 27, 2025
1084d12
Update SolaceSparkStreamingOAuthIT.java
SravanThotakura05 Mar 27, 2025
748643f
Update SolaceInputPartitionReader.java
SravanThotakura05 Mar 27, 2025
3d85f59
Update SolaceSparkStreamingOAuthIT.java
SravanThotakura05 Mar 27, 2025
b6088c0
Update SolaceSparkStreamingMessageReplayIT.java
SravanThotakura05 Mar 27, 2025
ccce8dc
Update SolaceSparkStreamingMessageReplayIT.java
SravanThotakura05 Mar 27, 2025
f3ce2b5
Update SolaceSparkStreamingOAuthIT.java
SravanThotakura05 Mar 27, 2025
95b4768
Update SolaceSparkStreamingMessageReplayIT.java
SravanThotakura05 Mar 27, 2025
277c1e0
Update SolaceSparkStreamingOAuthIT.java
SravanThotakura05 Mar 27, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<packaging>pom</packaging>
<name>PubSubPlus Connector Spark - Parent</name>
<description>Solace PubSub+ Connector for Spark streams data from Solace PubSub+ broker to Spark Data Sources.</description>
<url>https://solace.com</url>
<url>https://solace.com/integration-hub/apache-spark/</url>

<modules>
<module>pubsubplus-connector-spark_3.x</module>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import com.solacecoe.connectors.spark.streaming.solace.LVQEventListener;
import com.solacecoe.connectors.spark.streaming.solace.SolaceBroker;
import com.solacecoe.connectors.spark.streaming.solace.exceptions.SolaceInvalidPropertyException;
import com.solacesystems.jcsmp.JCSMPProperties;
import com.solacecoe.connectors.spark.streaming.solace.utils.SolaceUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.spark.SparkEnv;
Expand Down Expand Up @@ -48,48 +48,7 @@ public SolaceMicroBatch(Map<String, String> properties) {
log.info("SolaceSparkConnector - Initializing Solace Spark Connector");
// Initialize classes required for Solace connectivity

// User configuration validation
if(!properties.containsKey(SolaceSparkStreamingProperties.HOST) || properties.get(SolaceSparkStreamingProperties.HOST) == null || properties.get(SolaceSparkStreamingProperties.HOST).isEmpty()) {
throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide Solace Host name in configuration options");
}
if(!properties.containsKey(SolaceSparkStreamingProperties.VPN) || properties.get(SolaceSparkStreamingProperties.VPN) == null || properties.get(SolaceSparkStreamingProperties.VPN).isEmpty()) {
throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide Solace VPN name in configuration options");
}

if(properties.containsKey(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX+ JCSMPProperties.AUTHENTICATION_SCHEME) &&
properties.get(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX+ JCSMPProperties.AUTHENTICATION_SCHEME).equals(JCSMPProperties.AUTHENTICATION_SCHEME_OAUTH2)) {
if(!properties.containsKey(SolaceSparkStreamingProperties.OAUTH_CLIENT_ACCESSTOKEN)) {
if(!properties.containsKey(SolaceSparkStreamingProperties.OAUTH_CLIENT_AUTHSERVER_URL) || properties.get(SolaceSparkStreamingProperties.OAUTH_CLIENT_AUTHSERVER_URL) == null || properties.get(SolaceSparkStreamingProperties.OAUTH_CLIENT_AUTHSERVER_URL).isEmpty()) {
throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide OAuth Client Authentication Server URL");
}

if(!properties.containsKey(SolaceSparkStreamingProperties.OAUTH_CLIENT_CLIENT_ID) || properties.get(SolaceSparkStreamingProperties.OAUTH_CLIENT_CLIENT_ID) == null || properties.get(SolaceSparkStreamingProperties.OAUTH_CLIENT_CLIENT_ID).isEmpty()) {
throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide OAuth Client ID");
}

if(!properties.containsKey(SolaceSparkStreamingProperties.OAUTH_CLIENT_CREDENTIALS_CLIENTSECRET) || properties.get(SolaceSparkStreamingProperties.OAUTH_CLIENT_CREDENTIALS_CLIENTSECRET) == null || properties.get(SolaceSparkStreamingProperties.OAUTH_CLIENT_CREDENTIALS_CLIENTSECRET).isEmpty()) {
throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide OAuth Client Credentials Secret");
}

String clientCertificate = properties.getOrDefault(SolaceSparkStreamingProperties.OAUTH_CLIENT_AUTHSERVER_CLIENT_CERTIFICATE, null);
if(clientCertificate != null) {
String trustStoreFilePassword = properties.getOrDefault(SolaceSparkStreamingProperties.OAUTH_CLIENT_AUTHSERVER_TRUSTSTORE_PASSWORD, null);
if (trustStoreFilePassword == null || trustStoreFilePassword.isEmpty()) {
throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide OAuth Client TrustStore Password. If TrustStore file path is not configured, please provide password for default java truststore");
}
}
} else if(properties.getOrDefault(SolaceSparkStreamingProperties.OAUTH_CLIENT_ACCESSTOKEN, null) == null || properties.getOrDefault(SolaceSparkStreamingProperties.OAUTH_CLIENT_ACCESSTOKEN, null).isEmpty()) {
throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide valid access token input");
}
} else {
if (!properties.containsKey(SolaceSparkStreamingProperties.USERNAME) || properties.get(SolaceSparkStreamingProperties.USERNAME) == null || properties.get(SolaceSparkStreamingProperties.USERNAME).isEmpty()) {
throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide Solace Username in configuration options");
}

if (!properties.containsKey(SolaceSparkStreamingProperties.PASSWORD) || properties.get(SolaceSparkStreamingProperties.PASSWORD) == null || properties.get(SolaceSparkStreamingProperties.PASSWORD).isEmpty()) {
throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide Solace Password in configuration options");
}
}
SolaceUtils.validateCommonProperties(properties);

if(!properties.containsKey(SolaceSparkStreamingProperties.QUEUE) || properties.get(SolaceSparkStreamingProperties.QUEUE) == null || properties.get(SolaceSparkStreamingProperties.QUEUE).isEmpty()) {
throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide Solace Queue in configuration options");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,15 @@ public SolaceDataSourceReaderFactory(boolean includeHeaders, String lastKnownOff

@Override
public PartitionReader<InternalRow> createReader(InputPartition partition) {
TaskContext taskCtx = TaskContext.get();
String queryId = taskCtx.getLocalProperty(StreamExecution.QUERY_ID_KEY());
String batchId = taskCtx.getLocalProperty(MicroBatchExecution.BATCH_ID_KEY());
SolaceInputPartition solaceInputPartition = (SolaceInputPartition) partition;
log.info("SolaceSparkConnector - Creating reader for input partition reader factory with query id {}, batch id {}, task id {} and partition id {}", queryId, batchId, taskCtx.taskAttemptId(), taskCtx.partitionId());
return new SolaceInputPartitionReader(solaceInputPartition, includeHeaders, lastKnownOffset, properties, taskCtx, checkpoints);
try {
TaskContext taskCtx = TaskContext.get();
String queryId = taskCtx.getLocalProperty(StreamExecution.QUERY_ID_KEY());
String batchId = taskCtx.getLocalProperty(MicroBatchExecution.BATCH_ID_KEY());
SolaceInputPartition solaceInputPartition = (SolaceInputPartition) partition;
log.info("SolaceSparkConnector - Creating reader for input partition reader factory with query id {}, batch id {}, task id {} and partition id {}", queryId, batchId, taskCtx.taskAttemptId(), taskCtx.partitionId());
return new SolaceInputPartitionReader(solaceInputPartition, includeHeaders, lastKnownOffset, properties, taskCtx, checkpoints);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ public SolaceInputPartitionReader(SolaceInputPartition inputPartition, boolean i

@Override
public boolean next() {
if(this.solaceBroker != null && this.solaceBroker.isException()) {
throw new SolaceSessionException(this.solaceBroker.getException());
}
solaceMessage = getNextMessage();
return solaceMessage != null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ public SolaceBroker(Map<String, String> properties, String clientType) {
scheduleOAuthRefresh(interval);
}
} else {
jcsmpProperties.setProperty(JCSMPProperties.USERNAME, properties.get(SolaceSparkStreamingProperties.USERNAME)); // client-username
jcsmpProperties.setProperty(JCSMPProperties.PASSWORD, properties.get(SolaceSparkStreamingProperties.PASSWORD)); // client-password
jcsmpProperties.setProperty(JCSMPProperties.USERNAME, properties.getOrDefault(SolaceSparkStreamingProperties.USERNAME, "")); // client-username
jcsmpProperties.setProperty(JCSMPProperties.PASSWORD, properties.getOrDefault(SolaceSparkStreamingProperties.PASSWORD, "")); // client-password
}

this.uniqueName = JCSMPFactory.onlyInstance().createUniqueName("solace/spark/connector/"+clientType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.solacecoe.connectors.spark.streaming.properties.SolaceHeaderMeta;
import com.solacecoe.connectors.spark.streaming.properties.SolaceSparkStreamingProperties;
import com.solacecoe.connectors.spark.streaming.solace.exceptions.SolaceInvalidPropertyException;
import com.solacesystems.jcsmp.*;

import java.io.Serializable;
Expand All @@ -13,6 +14,51 @@

public class SolaceUtils {

public static void validateCommonProperties(Map<String, String> properties) {
// User configuration validation
if(!properties.containsKey(SolaceSparkStreamingProperties.HOST) || properties.get(SolaceSparkStreamingProperties.HOST) == null || properties.get(SolaceSparkStreamingProperties.HOST).isEmpty()) {
throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide Solace Host name in configuration options");
}
if(!properties.containsKey(SolaceSparkStreamingProperties.VPN) || properties.get(SolaceSparkStreamingProperties.VPN) == null || properties.get(SolaceSparkStreamingProperties.VPN).isEmpty()) {
throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide Solace VPN name in configuration options");
}

if(properties.containsKey(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX+ JCSMPProperties.AUTHENTICATION_SCHEME) &&
properties.get(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX+ JCSMPProperties.AUTHENTICATION_SCHEME).equals(JCSMPProperties.AUTHENTICATION_SCHEME_OAUTH2)) {
if(!properties.containsKey(SolaceSparkStreamingProperties.OAUTH_CLIENT_ACCESSTOKEN)) {
if(!properties.containsKey(SolaceSparkStreamingProperties.OAUTH_CLIENT_AUTHSERVER_URL) || properties.get(SolaceSparkStreamingProperties.OAUTH_CLIENT_AUTHSERVER_URL) == null || properties.get(SolaceSparkStreamingProperties.OAUTH_CLIENT_AUTHSERVER_URL).isEmpty()) {
throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide OAuth Client Authentication Server URL");
}

if(!properties.containsKey(SolaceSparkStreamingProperties.OAUTH_CLIENT_CLIENT_ID) || properties.get(SolaceSparkStreamingProperties.OAUTH_CLIENT_CLIENT_ID) == null || properties.get(SolaceSparkStreamingProperties.OAUTH_CLIENT_CLIENT_ID).isEmpty()) {
throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide OAuth Client ID");
}

if(!properties.containsKey(SolaceSparkStreamingProperties.OAUTH_CLIENT_CREDENTIALS_CLIENTSECRET) || properties.get(SolaceSparkStreamingProperties.OAUTH_CLIENT_CREDENTIALS_CLIENTSECRET) == null || properties.get(SolaceSparkStreamingProperties.OAUTH_CLIENT_CREDENTIALS_CLIENTSECRET).isEmpty()) {
throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide OAuth Client Credentials Secret");
}

String clientCertificate = properties.getOrDefault(SolaceSparkStreamingProperties.OAUTH_CLIENT_AUTHSERVER_CLIENT_CERTIFICATE, null);
if(clientCertificate != null) {
String trustStoreFilePassword = properties.getOrDefault(SolaceSparkStreamingProperties.OAUTH_CLIENT_AUTHSERVER_TRUSTSTORE_PASSWORD, null);
if (trustStoreFilePassword == null || trustStoreFilePassword.isEmpty()) {
throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide OAuth Client TrustStore Password. If TrustStore file path is not configured, please provide password for default java truststore");
}
}
} else if(properties.getOrDefault(SolaceSparkStreamingProperties.OAUTH_CLIENT_ACCESSTOKEN, null) == null || properties.getOrDefault(SolaceSparkStreamingProperties.OAUTH_CLIENT_ACCESSTOKEN, null).isEmpty()) {
throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide valid access token input");
}
} else if(properties.containsKey(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX+ JCSMPProperties.AUTHENTICATION_SCHEME) &&
properties.getOrDefault(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX+ JCSMPProperties.AUTHENTICATION_SCHEME, JCSMPProperties.AUTHENTICATION_SCHEME_BASIC).equals(JCSMPProperties.AUTHENTICATION_SCHEME_BASIC)){
if (!properties.containsKey(SolaceSparkStreamingProperties.USERNAME) || properties.get(SolaceSparkStreamingProperties.USERNAME) == null || properties.get(SolaceSparkStreamingProperties.USERNAME).isEmpty()) {
throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide Solace Username in configuration options");
}

if (!properties.containsKey(SolaceSparkStreamingProperties.PASSWORD) || properties.get(SolaceSparkStreamingProperties.PASSWORD) == null || properties.get(SolaceSparkStreamingProperties.PASSWORD).isEmpty()) {
throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide Solace Password in configuration options");
}
}
}
public static String getMessageID(BytesXMLMessage message, String solaceOffsetIndicator) throws SDTException {
switch (solaceOffsetIndicator) {
case "CORRELATION_ID":
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.solacecoe.connectors.spark.streaming.write;

import com.solacecoe.connectors.spark.streaming.solace.utils.SolaceUtils;
import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
import org.apache.spark.sql.connector.write.WriterCommitMessage;
import org.apache.spark.sql.connector.write.streaming.StreamingDataWriterFactory;
Expand All @@ -22,24 +23,7 @@ public SolaceStreamingWrite(StructType schema, Map<String, String> properties, C
this.properties = properties;
this.options = options;

if(!properties.containsKey("host") || properties.get("host") == null || properties.get("host").isEmpty()) {
log.error("SolaceSparkConnector - Please provide Solace Host name in configuration options");
throw new RuntimeException("SolaceSparkConnector - Please provide Solace Host name in configuration options");
}
if(!properties.containsKey("vpn") || properties.get("vpn") == null || properties.get("vpn").isEmpty()) {
log.error("SolaceSparkConnector - Please provide Solace VPN name in configuration options");
throw new RuntimeException("SolaceSparkConnector - Please provide Solace VPN name in configuration options");
}

if(!properties.containsKey("username") || properties.get("username") == null || properties.get("username").isEmpty()) {
log.error("SolaceSparkConnector - Please provide Solace Username in configuration options");
throw new RuntimeException("SolaceSparkConnector - Please provide Solace Username in configuration options");
}

if(!properties.containsKey("password") || properties.get("password") == null || properties.get("password").isEmpty()) {
log.error("SolaceSparkConnector - Please provide Solace Password in configuration options");
throw new RuntimeException("SolaceSparkConnector - Please provide Solace Password in configuration options");
}
SolaceUtils.validateCommonProperties(properties);

// if(!properties.containsKey("topic") || properties.get("topic") == null || properties.get("topic").isEmpty()) {
// log.error("SolaceSparkConnector - Please provide Solace Queue name in configuration options");
Expand Down
Loading
Loading