Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
322 changes: 322 additions & 0 deletions Cargo.lock

Large diffs are not rendered by default.

9 changes: 8 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ version = "0.1.0"
default = []
test-utils = ["dep:ctor", "dep:testcontainers", "dep:testcontainers-modules"]

# Sink features.
sink-elasticsearch = ["dep:elasticsearch", "dep:url"]

[dependencies]
anyhow = { version = "1.0.98", default-features = false, features = ["std"] }
chrono = { version = "0.4.41", default-features = false }
Expand Down Expand Up @@ -54,9 +57,13 @@ tikv-jemallocator = { version = "0.6.1", default-features = false, features = [
] }


# Optional sink dependencies.
elasticsearch = { version = "8.19.0-alpha.1", optional = true, default-features = false, features = ["rustls-tls"] }
url = { version = "2.5", optional = true, default-features = false }

ctor = { version = "0.4", optional = true }
testcontainers = { version = "0.23", optional = true, features = ["blocking"] }
testcontainers-modules = { version = "0.11", optional = true, features = ["postgres", "blocking"] }
testcontainers-modules = { version = "0.11", optional = true, features = ["postgres", "elastic_search", "blocking"] }

[dev-dependencies]
temp-env = "0.3"
Expand Down
11 changes: 11 additions & 0 deletions src/config/sink.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
//! Sink configuration types.
//!
//! Defines configuration variants for different event destinations.

use serde::Deserialize;

#[cfg(feature = "sink-elasticsearch")]
use crate::sink::elasticsearch::ElasticsearchSinkConfig;

/// Sink destination configuration.
///
/// Determines where replicated events are sent.
Expand All @@ -8,4 +15,8 @@ use serde::Deserialize;
pub enum SinkConfig {
/// In-memory sink for testing and development.
Memory,

/// Elasticsearch sink for document indexing.
#[cfg(feature = "sink-elasticsearch")]
Elasticsearch(ElasticsearchSinkConfig),
}
20 changes: 20 additions & 0 deletions src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,19 @@ async fn run_pipeline(config: &PipelineConfig) -> EtlResult<()> {
// Create sink based on configuration.
let sink = match &config.sink {
SinkConfig::Memory => AnySink::Memory(MemorySink::new()),

#[cfg(feature = "sink-elasticsearch")]
SinkConfig::Elasticsearch(cfg) => {
use crate::sink::elasticsearch::ElasticsearchSink;
let s = ElasticsearchSink::new(cfg.clone()).await.map_err(|e| {
etl::etl_error!(
etl::error::ErrorKind::InvalidData,
"Failed to create Elasticsearch sink",
e.to_string()
)
})?;
AnySink::Elasticsearch(s)
}
};

// Create PgStream as an ETL destination
Expand Down Expand Up @@ -122,6 +135,13 @@ fn log_sink_config(config: &SinkConfig) {
SinkConfig::Memory => {
debug!("using memory sink");
}

#[cfg(feature = "sink-elasticsearch")]
SinkConfig::Elasticsearch(cfg) => {
use crate::sink::elasticsearch::ElasticsearchSinkConfigWithoutSecrets;
let safe_cfg: ElasticsearchSinkConfigWithoutSecrets = cfg.into();
debug!(config = ?safe_cfg, "using elasticsearch sink");
}
}
}

Expand Down
238 changes: 238 additions & 0 deletions src/sink/elasticsearch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
//! Elasticsearch sink for indexing events as documents.
//!
//! Indexes each event's payload as a JSON document in Elasticsearch.
//! The sink uses bulk indexing for efficient batch operations.
//!
//! # Dynamic Routing
//!
//! The target index can come from event metadata or sink config:
//!
//! ```sql
//! -- Via metadata_extensions (dynamic per-event)
//! metadata_extensions = '[{"json_path": "index", "expression": "new.index_name"}]'
//!
//! -- Via static metadata
//! metadata = '{"index": "events"}'
//! ```
//!
//! Priority: event.metadata["index"] > config.index

use elasticsearch::http::transport::Transport;
use elasticsearch::{BulkOperation, BulkParts, Elasticsearch};
use etl::error::EtlResult;
use serde::{Deserialize, Serialize};

use crate::sink::Sink;
use crate::types::TriggeredEvent;

/// Configuration for the Elasticsearch sink.
///
/// This intentionally does not implement [`Serialize`] to avoid accidentally
/// leaking sensitive information in serialized forms: `url` may embed
/// credentials (e.g., "http://user:pass@host:9200"). For logging, convert to
/// [`ElasticsearchSinkConfigWithoutSecrets`] instead.
#[derive(Clone, Debug, Deserialize)]
pub struct ElasticsearchSinkConfig {
    /// Elasticsearch URL (e.g., "http://localhost:9200").
    /// May include userinfo credentials, so never log this field directly.
    pub url: String,

    /// Default index name for document storage; optional because it can be
    /// supplied per-event via metadata["index"] instead.
    #[serde(default)]
    pub index: Option<String>,
}

/// Configuration for the Elasticsearch sink without sensitive data.
///
/// Safe to serialize and log. Use this for debugging and metrics.
/// Produced from [`ElasticsearchSinkConfig`] via the `From` conversions
/// below, which strip any credentials embedded in the URL.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ElasticsearchSinkConfigWithoutSecrets {
    /// Host (and port, when present) extracted from the configured URL;
    /// userinfo credentials, scheme, path, and query are dropped.
    pub url_host: String,

    /// Index name for document storage (not considered sensitive).
    pub index: Option<String>,
}

impl From<&ElasticsearchSinkConfig> for ElasticsearchSinkConfigWithoutSecrets {
    /// Builds a redacted, log-safe view from a borrowed config.
    fn from(config: &ElasticsearchSinkConfig) -> Self {
        Self {
            url_host: extract_host(&config.url),
            index: config.index.clone(),
        }
    }
}

impl From<ElasticsearchSinkConfig> for ElasticsearchSinkConfigWithoutSecrets {
    /// Builds a redacted, log-safe view from an owned config by delegating
    /// to the borrowed-config conversion.
    fn from(config: ElasticsearchSinkConfig) -> Self {
        Self::from(&config)
    }
}

/// Extracts the host (and port, when present) from a URL, stripping any
/// embedded credentials, scheme, path, and query.
///
/// Returns the input unchanged when it cannot be parsed as a URL or has
/// no host component, so callers always get something loggable.
fn extract_host(url: &str) -> String {
    let parsed = match url::Url::parse(url) {
        Ok(parsed) => parsed,
        // Fallback: not a parseable URL, return as-is.
        Err(_) => return url.to_string(),
    };

    match parsed.host_str() {
        Some(host) => match parsed.port() {
            Some(port) => format!("{host}:{port}"),
            None => host.to_string(),
        },
        None => url.to_string(),
    }
}

/// Sink that indexes events in Elasticsearch.
///
/// Events are serialized as JSON documents and bulk indexed.
/// Connection handling is delegated to the `elasticsearch` crate's
/// single-node transport — NOTE(review): retry/pooling behavior depends on
/// that transport; verify against the crate version in Cargo.toml.
#[derive(Clone)]
pub struct ElasticsearchSink {
    /// Elasticsearch client built from the configured URL.
    client: Elasticsearch,

    /// Default index name from config; per-event metadata["index"] takes
    /// precedence over this (see `resolve_index`).
    index: Option<String>,
}

impl ElasticsearchSink {
/// Creates a new Elasticsearch sink from configuration.
///
/// # Errors
///
/// Returns an error if the transport cannot be created.
pub async fn new(
config: ElasticsearchSinkConfig,
) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
let transport = Transport::single_node(&config.url)?;
let client = Elasticsearch::new(transport);

Ok(Self {
client,
index: config.index,
})
}

/// Resolves the index name for an event.
///
/// Priority: event.metadata["index"] > config.index
fn resolve_index<'a>(&'a self, event: &'a TriggeredEvent) -> Option<&'a str> {
// Check event metadata first.
if let Some(ref metadata) = event.metadata {
if let Some(index) = metadata.get("index").and_then(|v| v.as_str()) {
return Some(index);
}
}
// Fall back to config.
self.index.as_deref()
}
}

impl Sink for ElasticsearchSink {
    fn name() -> &'static str {
        "elasticsearch"
    }

    /// Bulk-indexes each event's payload into its resolved index.
    ///
    /// # Errors
    ///
    /// Fails when an event has no resolvable index, the bulk request cannot
    /// be sent, the HTTP status is non-success, or any individual bulk
    /// operation is rejected by Elasticsearch.
    async fn publish_events(&self, events: Vec<TriggeredEvent>) -> EtlResult<()> {
        if events.is_empty() {
            return Ok(());
        }

        // Build one bulk operation per event.
        let mut operations: Vec<BulkOperation<serde_json::Value>> =
            Vec::with_capacity(events.len());

        for event in events {
            // Resolve target index for this event (convert to owned String so
            // the borrow of `event` ends before `event.payload` is moved).
            let index = self
                .resolve_index(&event)
                .ok_or_else(|| {
                    etl::etl_error!(
                        etl::error::ErrorKind::ConfigError,
                        "No index in config or event metadata"
                    )
                })?
                .to_string();

            // Index only the payload (not full event envelope).
            let op = BulkOperation::index(event.payload)
                .id(event.id.id)
                .index(index)
                .into();
            operations.push(op);
        }

        // Execute bulk request (no default index needed since each op has its own).
        let response = self
            .client
            .bulk(BulkParts::None)
            .body(operations)
            .send()
            .await
            .map_err(|e| {
                etl::etl_error!(
                    etl::error::ErrorKind::DestinationError,
                    "Failed to execute bulk request",
                    e.to_string()
                )
            })?;

        // Transport-level failure.
        let status = response.status_code();
        if !status.is_success() {
            return Err(etl::etl_error!(
                etl::error::ErrorKind::DestinationError,
                "Elasticsearch bulk request failed",
                format!("status: {}", status)
            ));
        }

        // A bulk request can succeed at the HTTP level (200) while individual
        // operations fail; Elasticsearch signals this via the top-level
        // `errors` flag in the response body, so it must be checked too.
        let body: serde_json::Value = response.json().await.map_err(|e| {
            etl::etl_error!(
                etl::error::ErrorKind::DestinationError,
                "Failed to read bulk response body",
                e.to_string()
            )
        })?;

        if body.get("errors").and_then(|v| v.as_bool()).unwrap_or(false) {
            return Err(etl::etl_error!(
                etl::error::ErrorKind::DestinationError,
                "Elasticsearch bulk request reported item-level failures",
                body.get("items")
                    .map(|items| items.to_string())
                    .unwrap_or_default()
            ));
        }

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_sink_name() {
        assert_eq!(ElasticsearchSink::name(), "elasticsearch");
    }

    #[test]
    fn test_config_without_secrets() {
        let config = ElasticsearchSinkConfig {
            url: "http://user:pass@localhost:9200".to_string(),
            index: Some("events".to_string()),
        };

        let redacted = ElasticsearchSinkConfigWithoutSecrets::from(&config);

        // Credentials must never survive redaction; only host:port remains.
        assert_eq!(redacted.url_host, "localhost:9200");
        assert_eq!(redacted.index, Some("events".to_string()));
    }

    #[test]
    fn test_extract_host_simple() {
        assert_eq!(extract_host("http://localhost:9200"), "localhost:9200");
    }

    #[test]
    fn test_extract_host_with_credentials() {
        let host = extract_host("http://elastic:password@localhost:9200");
        assert_eq!(host, "localhost:9200");
    }

    #[test]
    fn test_extract_host_no_port() {
        let host = extract_host("http://elasticsearch.local");
        assert_eq!(host, "elasticsearch.local");
    }
}
21 changes: 19 additions & 2 deletions src/sink/mod.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,35 @@
//! Sink implementations for event publishing.
//!
//! Provides destinations for replicated PostgreSQL events.

mod base;
pub mod memory;

#[cfg(feature = "sink-elasticsearch")]
pub mod elasticsearch;

pub use base::Sink;

use etl::error::EtlResult;
use memory::MemorySink;

#[cfg(feature = "sink-elasticsearch")]
use elasticsearch::ElasticsearchSink;

use crate::types::TriggeredEvent;

/// Wrapper enum for all supported sink types.
///
/// Enables runtime sink selection while maintaining static dispatch.
/// Each variant wraps a concrete sink implementation gated by its feature flag.
/// Enables runtime sink selection while maintaining static dispatch
/// for better performance. Each variant wraps a concrete sink type.
#[derive(Clone)]
pub enum AnySink {
    /// In-memory sink for testing and development.
    Memory(MemorySink),

    /// Elasticsearch sink for document indexing.
    // Doc comment placed before the cfg attribute for consistency with the
    // variant documentation style used in the sink configuration enum.
    #[cfg(feature = "sink-elasticsearch")]
    Elasticsearch(ElasticsearchSink),
}

impl Sink for AnySink {
Expand All @@ -26,6 +40,9 @@ impl Sink for AnySink {
async fn publish_events(&self, events: Vec<TriggeredEvent>) -> EtlResult<()> {
match self {
AnySink::Memory(sink) => sink.publish_events(events).await,

#[cfg(feature = "sink-elasticsearch")]
AnySink::Elasticsearch(sink) => sink.publish_events(events).await,
}
}
}
Loading