Given below is a sample API that illustrates how you can connect to a Kafka broker.

API has the `/publishMessages` context. It publishes messages via the topic to the Kafka server.

## Prerequisites

- Set up Kafka by following the instructions in [Setting up Kafka](setting-up-kafka.md).
- Set up Confluent by following the [Confluent documentation](https://docs.confluent.io/platform/current/installation/overview.html).

## Set up the integration project

Follow the steps below to set up the integration project using the WSO2 Integrator: MI Visual Studio Code extension.

### Create a new project

Follow the steps in the [create integration project]({{base_path}}/develop/create-integration-project/) guide to set up WSO2 MI and create a new integration project. Use a suitable Project Name for your integration.

### Create a connection

1. In the Design View, click the **+** button and select **Connection**.

2. In the search bar, type `Kafka` and select the `Kafka connector` from the list.

<img src="{{base_path}}/assets/img/integrate/connectors/kafka/kafka-avro-create-connection.png" title="Creating a new connection" width="600" alt="Creating a new connection"/>

3. In the Connection Configuration pane, enter the following required information:
- **Connection Name** - Sample_Kafka
- **Connection Type** - kafka
- **Bootstrap Servers** - localhost:9092
- **Key Serializer Class** - io.confluent.kafka.serializers.KafkaAvroSerializer
- **Value Serializer Class** - io.confluent.kafka.serializers.KafkaAvroSerializer
- **Schema Registry URL** - http://localhost:8081
- **Max Pool Size** - 100
- **Max Active Connections** - 100
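
When you save the connection, it is stored in the project as a local entry. For reference, the following is a sketch of roughly what that artifact looks like for the values above (the exact generated XML can differ between connector versions):

```xml
<?xml version="1.0" encoding="UTF-8"?>
<localEntry key="Sample_Kafka" xmlns="http://ws.apache.org/ns/synapse">
    <kafkaTransport.init>
        <connectionType>kafka</connectionType>
        <name>Sample_Kafka</name>
        <bootstrapServers>localhost:9092</bootstrapServers>
        <keySerializerClass>io.confluent.kafka.serializers.KafkaAvroSerializer</keySerializerClass>
        <valueSerializerClass>io.confluent.kafka.serializers.KafkaAvroSerializer</valueSerializerClass>
        <schemaRegistryUrl>http://localhost:8081</schemaRegistryUrl>
        <maxPoolSize>100</maxPoolSize>
        <maxActiveConnections>100</maxActiveConnections>
    </kafkaTransport.init>
</localEntry>
```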

### Create an API

1. Click the **API** button in the **Create an integration project** pane.

<img src="{{base_path}}/assets/img/integrate/connectors/jira/create-api1.png" title="Creating a new API" width="600" alt="Creating a new API"/>

2. Enter the API Name as `KafkaTransport` and the Context as `/publishMessages`, then click **Create**.

<img src="{{base_path}}/assets/img/integrate/connectors/kafka/kafka-avro-example-1.png" title="Creating a new API" width="600" alt="Creating a new API"/>

3. To add the Kafka connector:
- In the **Design View**, click the **+** button.
- In the **Mediator** section, search for `Kafka`.
- Select the **Kafka** connector and click **Download**.

### Implement the API

1. Go to the **Source View** of the API by clicking on the **<>** icon in the top right corner of the **Design View**.
<img src="{{base_path}}/assets/img/integrate/connectors/jira/source_view.png" title="Source view of the implemented resource" width="600" alt="Source view of the implemented resource"/>

2. Copy and paste the following code in the **Source View** of the API.

??? note "Source view of the implemented resource"
    ```xml
    <?xml version="1.0" encoding="UTF-8"?>
    <api context="/publishMessages" name="KafkaTransport" xmlns="http://ws.apache.org/ns/synapse">
        <resource methods="POST" uri-template="/">
            <inSequence>
                <property name="valueSchema" expression="json-eval($.test)" scope="default" type="STRING"/>
                <property name="value" expression="json-eval($.value)" scope="default" type="STRING"/>
                <property name="key" expression="json-eval($.key)" scope="default" type="STRING"/>
                <property name="topic" expression="json-eval($.topic)" scope="default" type="STRING"/>
                <kafkaTransport.publishMessages configKey="Sample_Kafka">
                    <topic>{$ctx:topic}</topic>
                    <partitionNo>0</partitionNo>
                    <key>{$ctx:key}</key>
                    <value>{$ctx:value}</value>
                    <valueSchema>{$ctx:valueSchema}</valueSchema>
                    <keySchemaSoftDeleted>false</keySchemaSoftDeleted>
                    <valueSchemaSoftDeleted>false</valueSchemaSoftDeleted>
                </kafkaTransport.publishMessages>
            </inSequence>
        </resource>
    </api>
    ```
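
The `json-eval` property expressions above only pick fields out of the incoming JSON body; a rough Python sketch of that mapping (illustrative only — this is not how Synapse evaluates expressions internally, and the payload shown is a hypothetical sample):

```python
import json

# Hypothetical incoming request body; the field names match the json-eval
# expressions in the API above ($.test, $.value, $.key, $.topic).
body = json.loads("""
{
    "test": {"type": "record", "name": "myrecord",
             "fields": [{"name": "f1", "type": ["string", "null"]}]},
    "value": {"f1": "sampleValue"},
    "key": "sampleKey",
    "topic": "myTopic"
}
""")

# Each <property> mediator stores one extracted field under a property name,
# which the publishMessages operation later reads via {$ctx:...}.
properties = {
    "valueSchema": body["test"],   # json-eval($.test)
    "value": body["value"],        # json-eval($.value)
    "key": body["key"],            # json-eval($.key)
    "topic": body["topic"],        # json-eval($.topic)
}

print(properties["topic"])  # → myTopic
```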

## Deployment

To deploy and run the project, refer to the [Build and Run]({{base_path}}/develop/deploy-artifacts/#build-and-run) guide.

## Test

Invoke the API as shown below using the MI VSCode Extension.

<img src="{{base_path}}/assets/img/integrate/connectors/common/runtime-services.png" title="Runtime services" width="600" alt="Runtime services"/>

### Sample request:

- Content-Type: application/json
- Request body:

```json
{
    "test": {
        "type": "record",
        "name": "myrecord",
        "fields": [
            {
                "name": "f1",
                "type": ["string", "null"]
            }
        ]
    },
    "value": {
        "f1": "sampleValue"
    },
    "key": "sampleKey",
    "topic": "myTopic"
}
```

**Expected Response**:

!!! info
    Refer to the [Confluent documentation](https://docs.confluent.io/platform/current/installation/overview.html) for installing Confluent.


Run the following command to verify the messages:
````bash
[confluent_home]/bin/kafka-avro-console-consumer --topic myTopic --bootstrap-server localhost:9092 --property print.key=true --from-beginning
````

See the following message content:
````json
{"f1":{"string":"sampleValue"}}
````
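
The nested `{"string": ...}` wrapper in the consumer output is Avro's JSON encoding of a union: when a field is declared as `["string", "null"]`, a non-null value is tagged with its branch type. A minimal sketch of that encoding rule:

```python
import json

def encode_union_string(value):
    # Avro JSON encoding for a field of type ["string", "null"]:
    # null stays null, a string is wrapped as {"string": <value>}.
    return None if value is None else {"string": value}

record = {"f1": encode_union_string("sampleValue")}
print(json.dumps(record))  # → {"f1": {"string": "sampleValue"}}
```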

> **Note**: The following is a sample connection configuration for when the Confluent Schema Registry is secured with basic auth.
```xml
<?xml version="1.0" encoding="UTF-8"?>
<localEntry key="Sample_Kafka" xmlns="http://ws.apache.org/ns/synapse">
<kafkaTransport.init>
<connectionType>kafka</connectionType>
<name>Sample_Kafka</name>
<bootstrapServers>localhost:9092</bootstrapServers>
<keySerializerClass>io.confluent.kafka.serializers.KafkaAvroSerializer</keySerializerClass>
<valueSerializerClass>io.confluent.kafka.serializers.KafkaAvroSerializer</valueSerializerClass>
<schemaRegistryUrl>http://localhost:8081</schemaRegistryUrl>
<basicAuthCredentialsSource>USER_INFO</basicAuthCredentialsSource>
<basicAuthUserInfo>admin:admin</basicAuthUserInfo>
<maxActiveConnections>100</maxActiveConnections>
<poolingEnabled>false</poolingEnabled>
</kafkaTransport.init>
</localEntry>
```
In the above example, the `basicAuthCredentialsSource` parameter is set to `USER_INFO`. Alternatively, you can set the `basicAuthCredentialsSource` parameter to `URL` as follows:

````xml
<basicAuthCredentialsSource>URL</basicAuthCredentialsSource>
````

Then, the `schemaRegistryUrl` parameter should be configured as shown below.

````xml
<schemaRegistryUrl>http://admin:admin@localhost:8081</schemaRegistryUrl>
````
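
Both variants carry the same credentials; with `URL`, the `user:password` pair travels in the URL's authority component. A quick check with Python's standard library (using the hypothetical `admin:admin` credentials from this example):

```python
from urllib.parse import urlsplit

registry_url = "http://admin:admin@localhost:8081"
parts = urlsplit(registry_url)

# The userinfo section is parsed out of the authority component.
print(parts.username, parts.password)  # → admin admin
print(parts.hostname, parts.port)      # → localhost 8081
```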
Refer to the [Confluent documentation](https://docs.confluent.io/platform/current/schema-registry/serdes-develop/serdes-avro.html) for more details.

This demonstrates how the Kafka connector publishes Avro messages to Kafka brokers.

## What's next

---

API has the context `/publishMessages`. It will publish messages via the topic to the Kafka server.

The following diagram illustrates all the required functionality of the Kafka service that you are going to build.

<a href="{{base_path}}/assets/img/integrate/connectors/kafka/kafkaconnectorpublishmessage.png"><img src="{{base_path}}/assets/img/integrate/connectors/kafka/kafkaconnectorpublishmessage.png" title="KafkaConnector" width="800" alt="KafkaConnector"/></a>

If you do not want to configure this yourself, you can simply [get the project](#get-the-project) and run it.

Before you begin, set up Kafka by following the instructions in [Setting up Kafka](setting-up-kafka.md).

## Set up the integration project

Follow the steps below to set up the integration project using the WSO2 Integrator: MI Visual Studio Code extension.

### Create a new project

Follow the steps in the [create integration project]({{base_path}}/develop/create-integration-project/) guide to set up WSO2 MI and create a new integration project. Use a suitable Project Name for your integration.

### Create a connection

1. In the Design View, click the **+** button and select **Connection**.

2. In the search bar, type `Kafka` and select the `Kafka connector` from the list.

<img src="{{base_path}}/assets/img/integrate/connectors/kafka/kafka-avro-create-connection.png" title="Creating a new connection" width="600" alt="Creating a new connection"/>

3. In the Connection Configuration pane, enter the following required information:
- **Connection Name** - KafkaConnection
- **Connection Type** - kafka
- **Bootstrap Servers** - localhost:9092
- **Key Serializer Class** - org.apache.kafka.common.serialization.StringSerializer
- **Value Serializer Class** - org.apache.kafka.common.serialization.StringSerializer
- **Pooling Enabled** - false
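
As with any connection, saving it produces a local entry in the project. The following is a sketch of roughly what that artifact looks like for the values above (the generated XML may differ slightly between connector versions):

```xml
<?xml version="1.0" encoding="UTF-8"?>
<localEntry key="KafkaConnection" xmlns="http://ws.apache.org/ns/synapse">
    <kafkaTransport.init>
        <connectionType>kafka</connectionType>
        <name>KafkaConnection</name>
        <bootstrapServers>localhost:9092</bootstrapServers>
        <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
        <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
        <poolingEnabled>false</poolingEnabled>
    </kafkaTransport.init>
</localEntry>
```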

### Create an API

1. Click the **API** button in the **Create an integration project** pane.

<img src="{{base_path}}/assets/img/integrate/connectors/jira/create-api1.png" title="Creating a new API" width="600" alt="Creating a new API"/>

2. Enter the API Name as `KafkaTransport` and the Context as `/publishMessages`, then click **Create**.

<img src="{{base_path}}/assets/img/integrate/connectors/kafka/kafka-avro-example-1.png" title="Creating a new API" width="600" alt="Creating a new API"/>

3. To add the Kafka connector:
- In the **Design View**, click the **+** button.
- In the **Mediator** section, search for `Kafka`.
- Select the **Kafka** connector and click **Download**.

### Implement the API

1. Go to the **Source View** of the API by clicking on the **<>** icon in the top right corner of the **Design View**.
<img src="{{base_path}}/assets/img/integrate/connectors/jira/source_view.png" title="Source view of the implemented resource" width="600" alt="Source view of the implemented resource"/>

2. Copy and paste the following code in the **Source View** of the API.

??? note "Source view of the implemented resource"
    ```xml
    <?xml version="1.0" encoding="UTF-8"?>
    <api context="/publishMessages" name="KafkaTransport" xmlns="http://ws.apache.org/ns/synapse">
        <resource methods="POST" uri-template="/">
            <inSequence>
                <kafkaTransport.publishMessages configKey="KafkaConnection">
                    <topic>test</topic>
                    <partitionNo>0</partitionNo>
                    <key></key>
                    <keySchema></keySchema>
                    <keySchemaId></keySchemaId>
                    <keySchemaMetadata></keySchemaMetadata>
                    <valueSchema></valueSchema>
                    <valueSchemaId></valueSchemaId>
                    <keySchemaSubject></keySchemaSubject>
                    <keySchemaVersion></keySchemaVersion>
                    <keySchemaSoftDeleted>false</keySchemaSoftDeleted>
                    <valueSchemaSubject></valueSchemaSubject>
                    <valueSchemaVersion></valueSchemaVersion>
                    <valueSchemaMetadata></valueSchemaMetadata>
                    <valueSchemaSoftDeleted>false</valueSchemaSoftDeleted>
                    <value></value>
                    <forwardExistingHeaders>None</forwardExistingHeaders>
                    <customHeaders>[]</customHeaders>
                    <customHeaderExpression></customHeaderExpression>
                </kafkaTransport.publishMessages>
            </inSequence>
        </resource>
    </api>
    ```

## Get the project

You can download the ZIP file and extract the contents to get the project code.

## Deployment

To deploy and run the project, refer to the [Build and Run]({{base_path}}/develop/deploy-artifacts/#build-and-run) guide.

You can also manage the deployed application through the CLI tool. See the instructions on [managing integrations from the CLI]({{base_path}}/observe-and-manage/managing-integrations-with-micli).

## Test

Invoke the API as shown below using the MI VSCode Extension.

<img src="{{base_path}}/assets/img/integrate/connectors/common/runtime-services.png" title="Runtime services" width="600" alt="Runtime services"/>

**Create a topic**:

Let’s create a topic named `test` with a single partition and only one replica.
```bash
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
```

**Sample request**:

Send a message to the Kafka broker using a curl command or a sample client.

- Content-Type: application/json
- Request body:

```json
{
    "name": "sample"
}
```

```bash
curl -X POST -d '{"name":"sample"}' "http://localhost:8290/publishMessages" -H "Content-Type: application/json" -v
```

**Expected response**:

Navigate to `<KAFKA_HOME>` and run the following command to verify the messages:

```bash
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
```
See the following message content:
```json
{"name":"sample"}
```

This demonstrates how the Kafka connector publishes messages to the Kafka brokers.

## What's next
