
Commit c5c4b31

User Guide updates and enhancements in ack handling (#37)
1 parent 0bd3306 commit c5c4b31

18 files changed: +689 additions, -186 deletions

pom.xml

Lines changed: 10 additions & 3 deletions
@@ -68,6 +68,13 @@
     </developer>
   </developers>

+  <distributionManagement>
+    <site>
+      <id>solace-pubsubplus-spark-connector-site</id>
+      <url>https://solace.com/integration-hub/apache-spark</url>
+    </site>
+  </distributionManagement>
+
   <dependencyManagement>
     <dependencies>
       <dependency>
@@ -92,17 +99,17 @@
       <dependency>
         <groupId>org.apache.spark</groupId>
         <artifactId>spark-streaming_2.12</artifactId>
-        <version>3.5.1</version>
+        <version>3.5.2</version>
       </dependency>
       <dependency>
         <groupId>org.apache.spark</groupId>
         <artifactId>spark-core_2.12</artifactId>
-        <version>3.5.1</version>
+        <version>3.5.2</version>
       </dependency>
       <dependency>
         <groupId>org.apache.spark</groupId>
         <artifactId>spark-sql_2.12</artifactId>
-        <version>3.5.1</version>
+        <version>3.5.2</version>
       </dependency>
       <dependency>
         <groupId>org.apache.logging.log4j</groupId>

src/docs/asciidoc/User-Guide.adoc

Lines changed: 68 additions & 7 deletions
@@ -26,19 +26,25 @@ endif::[]

 == Getting Started

-This guide assumes you are familiar with Spark set up and Spark Structured Streaming concepts. In the following sections we will show how to set up Solace Spark Connector to stream data from Solace to Spark.
+This guide assumes you are familiar with Spark setup and Spark Structured Streaming concepts. The following sections show how to set up the Solace Spark Connector to stream data from Solace to Spark and to publish events from Spark to Solace.

 === Prerequisites

 * https://solace.com/products/event-broker/[Solace PubSub+ Event Broker]
-* Apache Spark 3.5.1, Scala 2.12
+* Apache Spark 3.5.2 and Scala 2.12
+
+=== Supported Platforms
+
+The connector is built on the Spark Structured Streaming API and has been tested on Azure Databricks (15.4 LTS, which includes Apache Spark 3.5.0 and Scala 2.12, with Photon acceleration disabled). Since the Databricks runtime is consistent across all supported cloud platforms (AWS and Google Cloud), the connector is expected to behave similarly in other Databricks environments. Additionally, the connector has been validated on vanilla Apache Spark, ensuring compatibility with any platform that supports standard Spark deployments.

 === Quick Start common steps

 include::{docdir}/../sections/general/quick-start/quick-start.adoc[leveloffset=+2]

 NOTE: Above sample code used parquet as example data source. You can configure your required data source to write data.

+NOTE: For Databricks deployments, it is recommended to store and retrieve sensitive credentials from Databricks secrets. Refer to <<Using Databricks Secret Management>> for how to configure secrets and use them in a notebook.
+
 === Databricks Considerations

 In case if you are using Shared compute cluster, make sure your cluster has https://docs.databricks.com/en/data-governance/unity-catalog/manage-privileges/allowlist.html[appropriate permissions] to install connector from maven central and access the jars. Please contact your Databricks administrator for required permissions.
@@ -49,15 +55,15 @@ Solace Spark connector relies on Spark Checkpointing mechanism to resume from la

 === Checkpoint Handling

-Starting from version 3.1.0 connector, solace connection is now executed on worker node instead of driver node. This give us the ability to utilize cluster resource efficiently and also improves processing performance. The connector uses Solace LVQ to communicate checkpoint information from worker nodes to driver node(commit to checkpoint location) as they run on different JVM's.
+Starting with connector version 3.1.0, the Solace connection is executed on a worker node instead of the driver node. This allows cluster resources to be used efficiently and also improves processing performance. The connector uses a Solace LVQ to store the checkpoint along with the Spark checkpoint.

 NOTE: In case of recovery, connector uses offset state from LVQ to identify last successfully processed messages. Hence, it is recommended not to delete or modify offset state in LVQ.

 In some cases, there might be checkpoint failures as spark may fail to write to checkpoint during instance crash or unavailability or other reasons. Though the connector will handle duplicates in most cases, we recommend to keep your downstream systems idempotent.

 === User Authentication

-Solace Spark Connector supports Basic and OAuth authentication to Solace. Client Credentials flow is supported when connecting using OAuth.
+The Solace Spark Connector supports Basic, Client Certificate, and OAuth authentication to Solace. The Client Credentials flow is supported when connecting using OAuth.

 If OAuth server is available use below options to fetch access token from endpoint. For property description please refer to <<Configuration>> section.

@@ -81,18 +87,75 @@ If rotating access token is present in file accessible by connector use below op
 [source,scala]
 ----
 spark.readStream.format("solace").option("host", "")
-.option("vpn", "default")
+.option("vpn", "")
 .option("solace.apiProperties.AUTHENTICATION_SCHEME", "AUTHENTICATION_SCHEME_OAUTH2")
 .option("solace.oauth.client.access-token", "<absolute-path-to-token-file>")
 .option("solace.oauth.client.token.refresh.interval", 110)
 ----

 NOTE: When access token is read from file, it may lose some of it's expiry time by the time it is accessed by connector. It is recommended to have minimal time difference between writing to file and access by the connector so that a valid new token is updated in solace session before expiry of old token.

+Below is an example of how to use client certificate authentication when connecting to Solace.
+
+[source,scala]
+----
+sparkSession.readStream().format("solace")
+.option("host", "")
+.option("vpn", "default")
+.option("username", "")
+.option("solace.apiProperties.AUTHENTICATION_SCHEME", "AUTHENTICATION_SCHEME_CLIENT_CERTIFICATE")
+.option("solace.apiProperties.SSL_TRUST_STORE", "<path-to-jks-file>")
+.option("solace.apiProperties.SSL_TRUST_STORE_FORMAT", "jks")
+.option("solace.apiProperties.SSL_TRUST_STORE_PASSWORD", "")
+.option("solace.apiProperties.SSL_KEY_STORE", "<path-to-jks-file>")
+.option("solace.apiProperties.SSL_KEY_STORE_FORMAT", "jks")
+.option("solace.apiProperties.SSL_KEY_STORE_PASSWORD", "")
+----
+
+For more properties, please refer to the https://docs.solace.com/API-Developer-Online-Ref-Documentation/java/constant-values.html#com.solacesystems.jcsmp.JCSMPProperties[Solace Java API documentation for com.solacesystems.jcsmp.JCSMPProperties].
+
+==== Using Databricks Secret Management
+
+If the Solace Spark Connector is deployed in Databricks, it is recommended to use Databricks secrets to store sensitive credentials.
+
+To configure secrets, refer to the https://docs.databricks.com/aws/en/security/secrets/[Databricks documentation].
+
+You can reference those secrets in your Spark cluster using the same Spark config options.
+
+Below is an example of how to retrieve the username and password from Databricks secrets and connect to Solace.
+[source,scala]
+----
+spark.readStream.format("solace").option("host", dbutils.secrets.get(scope = "solace-dev-credentials", key = "host"))
+.option("vpn", "default")
+.option("username", dbutils.secrets.get(scope = "solace-dev-credentials", key = "username"))
+.option("password", dbutils.secrets.get(scope = "solace-dev-credentials", key = "password"))
+----
+
+The following example shows OAuth-based authentication to Solace using Databricks secrets. The certificates can be stored in cloud object storage, and you can restrict access to the certificates to only the clusters that can access Solace. See https://docs.databricks.com/aws/en/data-governance/[Data governance with Unity Catalog].
+
+[source,scala]
+----
+spark.readStream.format("solace").option("host", dbutils.secrets.get(scope = "solace-dev-credentials", key = "host"))
+.option("vpn", "default")
+.option("solace.apiProperties.AUTHENTICATION_SCHEME", "AUTHENTICATION_SCHEME_OAUTH2")
+.option("solace.oauth.client.auth-server-url", "")
+.option("solace.oauth.client.client-id", dbutils.secrets.get(scope = "solace-dev-credentials", key = "client-id"))
+.option("solace.oauth.client.credentials.client-secret", dbutils.secrets.get(scope = "solace-dev-credentials", key = "client-secret"))
+.option("solace.oauth.client.auth-server.client-certificate.file", "")
+.option("solace.oauth.client.auth-server.truststore.file", "")
+.option("solace.oauth.client.auth-server.truststore.password", dbutils.secrets.get(scope = "solace-dev-credentials", key = "truststore-password"))
+.option("solace.oauth.client.auth-server.ssl.validate-certificate", false)
+.option("solace.oauth.client.token.refresh.interval", 110)
+----
+
 === Message Replay

 Solace Spark Connector can replay messages using Solace Replay Log. Connector can replay all messages or after specific replication group message id or after specific timestamp. Please refer to https://docs.solace.com/Features/Replay/Msg-Replay-Concepts-Config.htm[Message Replay Configuration] to enable replay log in Solace PubSub+ broker.

+=== Parallel Processing
+
+The Solace Spark Connector supports automatic scaling of consumers based on the number of worker nodes, or it can be configured to use a fixed number of consumers. To control this behavior, use the partition property in the Solace Spark Connector source configuration. Setting this property to 0 enables automatic scaling, where the number of consumers matches the number of worker nodes.
+
 === Solace Spark Streaming Source Schema Structure

 Solace Spark Connector transforms the incoming message to Spark row with below schema definition.
@@ -169,8 +232,6 @@ include::{docdir}/../sections/general/configuration/solace-spark-source-config.a

 include::{docdir}/../sections/general/configuration/solace-spark-sink-config.adoc[leveloffset=+2]

-NOTE: This connector is tested on Databricks environment with Cluster Version 14.3 LTS (includes Apache Spark 3.5.0, Scala 2.12)
-
 == License

 This project is licensed under the Solace Community License, Version 1.0. - See the `LICENSE` file for details.
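
The Checkpoint Handling, User Authentication, and Parallel Processing sections added above describe individual options, but the guide stops short of a complete read-to-sink pipeline. Below is a minimal sketch that ties them together, assuming basic authentication. The host, vpn, username, and password options appear in the guide's own examples; the queue, batchSize, and partitions option names are assumptions made here for illustration and should be confirmed against the source configuration table.

[source,scala]
----
// Minimal sketch of an end-to-end pipeline (not part of this commit).
// "queue", "batchSize", and "partitions" are assumed option names; verify them
// against the Solace Spark Connector source configuration reference.
val input = spark.readStream
  .format("solace")
  .option("host", "tcps://<broker-host>:55443")
  .option("vpn", "default")
  .option("username", "<client-username>")
  .option("password", "<client-password>")
  .option("queue", "<queue-name>")   // assumed option name
  .option("batchSize", 500)          // assumed option name
  .option("partitions", 0)           // assumed option name; 0 = one consumer per worker node
  .load()

val query = input.writeStream
  .format("parquet")
  .option("path", "/data/solace-events")
  // Spark's checkpoint; the connector additionally keeps offset state in a Solace LVQ,
  // so neither the checkpoint directory nor the LVQ contents should be deleted or modified.
  .option("checkpointLocation", "/data/solace-checkpoint")
  .start()

query.awaitTermination()
----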

src/docs/sections/general/configuration/solace-spark-source-config.adoc

Lines changed: 2 additions & 2 deletions
@@ -152,7 +152,7 @@ solace.apiProperties.client_channel_properties.keepAliveIntervalInMillis=3000
 | int
 | any
 | 1
-| Set number of messages to be processed in batch. The connector can stream data in batches to Spark based on configured size.
+| Sets the number of messages to be processed in a batch. The connector can stream data in batches to Spark based on the configured size. For optimal throughput, configure the Solace queue's 'Maximum Delivered Unacknowledged Messages per Flow' property to a value equal to twice the batch size.

 | replayStrategy
 | string
@@ -210,7 +210,7 @@ Note: Default value uses replication group message ID property as offset indicat
 | int
 | any
 | 1
-| Sets the number of consumers for configured queue. If more the one worker node is present, consumers are split across worker nodes for efficient processing.
+| Sets the number of consumers for the configured queue. If more than one worker node is present, consumers are split across worker nodes for efficient processing. If set to 0, the connector creates consumers equal to the number of worker nodes and scales if more worker nodes are added.

 | createFlowsOnSameSession(deprecated)
 | boolean
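
To make the batch-size guidance above concrete: with a batch size of 500, the queue's 'Maximum Delivered Unacknowledged Messages per Flow' would be set to 1000 on the broker. A minimal sketch follows, assuming the batch-size setting is exposed as a batchSize option (a hypothetical name here; use the exact option name from the table above).

[source,scala]
----
// Sketch only: "batchSize" is an assumed name for the batch-size option documented above.
// The 'Maximum Delivered Unacknowledged Messages per Flow' value is configured on the
// Solace queue itself, not through the connector.
val reader = spark.readStream
  .format("solace")
  .option("host", "tcps://<broker-host>:55443")
  .option("vpn", "default")
  .option("batchSize", 500) // broker-side max unacked per flow should be 2 x 500 = 1000
----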

src/docs/sections/general/quick-start/quick-start.adoc

Lines changed: 1 addition & 0 deletions
@@ -58,6 +58,7 @@ NOTE: Before installing latest version of connector make sure earlier versions o

 query.awaitTermination()
 ----
+TIP: For optimal throughput, configure the Solace queue's 'Maximum Delivered Unacknowledged Messages per Flow' property to a value equal to twice the batch size.
 .. Finally, let's read data from the parquet file from the location configured above
 +
 [source,scala]

src/main/java/com/solacecoe/connectors/spark/SolaceScan.java

Lines changed: 1 addition & 1 deletion
@@ -41,6 +41,6 @@ public Batch toBatch() {

     @Override
     public MicroBatchStream toMicroBatchStream(String checkpointLocation) {
-        return new SolaceMicroBatch(properties);
+        return new SolaceMicroBatch(properties, checkpointLocation);
     }
 }
