Commit 1f5befa

feat: add Apache Iggy connector (#1969)
* Enable CocoIndex to integrate with Apache Iggy streams

  Iggy uses stream/topic/partition addressing and payload-only messages rather than Kafka's topic/key/tombstone model. The connector maps that API into CocoIndex live streams and target states while preserving downstream-ready offset storage for source consumption.

  Constraint: Iggy Python SDK does not expose Kafka-style message keys, tombstones, assignment callbacks, or per-partition watermarks.
  Rejected: Blind Kafka connector port | would silently mis-handle keys, deletes, and readiness semantics.
  Confidence: medium
  Scope-risk: moderate
  Directive: Do not weaken source readiness for multi-partition topics without an SDK-supported per-partition watermark or explicit initial_high_watermark.
  Tested: uv run pytest python/tests/connectors/test_iggy_source.py
  Tested: uv run ruff check python/cocoindex/connectors/iggy/_source.py python/cocoindex/connectors/iggy/_target.py python/tests/connectors/test_iggy_source.py
  Tested: uv run mypy python/cocoindex/connectors/iggy/_source.py python/cocoindex/connectors/iggy/_target.py python/tests/connectors/test_iggy_source.py
  Not-tested: Live integration against an Apache Iggy server.

* Keep Iggy connector tests formatter-clean

  The CI prek action runs Ruff formatting across all files and rewrites the long async test signature in the Iggy connector test module. Commit the formatter output so the hook has no working-tree changes to report.

  Constraint: CI runs prek 0.4.0 with ruff-format over all files
  Confidence: high
  Scope-risk: narrow
  Reversibility: clean
  Tested: uv run ruff format --check python/tests/connectors/test_iggy_source.py
  Tested: uv run prek run --all-files

* Prevent duplicate Iggy deliveries during live consumption

  A real Iggy broker can return an already-delivered offset while manual offset storage is still catching up. The source now tracks offsets delivered during a watch call and skips duplicate partition offsets before sending them downstream. The mocked regression mirrors the live broker sequence observed during the smoke test: 0, 1, 1, 2.

  Constraint: Iggy consumer groups use server-side offsets with manual store_offset calls
  Rejected: Treat mocked source tests as sufficient | live broker returned a duplicate offset that mocks did not cover
  Confidence: high
  Scope-risk: narrow
  Reversibility: clean
  Directive: Keep live-broker smoke coverage in mind when changing Iggy offset handling
  Tested: uv run pytest python/tests/connectors/test_iggy_source.py
  Tested: real apache/iggy:latest broker smoke over TCP delivered offsets 0,1,2 through topic_as_stream
  Tested: uv run ruff check python/cocoindex/connectors/iggy/_source.py python/tests/connectors/test_iggy_source.py
  Tested: uv run mypy
  Tested: uv run prek run --all-files
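
The duplicate-skipping behavior described in the last commit message can be illustrated with a minimal sketch. This is not the connector's actual code: the `Message` type and the `skip_delivered` helper are hypothetical names introduced here, and the sketch assumes only that each polled message carries a partition ID and an offset. It replays the 0, 1, 1, 2 sequence from the smoke test.

```python
from typing import Iterable, Iterator, NamedTuple


class Message(NamedTuple):
    """Hypothetical stand-in for a polled message; not the Iggy SDK type."""

    partition_id: int
    offset: int
    payload: bytes


def skip_delivered(messages: Iterable[Message]) -> Iterator[Message]:
    """Yield each (partition, offset) pair at most once per call.

    The set lives only for the duration of one call, mirroring the idea of
    tracking offsets "delivered during a watch call".
    """
    delivered = set()  # (partition_id, offset) pairs already sent downstream
    for msg in messages:
        key = (msg.partition_id, msg.offset)
        if key in delivered:
            continue  # broker re-sent an offset we already delivered
        delivered.add(key)
        yield msg


# Replay the sequence observed against the live broker: 0, 1, 1, 2.
polled = [
    Message(0, 0, b"a"),
    Message(0, 1, b"b"),
    Message(0, 1, b"b"),  # duplicate while offset storage catches up
    Message(0, 2, b"c"),
]
offsets = [m.offset for m in skip_delivered(polled)]
print(offsets)
```

Keying on the (partition, offset) pair rather than on the offset alone matters for multi-partition topics, where the same offset number legitimately appears once per partition.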
1 parent 4421582 commit 1f5befa

6 files changed

Lines changed: 1361 additions & 1 deletion

pyproject.toml

Lines changed: 2 additions & 0 deletions
@@ -89,6 +89,7 @@ doris = ["aiohttp>=3.9.0", "pymysql>=1.1.0", "aiomysql>=0.2.0"]
 falkordb = ["falkordb>=1.1.0"]
 neo4j = ["neo4j>=5.18.0"]
 kafka = ["confluent_kafka>=2.6"]
+iggy = ["apache-iggy>=0.8.0"]
 oci = ["oci>=2.0"]
 entity_resolution = ["faiss-cpu>=1.7"]
 entity_resolution_llm = [
@@ -120,6 +121,7 @@ all = [
 "falkordb>=1.1.0",
 "neo4j>=5.18.0",
 "confluent_kafka>=2.6",
+"apache-iggy>=0.8.0",
 "oci>=2.0",
 "faiss-cpu>=1.7",
 "instructor>=1.0",
Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+from . import _source, _target
+from ._source import *
+from ._target import *
+
+__all__ = _source.__all__ + _target.__all__