feat: add custom ETL connector support (YET-1178)#317

Open
sassmith wants to merge 3 commits into main from ssmith/yet-1178-custom-etl-connectors
Conversation

@sassmith
Contributor

@sassmith sassmith commented Jun 5, 2026

Summary

  • Add runtime-loaded custom ETL connector infrastructure at apollo/integrations/custom_etl/, mirroring the existing custom (warehouse) connector scaffolding
  • CustomEtlProxyClient wraps Connector(BaseEtlConnector) modules loaded from /opt/custom-etl-connectors/{name}/, exposing fetch_etl_assets and fetch_etl_runs methods that match the agent calls in data-collector PR #2390
  • Factory routes custom-etl-connector-<hash> connection types through the new proxy client (gated by MCD_CUSTOM_CONNECTORS_ENABLED)
  • Discovery endpoints (/api/v1/agent/connectors/types and /api/v1/agent/custom-connectors/manifests) now return custom ETL connector types and manifests alongside existing native/custom entries
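
The routing rule in the third bullet can be sketched as follows. This is a minimal illustration, not the factory code in this PR: the helper names and the set of values treated as "enabled" are assumptions; only the `custom-etl-connector-` prefix and the `MCD_CUSTOM_CONNECTORS_ENABLED` variable come from the summary above.

```python
import os
from typing import Mapping, Optional

# Prefix taken from the PR summary (custom-etl-connector-<hash>).
CUSTOM_ETL_PREFIX = "custom-etl-connector-"


def custom_connectors_enabled(env: Optional[Mapping[str, str]] = None) -> bool:
    """Hypothetical flag parser; which values count as 'enabled' is an assumption."""
    env = os.environ if env is None else env
    value = env.get("MCD_CUSTOM_CONNECTORS_ENABLED", "")
    return value.strip().lower() in {"1", "true", "yes"}


def is_custom_etl_type(connection_type: str,
                       env: Optional[Mapping[str, str]] = None) -> bool:
    """Route to the custom ETL proxy client only when the flag is on
    and the connection type carries the custom ETL prefix."""
    return (custom_connectors_enabled(env)
            and connection_type.startswith(CUSTOM_ETL_PREFIX))
```

With the flag unset, a `custom-etl-connector-<hash>` type would fall through to whatever the factory does for unknown types; that fallback is not shown here.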

Related issues and PRs

Test plan

  • 40 new tests in `tests/test_custom_etl_proxy_client.py` covering loader, serialization, proxy client, discovery, agent integration, and factory routing
  • Existing `test_custom_proxy_client.py` (33 tests) updated and passing
  • `test_proxy_client_factory.py` passing
  • pyright clean on new modules (0 errors)
  • CI passes
  • Ran `/code-review`

🤖 Generated with Claude Code

Add runtime-loaded custom ETL connector infrastructure, mirroring the
existing custom (warehouse) connector scaffolding for ETL use cases.

- Discover and register connectors from /opt/custom-etl-connectors/{name}/
- CustomEtlProxyClient wraps Connector(BaseEtlConnector) modules with
  fetch_etl_assets and fetch_etl_runs methods matching DC PR #2390
- Factory routes custom-etl-connector-<hash> types (gated by
  MCD_CUSTOM_CONNECTORS_ENABLED)
- Discovery endpoints return custom_etl types alongside native/custom

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@sassmith sassmith requested a review from a team as a code owner June 5, 2026 14:42
@linear

linear Bot commented Jun 5, 2026

YET-1178

sassmith and others added 2 commits June 5, 2026 12:50
Rename _fetch_run_details → fetch_run_details (public method on
BaseEtlConnector) and remove the sys.path manipulation that was
added to work around the missing etl_connectors import — the
package is pip-installed in the agent image, not path-hacked.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…module caching

- Move credential validation before module load/Connector instantiation (F1)
- Strip None values consistently from all dict-like structures in _serialize (F2)
- Cache loaded connector modules to avoid re-executing top-level code (F3)
- Add tests for setup_connection failure and close() exception suppression (F4, F5)
- Add tearDown to reset registry cache between test classes (F6)
- Fix stale _fetch_run_details docstring reference (F8)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@genehynson
Contributor

Cross-repo contract check (vs saas-serverless #4279 normalizer + pycarlo wire envelope) — possible asset-drop bug.

The ETL metadata wire envelope is asymmetric in pycarlo (pycarlo/features/ingestion/etl/models.py):

  • metadata events are wrapped: build_etl_metadata_payload emits [EtlMetadataEvent(etl_asset=e).to_dict() for e in events] → each event is {"etl_asset": {job_source_id, name, ...}}. The EtlMetadataEvent.etl_asset field is required: True on the wire schema.
  • run events are flat: build_etl_runs_payload emits [e.to_dict() for e in events] → each event is {job_source_id, run_source_id, ...} with no wrapper.

The saas-serverless normalizer matches this exactly — asset path does wire_event.get("etl_asset") and skips the event if it's missing; runs path reads fields top-level.

This PR's CustomEtlProxyClient._serialize reflects the EtlAsset object flat for both methods, so fetch_etl_assets returns {"job_source_id": ..., "name": ...} without the etl_asset wrapper. End result: every asset event is silently dropped at the normalizer (.get("etl_asset") → None → skip). Runs are unaffected (flat is correct for them).

Suggested fix: serialize assets through the pycarlo envelope rather than the generic _serialize — e.g. EtlMetadataEvent(etl_asset=e).to_dict() (or build_etl_metadata_payload) for fetch_etl_assets, keeping runs flat. Since the customer connector's fetch_metadata already returns List[EtlAsset] (pycarlo type), round-tripping through pycarlo's own serializers also avoids field-name/null-stripping drift from the reflection-based _serialize.

Worth a unit test asserting fetch_etl_assets output has the etl_asset key, to lock the contract against the normalizer.
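
A rough sketch of the contract and the suggested test, under stated assumptions: `FakeEtlAsset` is a stand-in for pycarlo's `EtlAsset` with only the two fields named above, `wrap_assets` approximates what `EtlMetadataEvent(etl_asset=e).to_dict()` is described as producing, and `normalizer_keeps` mimics the normalizer's skip check as described in this comment. None of these are the real pycarlo or saas-serverless functions.

```python
from dataclasses import asdict, dataclass
from typing import Any, Dict, List, Optional


@dataclass
class FakeEtlAsset:
    """Stand-in for pycarlo's EtlAsset (hypothetical, two fields only)."""
    job_source_id: str
    name: Optional[str] = None


def wrap_assets(assets: List[FakeEtlAsset]) -> List[Dict[str, Any]]:
    """Wrap each asset as {"etl_asset": {...}}, the shape the normalizer reads."""
    return [{"etl_asset": asdict(asset)} for asset in assets]


def normalizer_keeps(wire_event: Dict[str, Any]) -> bool:
    """Mimic the described normalizer check: skip when etl_asset is missing."""
    return wire_event.get("etl_asset") is not None


def test_assets_carry_etl_asset_key() -> None:
    asset = FakeEtlAsset(job_source_id="job-1", name="nightly")
    # A flat event (what the reflection-based serializer returns) is skipped.
    assert not normalizer_keeps(asdict(asset))
    # A wrapped event survives and keeps its fields under the wrapper.
    wrapped = wrap_assets([asset])
    assert all(normalizer_keeps(event) for event in wrapped)
    assert wrapped[0]["etl_asset"]["job_source_id"] == "job-1"
```

In the real test, the assertion would run against the output of fetch_etl_assets instead of the stand-in helper.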

@sassmith
Contributor Author

sassmith commented Jun 5, 2026

Code Review — 8 findings (7 fixed, 1 skipped)

Reviewed: 2026-06-05 UTC | Reviewers: security, correctness, testing, devdocs | Scope: full PR

| # | Severity | Finding | Status |
|---|---|---|---|
| F1 | ISSUE | Credential validation ordering: module loaded and Connector() instantiated before credentials checked | ✅ Moved credential validation before module load |
| F2 | ISSUE | _serialize strips None from dataclass/object dicts but not from plain dicts | ✅ Added None-stripping to dict branch for consistency |
| F3 | ISSUE | Module not registered in sys.modules, so top-level code re-executes on each load | ✅ Added module-level cache to avoid re-execution |
| F4 | SUGGESTION | Missing test for setup_connection failure path | ✅ Added test for setup_connection failure |
| F5 | SUGGESTION | Missing test for close() exception suppression | ✅ Added test for close() exception suppression |
| F6 | SUGGESTION | Registry cache state bleed between tests | ✅ Added tearDown to reset registry cache |
| F7 | SUGGESTION | Path traversal risk in load_connector_module | ⏭️ Skipped: matches existing warehouse connector pattern |
| F8 | NIT | Stale _fetch_run_details reference in fetch_etl_runs docstring | ✅ Fixed docstring reference |
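
For context on F3, a module-level cache around file-based loading could look roughly like this. It is a sketch using the standard `importlib.util` API, not the code in the fix commit; `load_module_cached`, `_MODULE_CACHE`, and the cache key format are hypothetical names chosen for illustration.

```python
import importlib.util
from pathlib import Path
from types import ModuleType
from typing import Dict

# Hypothetical module-level cache, keyed by module name and file path.
_MODULE_CACHE: Dict[str, ModuleType] = {}


def load_module_cached(name: str, path: Path) -> ModuleType:
    """Load a module from a .py file once; later calls return the cached object."""
    key = f"{name}:{path}"
    cached = _MODULE_CACHE.get(key)
    if cached is not None:
        return cached
    spec = importlib.util.spec_from_file_location(name, path)
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot load module from {path}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # top-level code runs here, once per key
    _MODULE_CACHE[key] = module
    return module


def reset_module_cache() -> None:
    """Clear the cache, e.g. from a test tearDown, so state does not leak."""
    _MODULE_CACHE.clear()
```

A reset hook like `reset_module_cache` is the kind of thing a tearDown (as in F6) would call between test classes.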

👉 Fix commit
