Merged

31 commits
b8a2c6e
Solace Data Source Writer Implementation
SravanThotakura05 Oct 28, 2024
f5cddb0
Extract row and publish to Solace - yet to test
SravanThotakura05 Oct 29, 2024
4ab873a
Basic solace publish is working and added solace header mapping
SravanThotakura05 Oct 30, 2024
b287ceb
Publish acknowledgement callback class configuration and throw except…
SravanThotakura05 Nov 5, 2024
a7e7036
Solace Spark WriteStream - Added ability to schema with optional colu…
SravanThotakura05 Dec 6, 2024
a2a0bc7
Solace WriteStream - Enhancements & Bug fixes
SravanThotakura05 Dec 9, 2024
971ee48
Fix merge commits
SravanThotakura05 Dec 9, 2024
e956224
Remove unnecessary file
SravanThotakura05 Dec 9, 2024
173a8e1
Changed Java version settings
SravanThotakura05 Dec 9, 2024
52a83f1
Added integration tests
SravanThotakura05 Dec 17, 2024
786f0ae
reverted queue type
SravanThotakura05 Dec 17, 2024
764e896
added more unit tests and removed unnecessary code
SravanThotakura05 Dec 17, 2024
6c4f92e
Removed unnecessary files and added integration tests
SravanThotakura05 Dec 17, 2024
5d4cb3d
added integration tests for coverage
SravanThotakura05 Dec 17, 2024
b9aecf6
Fixed failing test
SravanThotakura05 Dec 17, 2024
ba9ec4b
improved integration tests for coverage
SravanThotakura05 Dec 17, 2024
d3f7aca
Fixed failing test
SravanThotakura05 Dec 17, 2024
e0081b6
Reordered tests
SravanThotakura05 Dec 18, 2024
31b875d
Updated integration tests
SravanThotakura05 Dec 18, 2024
316ced0
Improved Integration tests runtime
SravanThotakura05 Dec 18, 2024
9539c6e
Fixed Integration Tests to assert expected exception
SravanThotakura05 Dec 18, 2024
db101d0
Fixed long running integration test
SravanThotakura05 Dec 18, 2024
3b01b8d
Update SolaceSparkStreamingIT.java
SravanThotakura05 Dec 18, 2024
8c95189
Split Integration tests
SravanThotakura05 Dec 18, 2024
16a1630
Updated integration test for code coverage
SravanThotakura05 Dec 18, 2024
556bbfd
Update SolaceSparkStreamingSourceIT.java
SravanThotakura05 Dec 18, 2024
7ab269f
Updated Integration Tests for code coverage
SravanThotakura05 Dec 18, 2024
66b6dff
[ci skip] prepare release 3.0.0-EA
actions-user Dec 19, 2024
9e130dc
[ci skip] prepare for next development iteration
actions-user Dec 19, 2024
8853b84
Merge branch 'main' into solace-data-source-write-support
SravanThotakura05 Jan 10, 2025
4ae82a8
Fix merge errors
SravanThotakura05 Jan 10, 2025
4 changes: 2 additions & 2 deletions .flattened-pom.xml
@@ -4,7 +4,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.solacecoe.connectors</groupId>
<artifactId>pubsubplus-connector-spark-parent</artifactId>
<version>2.1.0</version>
<version>3.0.0-EA</version>
<packaging>pom</packaging>
<name>PubSubPlus Connector Spark - Parent</name>
<description>Solace PubSub+ Connector for Spark streams data from Solace PubSub+ broker to Spark Data Sources.</description>
@@ -36,7 +36,7 @@
<properties>
<maven.compiler.source>17</maven.compiler.source>
<generated-filtered-resource.directory>/home/runner/work/spark-connector-v2/spark-connector-v2/target/generated-filtered-resources</generated-filtered-resource.directory>
<revision>2.1.0</revision>
<revision>3.0.0-EA</revision>
<next-revision>${parsedVersion.majorVersion}.${parsedVersion.minorVersion}.${parsedVersion.nextIncrementalVersion}</next-revision>
<sha1></sha1>
<maven.compiler.target>17</maven.compiler.target>
2 changes: 1 addition & 1 deletion pom.xml
@@ -22,7 +22,7 @@
Project Versioning Properties
===================================
-->
<revision>2.1.1</revision>
<revision>3.0.0</revision>
<sha1/> <!-- Doesn't actually need to be a sha1, this is just another version modifier variable -->
<changelist>-SNAPSHOT</changelist>
<next-revision>${parsedVersion.majorVersion}.${parsedVersion.minorVersion}.${parsedVersion.nextIncrementalVersion}</next-revision>
105 changes: 105 additions & 0 deletions pubsubplus-connector-spark_3.x/.flattened-pom.xml
@@ -0,0 +1,105 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<groupId>com.solacecoe.connectors</groupId>
<artifactId>pubsubplus-connector-spark</artifactId>
<version>3.0.0-EA</version>
<name>pubsubplus-connector-spark</name>
<description>Solace PubSub+ Connector for Spark streams data from Solace PubSub+ broker to Spark Data Sources.</description>
<url>https://solace.com/pubsubplus-connector-spark</url>
<inceptionYear>2022</inceptionYear>
<organization>
<name>Solace Corporation</name>
<url>https://solace.com</url>
</organization>
<licenses>
<license>
<name>License Agreement for Solace Software</name>
<url>https://solace.com/license-software</url>
<distribution>repo</distribution>
</license>
</licenses>
<developers>
<developer>
<name>Solace</name>
<email>info@solace.com</email>
<organization>Solace Corporation</organization>
<organizationUrl>https://solace.com</organizationUrl>
</developer>
</developers>
<scm>
<connection>scm:git:git@github.com:SolaceDev/spark-connector-v2.git/pubsubplus-connector-spark</connection>
<developerConnection>scm:git:git@github.com:SolaceDev/spark-connector-v2.git/pubsubplus-connector-spark</developerConnection>
<url>https://github.com/SolaceDev/spark-connector-v2.git/pubsubplus-connector-spark</url>
</scm>
<properties>
<docs.build.directory>/home/runner/work/spark-connector-v2/spark-connector-v2/pubsubplus-connector-spark_3.x/target/generated-docs</docs.build.directory>
<maven.compiler.source>8</maven.compiler.source>
<shared-remote-resources.directory>/home/runner/work/spark-connector-v2/spark-connector-v2/pubsubplus-connector-spark_3.x/target/shared-remote-resources</shared-remote-resources.directory>
<shared-remote-resources.process.asciidoctor-resources.skip>false</shared-remote-resources.process.asciidoctor-resources.skip>
<release-notes.url>https://solace.com/pubsubplus-connector-spark</release-notes.url>
<shared-remote-resources.build.main.processed.directory>/home/runner/work/spark-connector-v2/spark-connector-v2/pubsubplus-connector-spark_3.x/target/shared-remote-resources/build/main/processed</shared-remote-resources.build.main.processed.directory>
<generated-filtered-resource.directory>/home/runner/work/spark-connector-v2/spark-connector-v2/pubsubplus-connector-spark_3.x/target/generated-filtered-resources</generated-filtered-resource.directory>
<revision>3.0.0-EA</revision>
<next-revision>${parsedVersion.majorVersion}.${parsedVersion.minorVersion}.${parsedVersion.nextIncrementalVersion}</next-revision>
<sha1></sha1>
<maven.compiler.target>8</maven.compiler.target>
<shared-remote-resources.build.main.raw.directory>/home/runner/work/spark-connector-v2/spark-connector-v2/pubsubplus-connector-spark_3.x/target/shared-remote-resources/build/main/raw</shared-remote-resources.build.main.raw.directory>
<changelist></changelist>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.26.0</version>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>com.solacesystems</groupId>
<artifactId>sol-jcsmp</artifactId>
<version>10.24.1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.5.1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.5.1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.5.1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.23.1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.23.1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.nimbusds</groupId>
<artifactId>oauth2-oidc-sdk</artifactId>
<version>11.19.1</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>
50 changes: 47 additions & 3 deletions pubsubplus-connector-spark_3.x/src/docs/asciidoc/User-Guide.adoc
@@ -89,37 +89,81 @@ NOTE: When the access token is read from a file, it may lose some of its expiry time

Solace Spark Connector can replay messages using the Solace Replay Log. The connector can replay all messages, only messages after a specific replication group message ID, or only messages after a specific timestamp. Please refer to https://docs.solace.com/Features/Replay/Msg-Replay-Concepts-Config.htm[Message Replay Configuration] to enable the replay log on the Solace PubSub+ broker.

=== Solace Spark Streaming Schema Structure
=== Solace Spark Streaming Source Schema Structure

Solace Spark Connector transforms each incoming message into a Spark row with the schema definition below.

[cols="2m,2m", options="header"]
[cols="2m,2m,2m", options="header"]
|===
| Column Name
| Column Type
| Description

| Id
| String
| Represents the Message ID present in the message. This value is based on the Offset_Indicator option. By default, it returns the replication group message ID.

| Payload
| Binary
| Represents the payload in binary format.

| PartitionKey
| String
| Represents the Partition Key, if present in the message.

| Topic
| String
| Represents the topic on which the message is published.

| TimeStamp
| Timestamp
| Represents the sender timestamp, if present in the message. By default, it returns the timestamp at which the message was received by the connector.

| Headers
| Map<string, binary>
| Represents the message headers, if present in the message. This column is created only when the includeHeaders option is set to true.
|===
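
For orientation, here is a minimal read-side sketch in Java that consumes this source schema. This is a hedged example, not the connector's documented API surface: the data source short name `solace` and the broker URL are assumptions, and a real deployment also needs the remaining options from the source configuration table (for example, which queue to consume from).

[source,java]
----
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SolaceSourceSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("solace-source-sketch")
                .getOrCreate();

        // Assumed short name "solace"; host/vpn/username/password follow the
        // configuration tables in this guide; the broker URL is hypothetical.
        Dataset<Row> input = spark.readStream()
                .format("solace")
                .option("host", "tcps://broker.example.com:55443")
                .option("vpn", "default")
                .option("username", "spark-client")
                .option("password", "secret")
                .option("includeHeaders", "true") // adds the Headers column
                .load();

        // Project the source schema columns described above.
        Dataset<Row> messages = input.selectExpr(
                "Id", "CAST(Payload AS STRING) AS body", "Topic", "TimeStamp");
        messages.printSchema();
    }
}
----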

=== Solace Spark Streaming Sink Schema Structure

Solace Spark Connector maps each Spark row to a Solace message using the schema definition below.

[cols="2m,2m,2m", options="header"]
|===
| Column Name
| Column Type
| Description

| Id
| String
| Sets the message ID for the published message. This is overwritten if a message ID is set using the id option. If no message ID is set, the connector throws an exception, as a message ID is required to track the state of published messages. In case of a publish failure, the message ID is logged along with the exception.

| Payload
| Binary
| Payload to be added to the published message. If no payload is set, the connector throws an exception.

| PartitionKey (Optional)
| String
| Partition Key for the published message. Useful when the topic of the published message is subscribed to by partitioned queues.

| Topic (Optional)
| String
| Sets the topic for the published message. This is overwritten if a topic is set using the topic option. If no topic is set, the connector throws an exception, as a topic is required to publish a message.

| TimeStamp (Optional)
| Timestamp
| Sets the timestamp for the published message. This column is mapped to the Sender Timestamp field in the Solace message.

| Headers (Optional)
| Map<string, binary>
| Sets the headers to be added to the published message. This column is mapped only when the includeHeaders option is set to true.
|===
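
To illustrate the sink schema, here is a hedged sketch that shapes an arbitrary DataFrame into the expected columns before writing. The input `orders` DataFrame and its `orderId` and `orderJson` columns are hypothetical; only the target column names come from the table above.

[source,java]
----
// "orders" is a hypothetical input DataFrame with orderId and orderJson columns.
Dataset<Row> toPublish = orders.selectExpr(
        "CAST(orderId AS STRING) AS Id",            // message ID, unless the id option is set
        "CAST(orderJson AS BINARY) AS Payload",     // required binary payload
        "CONCAT('acme/orders/', orderId) AS Topic"  // topic, unless the topic option is set
);
----

Rows with no Payload, or with neither a Topic column nor the topic option set, cause the connector to throw an exception, as described above.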

== Configuration

include::{docdir}/../sections/general/configuration/spark-config.adoc[leveloffset=+2]
include::{docdir}/../sections/general/configuration/solace-spark-source-config.adoc[leveloffset=+2]

include::{docdir}/../sections/general/configuration/solace-spark-sink-config.adoc[leveloffset=+2]

NOTE: This connector is tested on a Databricks environment with Cluster Version 14.3 LTS (includes Apache Spark 3.5.0, Scala 2.12).

@@ -0,0 +1,168 @@
= Solace Spark Connector Sink Configuration Options
:doctype: book

[cols="2m,1m,1m,1m,2", options="header"]
|===
| Config Option
| Type
| Valid Values
| Default Value
| Description

| host
| string
| any
|
| Fully Qualified Solace Hostname with protocol and port number.

| vpn
| string
| any
|
| Solace VPN name.

| username
| string
| any
|
| Solace Client Username.

| password
| string
| any
|
| Password for the Solace Client Username.

| connectRetries
| int
| (-1) or greater
| 0
| The number of times to attempt and retry a connection during initial connection setup. Zero means no automatic connection retries (that is, try once and give up). -1 means "retry forever".

| reconnectRetries[[reconnect-retries]]
| int
| (-1) or greater
| 3
| The number of times to attempt to reconnect. Zero means no automatic reconnection retries (that is, try once and give up). -1 means "retry forever".

| connectRetriesPerHost
| int
| (-1) or greater
| 0
| When using a host list for the HOST property, this property defines how many times to try to connect or reconnect to a single host before moving to the next host in the list. NOTE: This property works in conjunction with the connect and reconnect retries settings; it does not replace them. Valid values are >= -1. 0 means make a single connection attempt (that is, 0 retries). -1 means attempt an infinite number of reconnect retries (that is, the API only tries to connect or reconnect to first host listed.)

| reconnectRetryWaitInMillis
| int
| 0 - 60000
| 3000
| How much time (in ms) to wait between each attempt to connect or reconnect to a host. If a connect or reconnect attempt to a host is not successful, the API waits for the amount of time set for reconnectRetryWaitInMillis and then makes another connect or reconnect attempt.

| solace.apiProperties.<Property>
| any
| any
|
a| Any additional Solace Java API properties can be set through configuring solace.apiProperties.<Property> where <Property> is the name of the property as defined in the https://docs.solace.com/API-Developer-Online-Ref-Documentation/java/constant-values.html#com.solacesystems.jcsmp.JCSMPProperties[Solace Java API documentation for com.solacesystems.jcsmp.JCSMPProperties], for example:
[source,yaml]
----
solace.apiProperties.pub_ack_window_size=50
----

| solace.oauth.client.access-token
| string
| absolute file path to token file
| empty
| Set this configuration if a rotating access token is present in a file. In this case the connector reads the access token directly from the file instead of sending a request to the OAuth server. Please note <<solace-oAuth-client-refresh-interval, Solace OAuth Client Refresh Interval>> should be set so the access token is read from the file at regular intervals.

| solace.oauth.client.auth-server-url
| string
| any
| empty
| Full URL of the token endpoint used to fetch the access token.

| solace.oauth.client.client-id
| string
| any
| empty
| OAuth Client ID

| solace.oauth.client.credentials.client-secret
| string
| any
| empty
| OAuth Client Secret

| solace.oauth.client.auth-server.client-certificate.file
| string
| any
| empty
| Absolute path to the X.509 client certificate file for TLS connections. Make sure the file path is accessible by the connector.

| solace.oauth.client.auth-server.truststore.file
| string
| any
| empty
| Absolute path to the trust store file for TLS connections. This property works in two ways:

1. If a JKS file is available in the cluster, configure the absolute path so that the connector loads the JKS file.

2. If solace.oauth.client.auth-server.client-certificate.file is configured, simply provide a path (it should include the file name as well). The connector loads the client certificate into a key store and saves it to a JKS file.

| solace.oauth.client.auth-server.truststore.password
| string
| any
| empty
| Password for the JKS file. This property works in two ways:

1. If a JKS file is available in the cluster, provide the password for that JKS file.

2. If solace.oauth.client.auth-server.client-certificate.file is configured, simply provide a password; it is used to protect the JKS file created in option 2 above.

| solace.oauth.client.auth-server.ssl.validate-certificate
| boolean
| any
| true
| Boolean value to enable or disable SSL certificate validation. If set to false, the connector sends TLS requests without any validation.

| solace.oauth.client.auth-server.tls.version
| string
| SSL, TLS, TLSv1, TLSv1.1, TLSv1.2, TLSv1.3
| TLSv1.2
| Indicates the protocol version to use for the SSL/TLS connection.

| solace.oauth.client.token.refresh.interval[[solace-oAuth-client-refresh-interval]]
| integer
| positive integer value
| 60
| Interval (seconds) at which the connector fetches the access token, to avoid disconnection on token expiry. This value should be less than your token expiry time.

| solace.oauth.client.token.fetch.timeout
| integer
| positive integer value
| 100
| Connection timeout (ms) for the access token request.

| topic
| string
| any
|
| Sets the topic to which all rows are published in Solace. This option overrides any Topic column that may exist in the data.

| id
| string
| any
|
| Sets the message ID for all messages published to Solace. This option overrides any Id column that may exist in the data.

| batchSize
| int
| any
| 1
| Sets the number of messages to be processed in a batch. This should be set to dataframe.count().

| includeHeaders
| boolean
| true or false
| false
| Set this value to true if message headers in the row need to be added to the published message.

|===
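
Putting these options together, here is a minimal write-side sketch in Java. As in the earlier examples, the `solace` short name and broker details are assumptions, and `toPublish` is a DataFrame already shaped to the sink schema from the User Guide.

[source,java]
----
import org.apache.spark.sql.streaming.StreamingQuery;

// toPublish carries the sink schema columns (Id, Payload, Topic, ...).
StreamingQuery query = toPublish.writeStream()
        .format("solace")                                  // assumed data source short name
        .option("host", "tcps://broker.example.com:55443") // hypothetical broker URL
        .option("vpn", "default")
        .option("username", "spark-client")
        .option("password", "secret")
        .option("batchSize", "100")       // per the guidance above, match the batch row count
        .option("includeHeaders", "true") // map the Headers column, if present
        .option("checkpointLocation", "/tmp/solace-sink-checkpoint")
        .start();
query.awaitTermination(); // throws StreamingQueryException on failure
----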
@@ -1,4 +1,4 @@
= Solace Spark Connector Configuration Options
= Solace Spark Connector Source Configuration Options
:doctype: book

[cols="2m,1m,1m,1m,2", options="header"]