Skip to content

Commit 8fdfb8b

Browse files
DATAGO-97452: Check username & password only if authentication is basic (#28)
1 parent c6197d0 commit 8fdfb8b

18 files changed

+1005
-74
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
<packaging>pom</packaging>
1111
<name>PubSubPlus Connector Spark - Parent</name>
1212
<description>Solace PubSub+ Connector for Spark streams data from Solace PubSub+ broker to Spark Data Sources.</description>
13-
<url>https://solace.com</url>
13+
<url>https://solace.com/integration-hub/apache-spark/</url>
1414

1515
<modules>
1616
<module>pubsubplus-connector-spark_3.x</module>

pubsubplus-connector-spark_3.x/src/main/java/com/solacecoe/connectors/spark/streaming/SolaceMicroBatch.java

Lines changed: 2 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
import com.solacecoe.connectors.spark.streaming.solace.LVQEventListener;
1010
import com.solacecoe.connectors.spark.streaming.solace.SolaceBroker;
1111
import com.solacecoe.connectors.spark.streaming.solace.exceptions.SolaceInvalidPropertyException;
12-
import com.solacesystems.jcsmp.JCSMPProperties;
12+
import com.solacecoe.connectors.spark.streaming.solace.utils.SolaceUtils;
1313
import org.apache.logging.log4j.LogManager;
1414
import org.apache.logging.log4j.Logger;
1515
import org.apache.spark.SparkEnv;
@@ -48,48 +48,7 @@ public SolaceMicroBatch(Map<String, String> properties) {
4848
log.info("SolaceSparkConnector - Initializing Solace Spark Connector");
4949
// Initialize classes required for Solace connectivity
5050

51-
// User configuration validation
52-
if(!properties.containsKey(SolaceSparkStreamingProperties.HOST) || properties.get(SolaceSparkStreamingProperties.HOST) == null || properties.get(SolaceSparkStreamingProperties.HOST).isEmpty()) {
53-
throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide Solace Host name in configuration options");
54-
}
55-
if(!properties.containsKey(SolaceSparkStreamingProperties.VPN) || properties.get(SolaceSparkStreamingProperties.VPN) == null || properties.get(SolaceSparkStreamingProperties.VPN).isEmpty()) {
56-
throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide Solace VPN name in configuration options");
57-
}
58-
59-
if(properties.containsKey(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX+ JCSMPProperties.AUTHENTICATION_SCHEME) &&
60-
properties.get(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX+ JCSMPProperties.AUTHENTICATION_SCHEME).equals(JCSMPProperties.AUTHENTICATION_SCHEME_OAUTH2)) {
61-
if(!properties.containsKey(SolaceSparkStreamingProperties.OAUTH_CLIENT_ACCESSTOKEN)) {
62-
if(!properties.containsKey(SolaceSparkStreamingProperties.OAUTH_CLIENT_AUTHSERVER_URL) || properties.get(SolaceSparkStreamingProperties.OAUTH_CLIENT_AUTHSERVER_URL) == null || properties.get(SolaceSparkStreamingProperties.OAUTH_CLIENT_AUTHSERVER_URL).isEmpty()) {
63-
throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide OAuth Client Authentication Server URL");
64-
}
65-
66-
if(!properties.containsKey(SolaceSparkStreamingProperties.OAUTH_CLIENT_CLIENT_ID) || properties.get(SolaceSparkStreamingProperties.OAUTH_CLIENT_CLIENT_ID) == null || properties.get(SolaceSparkStreamingProperties.OAUTH_CLIENT_CLIENT_ID).isEmpty()) {
67-
throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide OAuth Client ID");
68-
}
69-
70-
if(!properties.containsKey(SolaceSparkStreamingProperties.OAUTH_CLIENT_CREDENTIALS_CLIENTSECRET) || properties.get(SolaceSparkStreamingProperties.OAUTH_CLIENT_CREDENTIALS_CLIENTSECRET) == null || properties.get(SolaceSparkStreamingProperties.OAUTH_CLIENT_CREDENTIALS_CLIENTSECRET).isEmpty()) {
71-
throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide OAuth Client Credentials Secret");
72-
}
73-
74-
String clientCertificate = properties.getOrDefault(SolaceSparkStreamingProperties.OAUTH_CLIENT_AUTHSERVER_CLIENT_CERTIFICATE, null);
75-
if(clientCertificate != null) {
76-
String trustStoreFilePassword = properties.getOrDefault(SolaceSparkStreamingProperties.OAUTH_CLIENT_AUTHSERVER_TRUSTSTORE_PASSWORD, null);
77-
if (trustStoreFilePassword == null || trustStoreFilePassword.isEmpty()) {
78-
throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide OAuth Client TrustStore Password. If TrustStore file path is not configured, please provide password for default java truststore");
79-
}
80-
}
81-
} else if(properties.getOrDefault(SolaceSparkStreamingProperties.OAUTH_CLIENT_ACCESSTOKEN, null) == null || properties.getOrDefault(SolaceSparkStreamingProperties.OAUTH_CLIENT_ACCESSTOKEN, null).isEmpty()) {
82-
throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide valid access token input");
83-
}
84-
} else {
85-
if (!properties.containsKey(SolaceSparkStreamingProperties.USERNAME) || properties.get(SolaceSparkStreamingProperties.USERNAME) == null || properties.get(SolaceSparkStreamingProperties.USERNAME).isEmpty()) {
86-
throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide Solace Username in configuration options");
87-
}
88-
89-
if (!properties.containsKey(SolaceSparkStreamingProperties.PASSWORD) || properties.get(SolaceSparkStreamingProperties.PASSWORD) == null || properties.get(SolaceSparkStreamingProperties.PASSWORD).isEmpty()) {
90-
throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide Solace Password in configuration options");
91-
}
92-
}
51+
SolaceUtils.validateCommonProperties(properties);
9352

9453
if(!properties.containsKey(SolaceSparkStreamingProperties.QUEUE) || properties.get(SolaceSparkStreamingProperties.QUEUE) == null || properties.get(SolaceSparkStreamingProperties.QUEUE).isEmpty()) {
9554
throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide Solace Queue in configuration options");

pubsubplus-connector-spark_3.x/src/main/java/com/solacecoe/connectors/spark/streaming/partitions/SolaceDataSourceReaderFactory.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,15 @@ public SolaceDataSourceReaderFactory(boolean includeHeaders, String lastKnownOff
3131

3232
@Override
3333
public PartitionReader<InternalRow> createReader(InputPartition partition) {
34-
TaskContext taskCtx = TaskContext.get();
35-
String queryId = taskCtx.getLocalProperty(StreamExecution.QUERY_ID_KEY());
36-
String batchId = taskCtx.getLocalProperty(MicroBatchExecution.BATCH_ID_KEY());
37-
SolaceInputPartition solaceInputPartition = (SolaceInputPartition) partition;
38-
log.info("SolaceSparkConnector - Creating reader for input partition reader factory with query id {}, batch id {}, task id {} and partition id {}", queryId, batchId, taskCtx.taskAttemptId(), taskCtx.partitionId());
39-
return new SolaceInputPartitionReader(solaceInputPartition, includeHeaders, lastKnownOffset, properties, taskCtx, checkpoints);
34+
try {
35+
TaskContext taskCtx = TaskContext.get();
36+
String queryId = taskCtx.getLocalProperty(StreamExecution.QUERY_ID_KEY());
37+
String batchId = taskCtx.getLocalProperty(MicroBatchExecution.BATCH_ID_KEY());
38+
SolaceInputPartition solaceInputPartition = (SolaceInputPartition) partition;
39+
log.info("SolaceSparkConnector - Creating reader for input partition reader factory with query id {}, batch id {}, task id {} and partition id {}", queryId, batchId, taskCtx.taskAttemptId(), taskCtx.partitionId());
40+
return new SolaceInputPartitionReader(solaceInputPartition, includeHeaders, lastKnownOffset, properties, taskCtx, checkpoints);
41+
} catch (Exception e) {
42+
throw new RuntimeException(e);
43+
}
4044
}
4145
}

pubsubplus-connector-spark_3.x/src/main/java/com/solacecoe/connectors/spark/streaming/partitions/SolaceInputPartitionReader.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,9 @@ public SolaceInputPartitionReader(SolaceInputPartition inputPartition, boolean i
9393

9494
@Override
9595
public boolean next() {
96+
if(this.solaceBroker != null && this.solaceBroker.isException()) {
97+
throw new SolaceSessionException(this.solaceBroker.getException());
98+
}
9699
solaceMessage = getNextMessage();
97100
return solaceMessage != null;
98101
}

pubsubplus-connector-spark_3.x/src/main/java/com/solacecoe/connectors/spark/streaming/solace/SolaceBroker.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,8 @@ public SolaceBroker(Map<String, String> properties, String clientType) {
9191
scheduleOAuthRefresh(interval);
9292
}
9393
} else {
94-
jcsmpProperties.setProperty(JCSMPProperties.USERNAME, properties.get(SolaceSparkStreamingProperties.USERNAME)); // client-username
95-
jcsmpProperties.setProperty(JCSMPProperties.PASSWORD, properties.get(SolaceSparkStreamingProperties.PASSWORD)); // client-password
94+
jcsmpProperties.setProperty(JCSMPProperties.USERNAME, properties.getOrDefault(SolaceSparkStreamingProperties.USERNAME, "")); // client-username
95+
jcsmpProperties.setProperty(JCSMPProperties.PASSWORD, properties.getOrDefault(SolaceSparkStreamingProperties.PASSWORD, "")); // client-password
9696
}
9797

9898
this.uniqueName = JCSMPFactory.onlyInstance().createUniqueName("solace/spark/connector/"+clientType);

pubsubplus-connector-spark_3.x/src/main/java/com/solacecoe/connectors/spark/streaming/solace/utils/SolaceUtils.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.solacecoe.connectors.spark.streaming.properties.SolaceHeaderMeta;
44
import com.solacecoe.connectors.spark.streaming.properties.SolaceSparkStreamingProperties;
5+
import com.solacecoe.connectors.spark.streaming.solace.exceptions.SolaceInvalidPropertyException;
56
import com.solacesystems.jcsmp.*;
67

78
import java.io.Serializable;
@@ -13,6 +14,51 @@
1314

1415
public class SolaceUtils {
1516

17+
public static void validateCommonProperties(Map<String, String> properties) {
18+
// User configuration validation
19+
if(!properties.containsKey(SolaceSparkStreamingProperties.HOST) || properties.get(SolaceSparkStreamingProperties.HOST) == null || properties.get(SolaceSparkStreamingProperties.HOST).isEmpty()) {
20+
throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide Solace Host name in configuration options");
21+
}
22+
if(!properties.containsKey(SolaceSparkStreamingProperties.VPN) || properties.get(SolaceSparkStreamingProperties.VPN) == null || properties.get(SolaceSparkStreamingProperties.VPN).isEmpty()) {
23+
throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide Solace VPN name in configuration options");
24+
}
25+
26+
if(properties.containsKey(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX+ JCSMPProperties.AUTHENTICATION_SCHEME) &&
27+
properties.get(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX+ JCSMPProperties.AUTHENTICATION_SCHEME).equals(JCSMPProperties.AUTHENTICATION_SCHEME_OAUTH2)) {
28+
if(!properties.containsKey(SolaceSparkStreamingProperties.OAUTH_CLIENT_ACCESSTOKEN)) {
29+
if(!properties.containsKey(SolaceSparkStreamingProperties.OAUTH_CLIENT_AUTHSERVER_URL) || properties.get(SolaceSparkStreamingProperties.OAUTH_CLIENT_AUTHSERVER_URL) == null || properties.get(SolaceSparkStreamingProperties.OAUTH_CLIENT_AUTHSERVER_URL).isEmpty()) {
30+
throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide OAuth Client Authentication Server URL");
31+
}
32+
33+
if(!properties.containsKey(SolaceSparkStreamingProperties.OAUTH_CLIENT_CLIENT_ID) || properties.get(SolaceSparkStreamingProperties.OAUTH_CLIENT_CLIENT_ID) == null || properties.get(SolaceSparkStreamingProperties.OAUTH_CLIENT_CLIENT_ID).isEmpty()) {
34+
throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide OAuth Client ID");
35+
}
36+
37+
if(!properties.containsKey(SolaceSparkStreamingProperties.OAUTH_CLIENT_CREDENTIALS_CLIENTSECRET) || properties.get(SolaceSparkStreamingProperties.OAUTH_CLIENT_CREDENTIALS_CLIENTSECRET) == null || properties.get(SolaceSparkStreamingProperties.OAUTH_CLIENT_CREDENTIALS_CLIENTSECRET).isEmpty()) {
38+
throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide OAuth Client Credentials Secret");
39+
}
40+
41+
String clientCertificate = properties.getOrDefault(SolaceSparkStreamingProperties.OAUTH_CLIENT_AUTHSERVER_CLIENT_CERTIFICATE, null);
42+
if(clientCertificate != null) {
43+
String trustStoreFilePassword = properties.getOrDefault(SolaceSparkStreamingProperties.OAUTH_CLIENT_AUTHSERVER_TRUSTSTORE_PASSWORD, null);
44+
if (trustStoreFilePassword == null || trustStoreFilePassword.isEmpty()) {
45+
throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide OAuth Client TrustStore Password. If TrustStore file path is not configured, please provide password for default java truststore");
46+
}
47+
}
48+
} else if(properties.getOrDefault(SolaceSparkStreamingProperties.OAUTH_CLIENT_ACCESSTOKEN, null) == null || properties.getOrDefault(SolaceSparkStreamingProperties.OAUTH_CLIENT_ACCESSTOKEN, null).isEmpty()) {
49+
throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide valid access token input");
50+
}
51+
} else if(properties.containsKey(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX+ JCSMPProperties.AUTHENTICATION_SCHEME) &&
52+
properties.getOrDefault(SolaceSparkStreamingProperties.SOLACE_API_PROPERTIES_PREFIX+ JCSMPProperties.AUTHENTICATION_SCHEME, JCSMPProperties.AUTHENTICATION_SCHEME_BASIC).equals(JCSMPProperties.AUTHENTICATION_SCHEME_BASIC)){
53+
if (!properties.containsKey(SolaceSparkStreamingProperties.USERNAME) || properties.get(SolaceSparkStreamingProperties.USERNAME) == null || properties.get(SolaceSparkStreamingProperties.USERNAME).isEmpty()) {
54+
throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide Solace Username in configuration options");
55+
}
56+
57+
if (!properties.containsKey(SolaceSparkStreamingProperties.PASSWORD) || properties.get(SolaceSparkStreamingProperties.PASSWORD) == null || properties.get(SolaceSparkStreamingProperties.PASSWORD).isEmpty()) {
58+
throw new SolaceInvalidPropertyException("SolaceSparkConnector - Please provide Solace Password in configuration options");
59+
}
60+
}
61+
}
1662
public static String getMessageID(BytesXMLMessage message, String solaceOffsetIndicator) throws SDTException {
1763
switch (solaceOffsetIndicator) {
1864
case "CORRELATION_ID":

pubsubplus-connector-spark_3.x/src/main/java/com/solacecoe/connectors/spark/streaming/write/SolaceStreamingWrite.java

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.solacecoe.connectors.spark.streaming.write;
22

3+
import com.solacecoe.connectors.spark.streaming.solace.utils.SolaceUtils;
34
import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
45
import org.apache.spark.sql.connector.write.WriterCommitMessage;
56
import org.apache.spark.sql.connector.write.streaming.StreamingDataWriterFactory;
@@ -22,24 +23,7 @@ public SolaceStreamingWrite(StructType schema, Map<String, String> properties, C
2223
this.properties = properties;
2324
this.options = options;
2425

25-
if(!properties.containsKey("host") || properties.get("host") == null || properties.get("host").isEmpty()) {
26-
log.error("SolaceSparkConnector - Please provide Solace Host name in configuration options");
27-
throw new RuntimeException("SolaceSparkConnector - Please provide Solace Host name in configuration options");
28-
}
29-
if(!properties.containsKey("vpn") || properties.get("vpn") == null || properties.get("vpn").isEmpty()) {
30-
log.error("SolaceSparkConnector - Please provide Solace VPN name in configuration options");
31-
throw new RuntimeException("SolaceSparkConnector - Please provide Solace VPN name in configuration options");
32-
}
33-
34-
if(!properties.containsKey("username") || properties.get("username") == null || properties.get("username").isEmpty()) {
35-
log.error("SolaceSparkConnector - Please provide Solace Username in configuration options");
36-
throw new RuntimeException("SolaceSparkConnector - Please provide Solace Username in configuration options");
37-
}
38-
39-
if(!properties.containsKey("password") || properties.get("password") == null || properties.get("password").isEmpty()) {
40-
log.error("SolaceSparkConnector - Please provide Solace Password in configuration options");
41-
throw new RuntimeException("SolaceSparkConnector - Please provide Solace Password in configuration options");
42-
}
26+
SolaceUtils.validateCommonProperties(properties);
4327

4428
// if(!properties.containsKey("topic") || properties.get("topic") == null || properties.get("topic").isEmpty()) {
4529
// log.error("SolaceSparkConnector - Please provide Solace Queue name in configuration options");

0 commit comments

Comments
 (0)