Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ Copy `michelin-connect-plugins.jar` into the ``plugin.path`` folder of your Kafk

## Transformations
- [TimestampMicrosConverter](doc/transforms/timestamp-micros-converter.md)
- [ExpandJsonHeaders](doc/transforms/expand-json-headers.md)
## Predicates
- [HeaderValueMatches](doc/predicates/header-value-matches.md)
## Config Providers
- [AES256ConfigProvider](doc/config-providers/aes256-config-provider.md)
- [AES256ConfigProvider](doc/config-providers/aes256-config-provider.md)
4 changes: 4 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ dependencies {
// Use JUnit Jupiter for testing.
testImplementation 'org.junit.jupiter:junit-jupiter:5.14.0'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher:1.14.0'

// Dependencies provided by Kafka Connect
compileOnly 'com.fasterxml.jackson.core:jackson-databind:2.16.2'
compileOnly 'org.slf4j:slf4j-api:1.7.36'
}
java {
sourceCompatibility = '11'
Expand Down
83 changes: 83 additions & 0 deletions doc/transforms/expand-json-headers.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# Expand JSON Headers SMT

A Kafka Connect Single Message Transform (SMT) that takes an existing JSON header and expands each key-value pair into separate individual Kafka message headers. The original JSON header is removed after expansion.

## Use Case

This transform is particularly useful when working with transactional outbox patterns where you have JSON data in a Kafka header that you want to expand into individual headers for better message routing and filtering.

For example, if you have a Kafka header named `headers` containing:
```json
{"userId": "user123", "requestId": "req456", "source": "web"}
```

This SMT will:
1. Extract each key-value pair and create individual headers:
- `userId` = "user123"
- `requestId` = "req456"
- `source` = "web"
2. Remove the original `headers` header

## Configuration

| Property | Type | Default | Description |
|----------|------|---------|-------------|
| `header.field` | String | `headers` | Header name containing the JSON map |

## Usage with Debezium EventRouter

```json
{
"name": "outbox-connector-with-headers",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "localhost",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "postgres",
"plugin.name": "pgoutput",
"table.include.list": "public.outbox_messages",
"transforms": "outbox,addHeaders,expandHeaders",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.table.expand.json.payload": "true",
"transforms.outbox.table.field.event.key": "partition_key",
"transforms.outbox.route.by.field": "topic",
"transforms.outbox.route.topic.replacement": "${routedByValue}",
"transforms.outbox.table.fields.additional.placement": "headers:headers",
"transforms.expandHeaders.type": "com.michelin.kafka.connect.transforms.ExpandJsonHeaders",
"transforms.expandHeaders.header.field": "headers",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8085"
}
}
```
## Examples

### Basic Usage
```json
"transforms": "expandHeaders",
"transforms.expandHeaders.type": "com.michelin.kafka.connect.transforms.ExpandJsonHeaders"
```

### With Custom Field Name
```json
"transforms": "expandHeaders",
"transforms.expandHeaders.type": "com.michelin.kafka.connect.transforms.ExpandJsonHeaders",
"transforms.expandHeaders.header.field": "metadata"
```

## Behavior

- **Input**: A Kafka header containing JSON map data
- **Processing**: Parses the JSON and creates individual headers for each key-value pair
- **Output**: Individual headers with the original JSON header removed
- **Error Handling**: If the header is missing, invalid JSON, or not a JSON object, the record is passed through unchanged

## Error Handling

The transform is designed to be resilient:
- If the specified header is missing, the record is passed through unchanged
- If the JSON is invalid, the record is passed through unchanged
- If the header is not a JSON object, the record is passed through unchanged
- Errors are logged but do not cause the connector to fail
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package com.michelin.kafka.connect.transforms;

import java.util.NoSuchElementException;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Iterator;
import java.util.Map;

/**
 * <p>Kafka Connect Single Message Transform (SMT) that takes an existing JSON header
 * and expands each key-value pair into separate individual Kafka message headers.
 * The original JSON header is removed after expansion.</p>
 *
 * <p>This transform is useful when you have JSON content in a header that you want
 * to split into multiple headers for better message routing and filtering.</p>
 *
 * <p>Configuration:</p>
 * <ul>
 * <li>header.field: Header name containing the JSON map (default: "headers")</li>
 * </ul>
 *
 * @param <R> the concrete {@link ConnectRecord} type this transformation operates on
 */
public class ExpandJsonHeaders<R extends ConnectRecord<R>> implements Transformation<R> {

    private static final Logger log = LoggerFactory.getLogger(ExpandJsonHeaders.class);

    private static final String HEADER_FIELD_CONFIG = "header.field";
    private static final String HEADER_FIELD_DEFAULT = "headers";

    public static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define(HEADER_FIELD_CONFIG, ConfigDef.Type.STRING, HEADER_FIELD_DEFAULT,
                    ConfigDef.Importance.HIGH, "Header name containing the JSON map");

    private String headerField;
    private ObjectMapper objectMapper;

    /**
     * Configures the transform from the connector properties.
     *
     * @param props the configuration properties; only {@code header.field} is read
     */
    @Override
    public void configure(Map<String, ?> props) {
        final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
        headerField = config.getString(HEADER_FIELD_CONFIG);
        objectMapper = new ObjectMapper();

        log.info("Configured ExpandJsonHeaders with field='{}'", headerField);
    }

    /**
     * Applies the transformation to expand a JSON header into individual headers.
     * Extracts key-value pairs from the specified JSON header field, adds them as
     * separate headers, and removes the original JSON header. If the header is
     * missing, null, invalid JSON, or not a JSON object, the original record is
     * returned unchanged.
     *
     * @param currentRecord the Kafka Connect record to transform
     * @return the transformed record with expanded headers, or the original
     *         record unchanged when expansion is not possible
     */
    @Override
    public R apply(R currentRecord) {
        // Work on a duplicate so the input record is never mutated: on any
        // error path we return currentRecord, which must not carry partially
        // expanded headers. This mirrors Kafka's own header SMTs
        // (InsertHeader/HeaderFrom), which duplicate() before modifying.
        Headers headers = currentRecord.headers().duplicate();

        try {
            // Only the first header with the configured name is expanded;
            // next() throws NoSuchElementException when it is absent.
            Header jsonHeader = headers.allWithName(headerField).next();
            Object rawValue = jsonHeader.value();

            if (rawValue == null) {
                log.debug("Header '{}' has a null value, skipping header extraction", headerField);
                return currentRecord;
            }

            JsonNode jsonNode = objectMapper.readTree(rawValue.toString());

            if (!jsonNode.isObject()) {
                log.warn("Field '{}' is not a JSON object, skipping header extraction", headerField);
                return currentRecord;
            }

            jsonNode.properties().forEach(field -> {
                String headerName = field.getKey();
                String headerValueStr = field.getValue().asText();
                headers.addString(headerName, headerValueStr);
                log.debug("Added header: {} = {}", headerName, headerValueStr);
            });

            log.debug("Successfully extracted headers from field '{}'", headerField);

            // Remove the original JSON header after expansion. NOTE(review):
            // if the JSON itself contains a key equal to headerField, that
            // expanded header is removed too — confirm this is intended.
            headers.remove(headerField);
            log.debug("Removed original header '{}'", headerField);
        } catch (NoSuchElementException e) {
            log.debug("No '{}' field found in currentRecord, skipping header extraction", headerField);
            // Return original currentRecord if field is not found
            return currentRecord;
        } catch (Exception e) {
            log.warn("Failed to parse JSON from field '{}': {}", headerField, e.getMessage());
            // Return original currentRecord on parsing errors
            return currentRecord;
        }

        // Rebuild the record with the expanded headers; all other fields are
        // carried over unchanged.
        return currentRecord.newRecord(
                currentRecord.topic(),
                currentRecord.kafkaPartition(),
                currentRecord.keySchema(),
                currentRecord.key(),
                currentRecord.valueSchema(),
                currentRecord.value(),
                currentRecord.timestamp(),
                headers
        );
    }

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }

    @Override
    public void close() {
        // No resources to close
    }
}
Loading