1 change: 1 addition & 0 deletions openlinage/debezium-source-sink/.env
@@ -0,0 +1 @@
DEBEZIUM_VERSION=3.3.0.Final
198 changes: 198 additions & 0 deletions openlinage/debezium-source-sink/README.md
@@ -0,0 +1,198 @@
# Complete Data Lineage with Debezium Source and Sink

This demo showcases how to leverage Debezium's OpenLineage support for data lineage tracking in a source-to-sink data pipeline.
The setup includes a Postgres database, Kafka, the Debezium Postgres source connector, the Debezium MongoDB sink connector, MongoDB, and OpenLineage (Marquez) for end-to-end lineage tracking.

## Architecture Overview

The demo creates a CDC pipeline that:
1. **Data Capture**: Debezium Source Connector captures real-time changes from Postgres tables
2. **Message Streaming**: CDC events are published to Kafka topics
3. **Data Replication**: Debezium MongoDB Sink Connector consumes CDC events and writes to MongoDB
4. **Lineage Tracking**: OpenLineage/Marquez tracks the complete data flow from source database through Kafka to sink database
5. **Output**: Data is replicated to MongoDB with full lineage visibility

### Data Flow Details

The pipeline performs the following:

1. **Source Database**: Postgres database with inventory schema containing tables like:
- `inventory.products` - Product catalog (ID, name, description, weight)
- Other inventory tables

2. **Change Data Capture**:
- Debezium Postgres Connector monitors database changes
- Captures insert, update, and delete operations
- Publishes CDC events to Kafka topics with full Debezium envelope format

3. **Message Streaming**:
- CDC events flow through Kafka topics
- Topic naming follows pattern: `{topic.prefix}.{schema}.{table}`
- Example: `inventory.inventory.products`

4. **Data Sink**:
- Debezium MongoDB Sink Connector consumes CDC events from Kafka
- Writes data to MongoDB collections
- Maintains data consistency between source and sink

5. **Lineage Tracking**: Both source and sink connectors emit OpenLineage events to track:
- Source table schemas and data versions
- Kafka topics and message schemas
- Sink collections and transformations
- Complete end-to-end lineage from Postgres to MongoDB
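
To make this concrete, here is a simplified, illustrative OpenLineage `RunEvent` of the kind the connectors emit (all field values below are hypothetical; real events carry additional facets such as dataset schemas, and the job name depends on the connector configuration):

```json
{
  "eventType": "RUNNING",
  "eventTime": "2024-01-01T00:00:00Z",
  "run": { "runId": "d46e465b-d358-4d32-83d4-df660ff614dd" },
  "job": { "namespace": "debezium", "name": "inventory.0" },
  "inputs": [
    { "namespace": "postgres://postgres:5432", "name": "inventory.products" }
  ],
  "outputs": [
    { "namespace": "kafka://kafka:9092", "name": "inventory.inventory.products" }
  ]
}
```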

## Prerequisites

- Docker and Docker Compose
- [kcctl](https://github.com/kcctl/kcctl) (Kafka Connect CLI)
- jq (for JSON processing)

## Components

- **Postgres**: Source database with inventory data
- **Kafka**: Message broker for CDC events
- **Debezium Postgres Connector**: Captures database changes from Postgres and publishes to Kafka
- **Debezium MongoDB Sink Connector**: Consumes CDC events from Kafka and writes to MongoDB
- **MongoDB**: Sink database for replicated data
- **OpenLineage/Marquez**: Tracks complete data lineage from source to sink

## Step-by-Step Setup

### Start OpenLineage/Marquez

First, clone the Marquez repository if you haven't already:

```bash
git clone https://github.com/MarquezProject/marquez && cd marquez
```

Then start the Marquez backend for data lineage tracking:

```bash
./docker/up.sh --db-port 5433
```
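
Before moving on, you can verify that the Marquez API is reachable. This is a quick sanity check, assuming the default Marquez API port 5000 (the same endpoint the connectors will publish to later):

```bash
# List Marquez namespaces; a fresh install returns at least the "default" namespace
curl -s http://localhost:5000/api/v1/namespaces | jq
```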

### Start the Demo Infrastructure

Launch all services using Docker Compose:

```bash
docker compose -f docker-compose-sink.yaml up
```

This starts:
- Kafka broker
- Postgres database with sample data
- MongoDB database
- Debezium Connect with OpenLineage support

### Connect Marquez to the Network

Connect the Marquez API to the demo network:

```bash
docker network connect debezium-source-sink_default marquez-api
```
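
To confirm the container actually joined the network, you can inspect it (the network name `debezium-source-sink_default` assumes the default Compose project name derived from the directory):

```bash
# Print the names of all containers attached to the demo network;
# marquez-api should appear alongside kafka, connect, postgres, and mongodb
docker network inspect debezium-source-sink_default | jq -r '.[0].Containers[].Name'
```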

### Verify Kafka Topics

Check available Kafka topics:

```bash
docker compose -f docker-compose-sink.yaml exec kafka /kafka/bin/kafka-topics.sh --list --bootstrap-server kafka:9092
```

### Start Debezium Source Connector

Apply the Postgres source connector configuration:

```bash
kcctl apply -f postgres-connector-openlineage.json
```

This connector will:
- Connect to the Postgres database
- Capture changes from all tables in the inventory schema
- Publish CDC events to Kafka topics
- Emit OpenLineage events for lineage tracking

### Verify CDC Events

Check that CDC events are being published to Kafka:

```bash
docker compose -f docker-compose-sink.yaml exec kafka ./bin/kafka-console-consumer.sh --bootstrap-server=kafka:9092 --topic inventory.inventory.products --from-beginning --max-messages 1 | jq
```

You should see Debezium CDC events with the full envelope format:

```json
{
"before": null,
"after": {
"id": 101,
"name": "scooter",
"description": "Small 2-wheel scooter",
"weight": 3.14
},
"source": {
"version": "3.3.0.Final",
"connector": "postgresql",
"name": "inventory",
"ts_ms": 1678901234000,
"snapshot": "true",
"db": "postgres",
"schema": "inventory",
"table": "products"
},
"op": "r",
"ts_ms": 1678901234567
}
```

### Start Debezium MongoDB Sink Connector

Apply the MongoDB sink connector configuration:

```bash
kcctl apply -f mongodb-connector-openlineage.json
```

This connector will:
- Subscribe to the `inventory.inventory.products` Kafka topic
- Consume CDC events
- Write data to MongoDB `inventory2` database
- Emit OpenLineage events for lineage tracking
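
As with the source connector, you can verify that the sink connector and its task are healthy before checking MongoDB:

```bash
# Both .connector.state and .tasks[].state should report RUNNING
curl -s http://localhost:8083/connectors/mongodb-sink/status | jq
```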

### Verify Data in MongoDB

Check that data is being replicated to MongoDB:

```bash
docker compose -f docker-compose-sink.yaml exec mongodb mongosh --quiet --eval 'db.getSiblingDB("inventory2").products.find().limit(1).pretty()' mongodb://admin:admin@localhost:27017
```

You should see the replicated product data in MongoDB.
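
To compare record counts between source and sink, count the replicated documents (the Debezium example database ships the `inventory.products` table with nine rows, though this may vary across versions):

```bash
# Count documents in the sink collection; should match the source row count
docker compose -f docker-compose-sink.yaml exec mongodb mongosh --quiet \
  --eval 'db.getSiblingDB("inventory2").products.countDocuments()' \
  mongodb://admin:admin@localhost:27017
```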

### Accessing the UI

Now that everything is set up, we can view the lineage graph in the **Marquez UI** at http://localhost:3000.

You should see something similar to the following:

![Lineage Graph](images/lineage-graph.png)

The lineage graph will show:
- Source: Postgres tables (`postgres://postgres:5432` namespace)
- Intermediate: Kafka topics (`kafka://kafka:9092` namespace)
- Sink: MongoDB collections (`mongodb://mongodb:27017` namespace)
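
The same graph can also be queried programmatically through the Marquez lineage API. This is a sketch assuming the `dataset:<namespace>:<name>` node-id format; depending on your shell and Marquez version the `nodeId` value may need URL-encoding:

```bash
# Fetch the lineage graph around the Kafka topic dataset and list the node ids
curl -s "http://localhost:5000/api/v1/lineage?nodeId=dataset:kafka://kafka:9092:inventory.inventory.products" \
  | jq '.graph[].id'
```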

## Cleanup

To clean up all resources:

```bash
docker compose -f docker-compose-sink.yaml down
docker rmi debezium/connect-openlineage
docker volume ls | grep marquez | awk '{print $2}' | xargs docker volume rm
```
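
If you also want to stop Marquez, its repository ships a `down.sh` counterpart to the `up.sh` script used earlier (run from wherever you cloned Marquez):

```bash
cd marquez && ./docker/down.sh
```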
37 changes: 37 additions & 0 deletions openlinage/debezium-source-sink/connect/Dockerfile
@@ -0,0 +1,37 @@
ARG DEBEZIUM_VERSION
FROM quay.io/debezium/connect:${DEBEZIUM_VERSION}

ENV MAVEN_REPO="https://repo1.maven.org/maven2"
ENV GROUP_ID="io/debezium"
ENV DEBEZIUM_VERSION=${DEBEZIUM_VERSION}
ENV ARTIFACT_ID="debezium-openlineage-core"
ENV CLASSIFIER="-libs"

# This is just a workaround for https://issues.apache.org/jira/browse/KAFKA-19758
RUN rm -rf /kafka/connect/debezium-connector-jdbc && \
rm -rf /kafka/connect/debezium-connector-cockroachdb && \
rm -rf /kafka/connect/debezium-connector-db2 && \
rm -rf /kafka/connect/debezium-connector-ibmi && \
rm -rf /kafka/connect/debezium-connector-informix && \
rm -rf /kafka/connect/debezium-connector-mariadb && \
rm -rf /kafka/connect/debezium-connector-mysql && \
rm -rf /kafka/connect/debezium-connector-oracle && \
rm -rf /kafka/connect/debezium-connector-spanner && \
rm -rf /kafka/connect/debezium-connector-sqlserver && \
rm -rf /kafka/connect/debezium-connector-vitess


# Add OpenLineage
RUN mkdir -p /tmp/openlineage-libs && \
curl "$MAVEN_REPO/$GROUP_ID/$ARTIFACT_ID/$DEBEZIUM_VERSION/$ARTIFACT_ID-${DEBEZIUM_VERSION}${CLASSIFIER}.tar.gz" -o /tmp/debezium-openlineage-core-libs.tar.gz && \
tar -xzvf /tmp/debezium-openlineage-core-libs.tar.gz -C /tmp/openlineage-libs --strip-components=1


RUN cp -r /tmp/openlineage-libs /kafka/external_libs/

RUN for file in /kafka/external_libs/openlineage-libs/*; do \
ln -s "$file" /kafka/connect/debezium-connector-postgres/; \
ln -s "$file" /kafka/connect/debezium-connector-mongodb/; \
done

ADD openlineage.yml /kafka/
3 changes: 3 additions & 0 deletions openlinage/debezium-source-sink/connect/openlineage.yml
@@ -0,0 +1,3 @@
transport:
type: http
url: http://marquez-api:5000
53 changes: 53 additions & 0 deletions openlinage/debezium-source-sink/docker-compose-sink.yaml
@@ -0,0 +1,53 @@
services:
kafka:
image: quay.io/debezium/kafka:${DEBEZIUM_VERSION}
ports:
- 9092:9092
connect:
image: debezium/connect-openlineage
build:
context: ./connect
args:
DEBEZIUM_VERSION: ${DEBEZIUM_VERSION}
ports:
- 8083:8083
links:
- kafka
- postgres
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
postgres:
image: quay.io/debezium/example-postgres:${DEBEZIUM_VERSION}
ports:
- 5432:5432
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
mongodb:
image: &mongodb-image quay.io/debezium/example-mongodb:${DEBEZIUM_VERSION}
hostname: &mongodb-hostname mongodb
ports:
- 27017:27017
environment:
MONGODB_USER: debezium
MONGODB_PASSWORD: dbz
healthcheck:
test: bash -c 'mongosh --quiet --eval "db.adminCommand(\"ping\").ok" localhost:27017 | grep 1'
interval: 10s
timeout: 5s
retries: 5
start_period: 20s

mongodb-init:
image: *mongodb-image
environment:
HOSTNAME: *mongodb-hostname
entrypoint: /bin/bash
command: "/usr/local/bin/init-inventory.sh"
depends_on:
mongodb:
condition: service_healthy
16 changes: 16 additions & 0 deletions openlinage/debezium-source-sink/mongodb-connector-openlineage.json
@@ -0,0 +1,16 @@
{
"name": "mongodb-sink",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbSinkConnector",
"tasks.max": "1",
"mongodb.connection.string": "mongodb://admin:admin@mongodb:27017",
"topics": "inventory.inventory.products",
"sink.database": "inventory2",
"openlineage.integration.enabled": "true",
"openlineage.integration.config.file.path": "/kafka/openlineage.yml",
"openlineage.integration.job.description": "This connector does cdc for products",
"openlineage.integration.tags": "env=prod,team=cdc",
    "openlineage.integration.owners": "Mario=maintainer,John Doe=Data scientist,IronMan=superhero",
"openlineage.integration.dataset.kafka.bootstrap.servers": "kafka:9092"
}
}
25 changes: 25 additions & 0 deletions openlinage/debezium-source-sink/postgres-connector-openlineage.json
@@ -0,0 +1,25 @@
{
"name": "inventory-connector-postgres",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.server.id": "184054",
"database.dbname": "postgres",
"topic.prefix": "inventory",
"snapshot.mode": "initial",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "schema-changes.inventory",
"slot.name": "postgres",
"openlineage.integration.enabled": "true",
"openlineage.integration.config.file.path": "/kafka/openlineage.yml",
"openlineage.integration.job.description": "This connector does cdc for products",
"openlineage.integration.tags": "env=prod,team=cdc",
    "openlineage.integration.owners": "Mario=maintainer,John Doe=Data scientist,IronMan=superhero",
"transforms": "openlineage",
"transforms.openlineage.type": "io.debezium.transforms.openlineage.OpenLineage"
}
}