Skip to content

feat(sink): add Elasticsearch sink for document indexing#32

Closed
psteinroe wants to merge 7 commits intomainfrom
feat/sink-elasticsearch
Closed

feat(sink): add Elasticsearch sink for document indexing#32
psteinroe wants to merge 7 commits intomainfrom
feat/sink-elasticsearch

Conversation

@psteinroe
Copy link
Copy Markdown
Owner

@psteinroe psteinroe commented Jan 7, 2026

Summary

  • Add ElasticsearchSink that indexes events as documents
  • Sends event.payload as document body (with injected id field)
  • Supports dynamic index routing via event metadata

Configuration

sink:
  type: elasticsearch
  url: http://localhost:9200
  index: events  # optional default

Routing

Index is resolved from:

  1. index field in event metadata
  2. Fallback to config value

- Adds Elasticsearch sink for indexing events as JSON documents
- Feature-gated behind `sink-elasticsearch` flag
- Uses `elasticsearch` crate (v8.19.0-alpha.1) with bulk indexing
- Includes `ElasticsearchSinkConfigWithoutSecrets` for safe logging
- Documents `routing` and `pipeline` payload extensions
- Adds AnySink enum pattern for runtime sink dispatch
- Integration tests use Elasticsearch testcontainer
…nly output

- Make index field optional (can come from event.metadata["index"])
- Add resolve_index() for dynamic per-event index routing
- Send only event.payload (no id, created_at, lsn, stream_id, metadata envelope)
- Use DestinationError for publish failures, ConfigError for missing config
- Remove verbose info! logging
- Update tests for payload-only content and add metadata routing test
@psteinroe psteinroe force-pushed the feat/sink-elasticsearch branch from 978a7c2 to b35afc8 Compare January 8, 2026 12:00
psteinroe and others added 5 commits January 8, 2026 13:12
Use {"json_path": ..., "expression": ...} format instead of incorrect
{"key": ..., "source": ..., "value": ...} format.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <[email protected]>
Rely on HTTP status code for error detection.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <[email protected]>
Remove cloning in publish_events to improve performance.
Events are now moved by value instead of borrowed and cloned.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <[email protected]>
The borrowed index reference was keeping the event borrowed,
preventing us from moving event.payload and event.id.id.
Convert to String immediately to end the borrow.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <[email protected]>
@psteinroe psteinroe closed this Jan 13, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant