# Getting Started
## Solace Spark Connector
Solace Spark Connector is based on the Spark DataSourceV2 API. The following is a good place to start:

* https://levelup.gitconnected.com/easy-guide-to-create-a-custom-read-data-source-in-apache-spark-3-194afdc9627a

The "Getting Started" tutorials will get you up to speed and sending messages with Solace technology as quickly as possible. There are three ways you can get started:


# Contents

This repository contains code that streams messages from a Solace service into Spark internal rows and can also publish messages from Spark to Solace. A minimal read example is sketched after the feature list below.

## Features

* The connector lets you configure the Solace service details as required.
* It supports processing a specific number of messages per partition (the `batchSize` option controls how many messages each partition receives).
* Messages are acknowledged to Solace once the commit method is triggered from Spark. Spark triggers the commit after a successful write operation.
* Message IDs are persisted in the checkpoint location once the commit succeeds. This enables deduplication of messages and guaranteed processing.
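
The sketch below shows how a streaming read from Solace might look under assumptions. The option names (`host`, `vpn`, `username`, `password`, `queue`, `batchSize`) come from the Configuration Options table later in this document; the format short name `"solace"`, the connection values, and the output/checkpoint paths are placeholders rather than values taken from this README.

```scala
import org.apache.spark.sql.SparkSession

object SolaceReadExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("solace-spark-read")
      .getOrCreate()

    // Read a stream of messages from a Solace queue.
    val messages = spark.readStream
      .format("solace")                          // assumed short name for the connector
      .option("host", "tcps://<hostname>:55443") // placeholder connection details
      .option("vpn", "<vpn-name>")
      .option("username", "<client-username>")
      .option("password", "<client-password>")
      .option("queue", "<queue-name>")
      .option("batchSize", "100")                // messages per partition
      .load()

    // Write to parquet; processed message IDs are persisted in the checkpoint location on commit.
    val query = messages.writeStream
      .format("parquet")
      .option("checkpointLocation", "/tmp/solace-checkpoint") // placeholder path
      .option("path", "/tmp/solace-output")                   // placeholder path
      .start()

    query.awaitTermination()
  }
}
```

The checkpoint location matters here because, as noted in the feature list, processed message IDs are persisted there on commit.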

## Architecture

<p align="center">
  <img width="700" height="400" src="https://user-images.githubusercontent.com/83568543/154064846-c1b5025f-9898-4190-90b5-84f05bd7fba9.png"/>
</p>

## User Guide

For the complete user guide, please check the connector page on [Solace Integration Hub](https://solace.com/integration-hub/apache-spark/).

# Prerequisites

Apache Spark 3.5.1, Scala 2.12

## Maven Central

The connector is available on Maven Central as [pubsubplus-connector-spark](https://mvnrepository.com/artifact/com.solacecoe.connectors/pubsubplus-connector-spark).
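
If you build with sbt, a minimal dependency declaration might look like the following; the group and artifact IDs come from the Maven Central link above, while the version is a placeholder you fill in with the release you want.

```scala
// build.sbt (sketch): coordinates from the Maven Central entry above; <version> is a placeholder.
libraryDependencies += "com.solacecoe.connectors" % "pubsubplus-connector-spark" % "<version>"
```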

# Build the connector

`mvn clean install` builds the connector and runs the integration tests.

`mvn clean install -DskipTests` builds the connector without running the integration tests.

# Running the connector

## Databricks environment

Create a Databricks cluster with a runtime that matches the Spark version above, then follow the steps below.

1. In the Libraries section of the Databricks cluster, upload the pubsubplus-connector-spark jar and add the following Maven dependencies using the Install new option.

<p align="center">
<img src="https://github.com/SolaceTechCOE/spark-connector-v3/assets/83568543/b2bc0f68-d80e-422c-8fd3-af89034cdaca"/>
</p>

## Running as a Job
If you are running the Spark Connector as a job, use the jar files provided as part of the distribution to configure your job. If you use the thin jar, you need to provide the dependencies shown in the screenshot above for the job. For the class name and other connector configuration, please refer to the sample scripts and the Configuration Options section.

## Thin Jar vs Fat Jar
The thin jar is a lightweight jar that includes only the Spark Streaming API related dependencies. In this case the Solace and Log4j dependencies must be added when configuring the Spark job. Spark supports adding these dependencies via Maven coordinates or as actual jars (see the sketch after this section).

```com.solacesystems:sol-jcsmp:10.21.0``` & ```log4j:log4j:1.2.17```

The fat jar includes all dependencies and can be used directly without adding anything else.
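
As a sketch of the thin-jar case, the two dependencies above can be resolved from Maven at startup through Spark's standard `spark.jars.packages` setting; the application name is a placeholder, and the setting only takes effect if no Spark context exists yet. Depending on how the job is launched, the same coordinates can instead be passed with `spark-submit --packages`.

```scala
import org.apache.spark.sql.SparkSession

// Thin-jar sketch: resolve the Solace and Log4j dependencies from Maven when the session starts.
// The coordinates are the ones listed above.
val spark = SparkSession.builder()
  .appName("solace-spark-thin-jar")
  .config("spark.jars.packages",
    "com.solacesystems:sol-jcsmp:10.21.0,log4j:log4j:1.2.17")
  .getOrCreate()
```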

## Solace Spark Schema

Solace Spark Connector transforms each incoming message into a Spark row with the schema below. A small projection example follows the table.

| Column Name | Column Type |
|-------------|---------------------|
| Id | String |
| Payload | Binary |
| Topic | String |
| TimeStamp | Timestamp |
| Headers | Map<string, binary> |
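
As a small illustration of working with this schema, the helper below (hypothetical, not part of the connector) projects the columns and casts the binary payload to a string for text messages.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Hypothetical helper: select the connector's columns from a DataFrame with the schema above.
def projectSolaceColumns(df: DataFrame): DataFrame =
  df.select(
    col("Id"),
    col("Topic"),
    col("TimeStamp"),
    col("Payload").cast("string").alias("PayloadText"), // Payload is binary; cast only if it holds text
    col("Headers")                                       // populated when includeHeaders is set to true
  )
```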

## Using Sample Script

### Databricks

1. Solace_Read_Stream_Script.txt

   Create a new notebook in the Databricks environment and set the language to Scala. Copy the script into the notebook and provide the required details. Once started, the script reads data from the Solace queue and writes it to parquet files.

2. Read_Parquet_Script.txt

   This script reads the data from parquet and displays the count of records. The output should match the number of records that were in the Solace queue before processing (a sketch of this check is shown below).
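
A minimal sketch of the check performed by Read_Parquet_Script.txt might look like this; the parquet path is a placeholder for wherever the read-stream script wrote its output.

```scala
import org.apache.spark.sql.SparkSession

object ReadParquetCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("read-parquet-check")
      .getOrCreate()

    // Placeholder path: point this at the parquet output written by the read-stream script.
    val records = spark.read.parquet("/tmp/solace-output")

    // The count should match the number of messages on the queue before processing.
    println(s"Record count: ${records.count()}")
  }
}
```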

## Spark Job or Spark Submit

A Spark job or spark-submit requires a jar as input. You can convert the Scala scripts above to a jar, provide it as input, and add the pubsubplus-connector-spark jar (thin or fat) as a dependency. If you use the thin jar, note that additional dependencies need to be configured as described in the Thin Jar vs Fat Jar section.

## Configuration Options
| Config Option | Type | Valid Values | Default Value | Description |
|--------------------------|---------|------------------------|---------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| host | String | tcp(s)://hostname:port | Empty | Fully Qualified Solace Hostname with protocol and port number |
| vpn | String | | Empty | Solace VPN Name |
| username | String | | Empty | Solace Client Username |
| password | String | | Empty | Solace Client Password |
| queue | String | | Empty | Solace Queue name |
| batchSize | Integer | | 1 | Sets the number of messages to be processed in a batch. Default is 1. |
| ackLastProcessedMessages | Boolean | | false | Set this value to true if the connector should determine which messages were processed in the last run. The connector relies purely on the offset file generated during the Spark commit. In some cases the connector may acknowledge a message based on the offset even though it is not available in your downstream systems. In general we recommend leaving this false and handling processing/reprocessing of messages in the downstream systems. |
| skipDuplicates | Boolean | | false | Set this value to true if the connector should check for duplicates before adding a message to a Spark row. This scenario occurs when tasks take longer than expected and a message is not acknowledged before the next task starts; in such cases the message would otherwise be added to a Spark row again. |
| offsetIndicator | String | | MESSAGE_ID | Set this value if your Solace message has a unique ID in a message header. Supported values are <ul><li> MESSAGE_ID </li><li> CORRELATION_ID </li> <li> APPLICATION_MESSAGE_ID </li> <li> CUSTOM_USER_PROPERTY </li> </ul> CUSTOM_USER_PROPERTY refers to one of the headers in the user properties header. <br/> <br/> Note: the default value uses the replication group message ID property as the offset indicator. <br/> <br/> A replication group message ID is an attribute of Solace messages, assigned by the event broker delivering the messages to the queue and topic endpoints, that uniquely identifies a message on a particular queue or topic endpoint within a high availability (HA) group and replication group of event brokers. |
| includeHeaders | Boolean | | false | Set this value to true if message headers need to be included in the output. |
| partitions | Integer | | 1 | Sets the number of consumers for the configured queue. An equal number of partitions is created to process the data received from the consumers. |
| createFlowsOnSameSession | Boolean | | false | If enabled, consumer flows are created on the same session. The number of consumer flows equals the number of partitions configured. This is helpful when users want to reduce the number of connections created from Spark; by default the connector creates a new connection for each consumer. |
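
The sketch below exercises several of the optional settings from the table. As before, the format short name `"solace"` and every value shown are placeholders or assumptions, not defaults taken from this README.

```scala
import org.apache.spark.sql.SparkSession

object SolaceTunedRead {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("solace-tuned-read")
      .getOrCreate()

    val messages = spark.readStream
      .format("solace")                                     // assumed short name for the connector
      .option("host", "tcps://<hostname>:55443")            // placeholder connection details
      .option("vpn", "<vpn-name>")
      .option("username", "<client-username>")
      .option("password", "<client-password>")
      .option("queue", "<queue-name>")
      .option("batchSize", "500")                           // messages per partition
      .option("partitions", "4")                            // four consumers on the queue
      .option("createFlowsOnSameSession", "true")           // share one session across the consumer flows
      .option("includeHeaders", "true")                     // expose the Headers column
      .option("offsetIndicator", "APPLICATION_MESSAGE_ID")  // use an application-provided unique ID as the offset
      .load()

    messages.printSchema()
  }
}
```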


# Exploring the Code using IDE
