Commit 8610bd3

feat: Kafka source and destination connector (#3176)
Thanks to @tullytim we have a new Kafka source and destination connector. It also works with hosted Kafka via Confluent. Documentation will be added to the Docs repo.
1 parent 2d965fd commit 8610bd3

File tree

27 files changed: +908 -3 lines changed


Diff for: CHANGELOG.md

+3 -1

@@ -1,4 +1,4 @@
-## 0.14.8-dev3
+## 0.14.8-dev4

 ### Enhancements

@@ -21,6 +21,8 @@
 * **Expose conversion functions for tables** Adds public functions to convert tables from HTML to the Deckerd format and back
+
+* **Adds Kafka Source and Destination** New source and destination connector added to all CLI ingest commands to support reading from and writing to Kafka streams. Also supports Confluent Kafka.

 ### Fixes

 * **Fix an error publishing docker images.** Update user in docker-smoke-test to reflect changes made by the amd64 image pull from the "unstructured" "wolfi-base" image.

Diff for: MANIFEST.in

+1

@@ -38,6 +38,7 @@ include requirements/ingest/gitlab.in
 include requirements/ingest/google-drive.in
 include requirements/ingest/hubspot.in
 include requirements/ingest/jira.in
+include requirements/ingest/kafka.in
 include requirements/ingest/mongodb.in
 include requirements/ingest/notion.in
 include requirements/ingest/onedrive.in

Diff for: Makefile

+4

@@ -169,6 +169,10 @@ install-ingest-reddit:
 install-ingest-slack:
 	pip install -r requirements/ingest/slack.txt

+.PHONY: install-ingest-kafka
+install-ingest-kafka:
+	python3 -m pip install -r requirements/ingest/kafka.txt
+
 .PHONY: install-ingest-wikipedia
 install-ingest-wikipedia:
 	python3 -m pip install -r requirements/ingest/wikipedia.txt

Diff for: docs/source/ingest/destination_connectors/kafka.rst

+6

@@ -0,0 +1,6 @@
+Unstructured Documentation
+==========================
+
+The Unstructured documentation page has moved! Check out our new and improved docs page at
+`https://docs.unstructured.io <https://docs.unstructured.io>`_ to learn more about our
+products and tools.

Diff for: docs/source/ingest/source_connectors/kafka.rst

+6

@@ -0,0 +1,6 @@
+Unstructured Documentation
+==========================
+
+The Unstructured documentation page has moved! Check out our new and improved docs page at
+`https://docs.unstructured.io <https://docs.unstructured.io>`_ to learn more about our
+products and tools.

Diff for: examples/ingest/kafka/ingest.sh

+18

@@ -0,0 +1,18 @@
+#!/usr/bin/env bash
+
+# Processes the PDF specified in the input path
+# and writes the results to a Confluent topic.
+
+SCRIPT_DIR=$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &>/dev/null && pwd)
+cd "$SCRIPT_DIR"/../../.. || exit 1
+
+PYTHONPATH=. ./unstructured/ingest/main.py \
+  local \
+  --input-path="<path to the file to be processed/partitioned>" \
+  kafka \
+  --bootstrap-server="<bootstrap server fully qualified hostname>" \
+  --port "<port, likely 9092>" \
+  --topic "<destination topic in confluent>" \
+  --kafka-api-key="<confluent api key>" \
+  --secret="<confluent secret>" \
+  --num-processes="<number of processes to be used>"
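The Confluent credentials shown above map onto a standard librdkafka-style SASL/SSL client configuration, as the test helper later in this commit does. A minimal sketch of that mapping (the helper name `build_kafka_conf` is illustrative, not part of the connector):

```python
def build_kafka_conf(bootstrap_server, port, api_key=None, secret=None, confluent=True):
    """Build a confluent-kafka client config dict from CLI-style options."""
    conf = {"bootstrap.servers": f"{bootstrap_server}:{port}"}
    if confluent:
        # Hosted Confluent clusters authenticate with SASL/SSL PLAIN,
        # using the API key as the username and the secret as the password.
        conf.update(
            {
                "security.protocol": "SASL_SSL",
                "sasl.mechanism": "PLAIN",
                "sasl.username": api_key,
                "sasl.password": secret,
            }
        )
    return conf


conf = build_kafka_conf("broker.example.com", 9092, "my-key", "my-secret")
```

With `--confluent false` (as in the local test below), the auth keys are simply omitted and the client connects in plaintext.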

Diff for: requirements/ingest/kafka.in

+3

@@ -0,0 +1,3 @@
+-c ../deps/constraints.txt
+-c ../base.txt
+confluent-kafka

Diff for: requirements/ingest/kafka.txt

+8

@@ -0,0 +1,8 @@
+#
+# This file is autogenerated by pip-compile with Python 3.9
+# by the following command:
+#
+#    pip-compile ./ingest/kafka.in
+#
+confluent-kafka==2.4.0
+    # via -r ./ingest/kafka.in

Diff for: scripts/kafka-test-helpers/create-kafka-instance.sh

+12

@@ -0,0 +1,12 @@
+#!/usr/bin/env bash
+
+set -e
+
+SCRIPT_DIR=$(dirname "$(realpath "$0")")
+
+# Create the Kafka instance
+docker-compose version
+docker-compose -f "$SCRIPT_DIR"/docker-compose.yml up --wait
+docker-compose -f "$SCRIPT_DIR"/docker-compose.yml ps
+
+echo "Instance is live."

Diff for: scripts/kafka-test-helpers/docker-compose.yml

+22

@@ -0,0 +1,22 @@
+services:
+  zookeeper:
+    image: confluentinc/cp-zookeeper:latest
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+      ZOOKEEPER_TICK_TIME: 2000
+    ports:
+      - 22181:2181
+
+  kafka:
+    image: confluentinc/cp-kafka:latest
+    depends_on:
+      - zookeeper
+    ports:
+      - 29092:29092
+    environment:
+      KAFKA_BROKER_ID: 1
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
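The compose file advertises two listeners: containers on the compose network reach the broker at `kafka:9092` (PLAINTEXT), while test scripts on the host connect to `localhost:29092` (PLAINTEXT_HOST). A small sketch of how such a listener string decomposes (the parsing function is illustrative, not part of this commit):

```python
def parse_advertised_listeners(value):
    """Parse a KAFKA_ADVERTISED_LISTENERS string into {listener_name: (host, port)}."""
    out = {}
    for entry in value.split(","):
        name, address = entry.split("://")
        host, port = address.rsplit(":", 1)
        out[name] = (host, int(port))
    return out


listeners = parse_advertised_listeners(
    "PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092"
)
# Containers resolve "kafka" via the compose network; the host-side
# test scripts below use localhost:29092 instead.
```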

Diff for: setup.py

+1

@@ -149,6 +149,7 @@ def load_requirements(file_list: Optional[Union[str, List[str]]] = None) -> List
     "google-drive": load_requirements("requirements/ingest/google-drive.in"),
     "hubspot": load_requirements("requirements/ingest/hubspot.in"),
     "jira": load_requirements("requirements/ingest/jira.in"),
+    "kafka": load_requirements("requirements/ingest/kafka.in"),
     "mongodb": load_requirements("requirements/ingest/mongodb.in"),
     "notion": load_requirements("requirements/ingest/notion.in"),
     "onedrive": load_requirements("requirements/ingest/onedrive.in"),

Diff for: test_unstructured_ingest/dest/kafka-local.sh

+66

@@ -0,0 +1,66 @@
+#!/usr/bin/env bash
+
+set -e
+
+DEST_PATH=$(dirname "$(realpath "$0")")
+SCRIPT_DIR=$(dirname "$DEST_PATH")
+cd "$SCRIPT_DIR"/.. || exit 1
+OUTPUT_FOLDER_NAME=local-kafka-dest
+OUTPUT_DIR=$SCRIPT_DIR/structured-output/$OUTPUT_FOLDER_NAME
+WORK_DIR=$SCRIPT_DIR/workdir/$OUTPUT_FOLDER_NAME
+max_processes=${MAX_PROCESSES:=$(python3 -c "import os; print(os.cpu_count())")}
+
+RANDOM_SUFFIX=$((RANDOM % 100000 + 1))
+
+LC_ALL=C
+
+# Set the variables with default values if they're not set in the environment
+KAFKA_TOPIC=${KAFKA_TOPIC:-"ingest-test-$RANDOM_SUFFIX"}
+
+# shellcheck disable=SC1091
+source "$SCRIPT_DIR"/cleanup.sh
+function cleanup {
+  # Local file cleanup
+  cleanup_dir "$WORK_DIR"
+  cleanup_dir "$OUTPUT_DIR"
+
+  echo "Stopping local Kafka instance"
+  docker-compose -f scripts/kafka-test-helpers/docker-compose.yml down --remove-orphans -v
+}
+
+trap cleanup EXIT
+
+echo "Creating local Kafka instance"
+# shellcheck source=/dev/null
+scripts/kafka-test-helpers/create-kafka-instance.sh
+wait
+
+PYTHONPATH=. ./unstructured/ingest/main.py \
+  local \
+  --num-processes "$max_processes" \
+  --output-dir "$OUTPUT_DIR" \
+  --strategy fast \
+  --verbose \
+  --reprocess \
+  --input-path example-docs/layout-parser-paper.pdf \
+  --work-dir "$WORK_DIR" \
+  --chunking-strategy basic \
+  --chunk-combine-text-under-n-chars 200 \
+  --chunk-new-after-n-chars 2500 \
+  --chunk-max-characters 38000 \
+  --chunk-multipage-sections \
+  --embedding-provider "langchain-huggingface" \
+  kafka \
+  --topic "$KAFKA_TOPIC" \
+  --bootstrap-server "$KAFKA_BOOTSTRAP_SERVER" \
+  --port 29092 \
+  --confluent false
+
+echo "Checking for matching messages in Kafka"
+
+# Check the number of messages in destination topic
+python "$SCRIPT_DIR"/python/test-kafka-output.py check \
+  --bootstrap-server "$KAFKA_BOOTSTRAP_SERVER" \
+  --topic "$KAFKA_TOPIC" \
+  --confluent false \
+  --port 29092
@@ -0,0 +1,67 @@
+[
+  {
+    "element_id": "8f574cb913e5d32ce4cea48cb73ef537",
+    "metadata": {
+      "data_source": {},
+      "filetype": "application/pdf",
+      "languages": [
+        "eng"
+      ],
+      "page_number": 1
+    },
+    "text": "May 5, 2023",
+    "type": "UncategorizedText"
+  },
+  {
+    "element_id": "929719c4916d0467030a4ca91645d4b6",
+    "metadata": {
+      "data_source": {},
+      "filetype": "application/pdf",
+      "languages": [
+        "eng"
+      ],
+      "page_number": 1
+    },
+    "text": "To Whom it May Concern:",
+    "type": "Title"
+  },
+  {
+    "element_id": "0442d409994665cebb0374e563803d2d",
+    "metadata": {
+      "data_source": {},
+      "filetype": "application/pdf",
+      "languages": [
+        "eng"
+      ],
+      "page_number": 1
+    },
+    "text": "There were 20,000 bottles of water, 10,000 blankets, and 200 laptops delivered on January 23, 2023. A total of 3 trucks were used for 15 hours.",
+    "type": "NarrativeText"
+  },
+  {
+    "element_id": "df197ce710970cdeccadd9d89229543b",
+    "metadata": {
+      "data_source": {},
+      "filetype": "application/pdf",
+      "languages": [
+        "eng"
+      ],
+      "page_number": 1
+    },
+    "text": "Best Regards,",
+    "type": "UncategorizedText"
+  },
+  {
+    "element_id": "b017fcb835c7f553160252356420acd3",
+    "metadata": {
+      "data_source": {},
+      "filetype": "application/pdf",
+      "languages": [
+        "eng"
+      ],
+      "page_number": 1
+    },
+    "text": "Mallori",
+    "type": "Title"
+  }
+]
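Each element in a fixture like this ends up as an individual message on the destination topic, which is why the check script below counts messages rather than files. A sketch of that per-element serialization (inferred behavior, not the connector's actual code; the producer call in the comment is illustrative):

```python
import json

# Two elements in the shape of the fixture above (abridged for brevity).
elements = [
    {"element_id": "8f574cb913e5d32ce4cea48cb73ef537", "text": "May 5, 2023", "type": "UncategorizedText"},
    {"element_id": "929719c4916d0467030a4ca91645d4b6", "text": "To Whom it May Concern:", "type": "Title"},
]

# One JSON-encoded message per element, keyed by element_id.
messages = [(e["element_id"], json.dumps(e).encode("utf-8")) for e in elements]

# A confluent-kafka producer would then emit each pair, roughly:
#   producer.produce(topic, key=key, value=value)
```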

Diff for: test_unstructured_ingest/python/test-kafka-output.py

+74

@@ -0,0 +1,74 @@
+#!/usr/bin/env python
+import socket
+from concurrent.futures import ThreadPoolExecutor
+
+import click
+from confluent_kafka import Consumer, TopicPartition
+
+
+@click.group(name="kafka-ingest")
+def cli():
+    pass
+
+
+def get_partition_size(consumer: Consumer, topic_name: str, partition_key: int):
+    topic_partition = TopicPartition(topic_name, partition_key)
+    low_offset, high_offset = consumer.get_watermark_offsets(topic_partition)
+    partition_size = high_offset - low_offset
+    return partition_size
+
+
+def get_topic_size(consumer: Consumer, topic_name: str):
+    print(f"Getting the number of messages in the topic {topic_name}")
+    topic = consumer.list_topics(topic=topic_name)
+    print(f"topic {topic}")
+    partitions = topic.topics[topic_name].partitions
+    workers, max_workers = [], len(partitions) or 1
+
+    with ThreadPoolExecutor(max_workers=max_workers) as e:
+        for partition_key in list(topic.topics[topic_name].partitions.keys()):
+            job = e.submit(get_partition_size, consumer, topic_name, partition_key)
+            workers.append(job)
+
+    topic_size = sum([w.result() for w in workers])
+    return topic_size
+
+
+@cli.command()
+@click.option("--bootstrap-server", type=str, required=True)
+@click.option("--topic", type=str, required=True)
+@click.option("--api-key", type=str, required=False)
+@click.option("--secret", type=str, required=False)
+@click.option("--confluent", type=bool, required=True, default=True)
+@click.option("--port", type=int, required=False, default=9092)
+def check(bootstrap_server: str, topic: str, api_key: str, secret: str, confluent: bool, port: int):
+    conf = {
+        "bootstrap.servers": f"{bootstrap_server}:{port}",
+        "client.id": socket.gethostname(),
+        "group.id": "your_group_id",
+        "enable.auto.commit": "true",
+        "auto.offset.reset": "earliest",
+    }
+
+    if confluent:
+        conf["security.protocol"] = "SASL_SSL"
+        conf["sasl.mechanism"] = "PLAIN"
+        conf["sasl.username"] = api_key
+        conf["sasl.password"] = secret
+
+    consumer = Consumer(conf)
+    print("Checking the number of messages in the topic")
+    topic_size = get_topic_size(consumer, topic)
+    expected = 16
+    print(
+        f"Checking that the number of messages found ({topic_size}) "
+        f"matches what's expected: {expected}"
+    )
+    assert (
+        topic_size == expected
+    ), f"number of messages found ({topic_size}) doesn't match what's expected: {expected}"
+    print("successfully checked the number of messages!")
+
+
+if __name__ == "__main__":
+    cli()
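The size check counts a topic's messages by summing the per-partition watermark deltas (`high_offset - low_offset`), one thread per partition. The arithmetic can be exercised without a broker using a test double (`StubConsumer` is a stand-in, not part of confluent-kafka):

```python
from concurrent.futures import ThreadPoolExecutor


class StubConsumer:
    """Test double mimicking Consumer.get_watermark_offsets for fixed partitions."""

    def __init__(self, watermarks):
        self.watermarks = watermarks  # {partition_key: (low_offset, high_offset)}

    def get_watermark_offsets(self, partition_key):
        return self.watermarks[partition_key]


def partition_size(consumer, key):
    low, high = consumer.get_watermark_offsets(key)
    return high - low


def topic_size(consumer, keys):
    # One worker per partition, mirroring the ThreadPoolExecutor fan-out above.
    with ThreadPoolExecutor(max_workers=len(keys) or 1) as pool:
        return sum(pool.map(lambda k: partition_size(consumer, k), keys))


# Two partitions holding 10 and 6 messages -> topic size 16,
# matching the `expected = 16` in the check command.
stub = StubConsumer({0: (0, 10), 1: (2, 8)})
```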
