diff --git a/Cargo.lock b/Cargo.lock index 780a4ae..20677c4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,12 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "adler2" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" + [[package]] name = "ahash" version = "0.8.12" @@ -50,6 +56,19 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7d902e3d592a523def97af8f317b08ce16b7ab854c1985a0c671e6f15cebc236" +[[package]] +name = "async-compression" +version = "0.4.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98ec5f6c2f8bc326c994cb9e241cc257ddaba9afa8555a43cffbb5dd86efaa37" +dependencies = [ + "compression-codecs", + "compression-core", + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "async-trait" version = "0.1.89" @@ -232,6 +251,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" +[[package]] +name = "cfg_aliases" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" + [[package]] name = "chrono" version = "0.4.42" @@ -253,6 +278,23 @@ dependencies = [ "cc", ] +[[package]] +name = "compression-codecs" +version = "0.4.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0f7ac3e5b97fdce45e8922fb05cae2c37f7bbd63d30dd94821dacfd8f3f2bf2" +dependencies = [ + "compression-core", + "flate2", + "memchr", +] + +[[package]] +name = "compression-core" +version = "0.4.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75984efb6ed102a0d42db99afb6c1948f0380d1d91808d5529916e6c08b49d8d" + [[package]] name = "concurrent-queue" version = "2.5.0" @@ -321,6 
+363,15 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" +[[package]] +name = "crc32fast" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9481c1c90cbf2ac953f07c8d4a58aa3945c425b7185c9154d67a65e4230da511" +dependencies = [ + "cfg-if", +] + [[package]] name = "crossbeam-epoch" version = "0.9.18" @@ -516,6 +567,28 @@ dependencies = [ "serde", ] +[[package]] +name = "elasticsearch" +version = "8.19.0-alpha.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b8b65fab4532c837038a5cc305508006a8cd54660b2b8b0cda7d3bac5ffdaf9" +dependencies = [ + "base64 0.22.1", + "bytes", + "dyn-clone", + "flate2", + "lazy_static", + "percent-encoding", + "reqwest", + "rustc_version", + "serde", + "serde_json", + "serde_with", + "tokio", + "url", + "void", +] + [[package]] name = "encoding_rs" version = "0.8.35" @@ -670,6 +743,16 @@ version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b7ac824320a75a52197e8f2d787f6a38b6718bb6897a35142d749af3c0e8f4fe" +[[package]] +name = "flate2" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfe33edd8e85a12a67454e37f8c75e730830d83e313556ab9ebf9ee7fbeb3bfb" +dependencies = [ + "crc32fast", + "miniz_oxide", +] + [[package]] name = "flume" version = "0.11.1" @@ -831,8 +914,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "335ff9f135e4384c8150d6f27c6daed433577f86b4750418338c01a1a2528592" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi", + "wasm-bindgen", ] [[package]] @@ -842,9 +927,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "899def5c37c4fd7b2664648c28120ecec138e4d395b459e5ca34f9cce2dd77fd" dependencies = [ "cfg-if", + "js-sys", "libc", "r-efi", "wasip2", + "wasm-bindgen", ] [[package]] @@ -1056,6 
+1143,7 @@ dependencies = [ "tokio", "tokio-rustls", "tower-service", + "webpki-roots 1.0.4", ] [[package]] @@ -1064,6 +1152,7 @@ version = "0.1.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "727805d60e7938b76b826a6ef209eb70eaa1812794f9424d4a4e2d740662df5f" dependencies = [ + "base64 0.22.1", "bytes", "futures-channel", "futures-core", @@ -1071,7 +1160,9 @@ dependencies = [ "http", "http-body", "hyper", + "ipnet", "libc", + "percent-encoding", "pin-project-lite", "socket2 0.6.1", "tokio", @@ -1255,6 +1346,16 @@ version = "2.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "469fb0b9cefa57e3ef31275ee7cacb78f2fdca44e4765491884a2b119d4eb130" +[[package]] +name = "iri-string" +version = "0.7.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c91338f0783edbd6195decb37bae672fd3b165faffb89bf7b9e6942f8b1a731a" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "itoa" version = "1.0.15" @@ -1350,6 +1451,12 @@ version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" +[[package]] +name = "lru-slab" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" + [[package]] name = "matchers" version = "0.2.0" @@ -1427,6 +1534,16 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" +[[package]] +name = "miniz_oxide" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fa76a2c86f704bdb222d66965fb3d63269ce38518b83cb0575fca855ebb6316" +dependencies = [ + "adler2", + "simd-adler32", +] + [[package]] name = "mio" version = "1.1.1" @@ -1730,6 +1847,7 @@ dependencies = [ "config", "const-oid", "ctor", + 
"elasticsearch", "etl", "etl-postgres", "futures", @@ -1756,6 +1874,7 @@ dependencies = [ "tokio-rustls", "tracing", "tracing-subscriber", + "url", "uuid", "x509-cert", ] @@ -1822,6 +1941,61 @@ dependencies = [ "winapi", ] +[[package]] +name = "quinn" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e20a958963c291dc322d98411f541009df2ced7b5a4f2bd52337638cfccf20" +dependencies = [ + "bytes", + "cfg_aliases", + "pin-project-lite", + "quinn-proto", + "quinn-udp", + "rustc-hash", + "rustls", + "socket2 0.6.1", + "thiserror", + "tokio", + "tracing", + "web-time", +] + +[[package]] +name = "quinn-proto" +version = "0.11.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1906b49b0c3bc04b5fe5d86a77925ae6524a19b816ae38ce1e426255f1d8a31" +dependencies = [ + "bytes", + "getrandom 0.3.4", + "lru-slab", + "rand 0.9.2", + "ring", + "rustc-hash", + "rustls", + "rustls-pki-types", + "slab", + "thiserror", + "tinyvec", + "tracing", + "web-time", +] + +[[package]] +name = "quinn-udp" +version = "0.5.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "addec6a0dcad8a8d96a771f815f0eaf55f9d1805756410b39f5fa81332574cbd" +dependencies = [ + "cfg_aliases", + "libc", + "once_cell", + "socket2 0.6.1", + "tracing", + "windows-sys 0.60.2", +] + [[package]] name = "quote" version = "1.0.42" @@ -1981,6 +2155,44 @@ version = "0.8.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58" +[[package]] +name = "reqwest" +version = "0.12.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eddd3ca559203180a307f12d114c268abf583f59b03cb906fd0b3ff8646c1147" +dependencies = [ + "base64 0.22.1", + "bytes", + "futures-core", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-rustls", + "hyper-util", + "js-sys", + "log", + "percent-encoding", + "pin-project-lite", 
+ "quinn", + "rustls", + "rustls-pki-types", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tokio-rustls", + "tower", + "tower-http", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "webpki-roots 1.0.4", +] + [[package]] name = "ring" version = "0.17.14" @@ -2015,6 +2227,21 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustc-hash" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" + +[[package]] +name = "rustc_version" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfcb3a22ef46e85b45de6ee7e79d063319ebb6594faafcf1c225ea92ab6e9b92" +dependencies = [ + "semver", +] + [[package]] name = "rustix" version = "1.1.2" @@ -2071,6 +2298,7 @@ version = "1.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "708c0f9d5f54ba0272468c1d306a52c495b31fa155e91bc25371e6df7996908c" dependencies = [ + "web-time", "zeroize", ] @@ -2170,6 +2398,12 @@ dependencies = [ "libc", ] +[[package]] +name = "semver" +version = "1.0.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d767eb0aabc880b29956c35734170f26ed551a859dbd361d140cdbeca61ab1e2" + [[package]] name = "serde" version = "1.0.228" @@ -2336,6 +2570,12 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "simd-adler32" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e320a6c5ad31d271ad523dcf3ad13e2767ad8b1cb8f047f75a8aeaf8da139da2" + [[package]] name = "siphasher" version = "1.0.1" @@ -2663,6 +2903,15 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" +dependencies = [ + "futures-core", +] + 
[[package]] name = "synstructure" version = "0.13.2" @@ -2951,6 +3200,50 @@ dependencies = [ "tokio", ] +[[package]] +name = "tower" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9" +dependencies = [ + "futures-core", + "futures-util", + "pin-project-lite", + "sync_wrapper", + "tokio", + "tower-layer", + "tower-service", +] + +[[package]] +name = "tower-http" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4e6559d53cc268e5031cd8429d05415bc4cb4aefc4aa5d6cc35fbf5b924a1f8" +dependencies = [ + "async-compression", + "bitflags 2.10.0", + "bytes", + "futures-core", + "futures-util", + "http", + "http-body", + "http-body-util", + "iri-string", + "pin-project-lite", + "tokio", + "tokio-util", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "tower-layer" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" + [[package]] name = "tower-service" version = "0.3.3" @@ -3096,6 +3389,12 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "void" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" + [[package]] name = "want" version = "0.3.1" @@ -3139,6 +3438,19 @@ dependencies = [ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.56" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "836d9622d604feee9e5de25ac10e3ea5f2d65b41eac0d9ce72eb5deae707ce7c" +dependencies = [ + "cfg-if", + "js-sys", + "once_cell", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "wasm-bindgen-macro" 
version = "0.2.106" @@ -3181,6 +3493,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "webpki-roots" version = "0.26.11" diff --git a/Cargo.toml b/Cargo.toml index b53db85..f9d0c6d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,9 @@ version = "0.1.0" default = [] test-utils = ["dep:ctor", "dep:testcontainers", "dep:testcontainers-modules"] +# Sink features. +sink-elasticsearch = ["dep:elasticsearch", "dep:url"] + [dependencies] anyhow = { version = "1.0.98", default-features = false, features = ["std"] } chrono = { version = "0.4.41", default-features = false } @@ -54,9 +57,13 @@ tikv-jemallocator = { version = "0.6.1", default-features = false, features = [ ] } +# Optional sink dependencies. +elasticsearch = { version = "8.19.0-alpha.1", optional = true, default-features = false, features = ["rustls-tls"] } +url = { version = "2.5", optional = true, default-features = false } + ctor = { version = "0.4", optional = true } testcontainers = { version = "0.23", optional = true, features = ["blocking"] } -testcontainers-modules = { version = "0.11", optional = true, features = ["postgres", "blocking"] } +testcontainers-modules = { version = "0.11", optional = true, features = ["postgres", "elastic_search", "blocking"] } [dev-dependencies] temp-env = "0.3" diff --git a/src/config/sink.rs b/src/config/sink.rs index 3f16033..358dcc2 100644 --- a/src/config/sink.rs +++ b/src/config/sink.rs @@ -1,5 +1,12 @@ +//! Sink configuration types. +//! +//! Defines configuration variants for different event destinations. + use serde::Deserialize; +#[cfg(feature = "sink-elasticsearch")] +use crate::sink::elasticsearch::ElasticsearchSinkConfig; + /// Sink destination configuration. 
/// /// Determines where replicated events are sent. @@ -8,4 +15,8 @@ use serde::Deserialize; pub enum SinkConfig { /// In-memory sink for testing and development. Memory, + + /// Elasticsearch sink for document indexing. + #[cfg(feature = "sink-elasticsearch")] + Elasticsearch(ElasticsearchSinkConfig), } diff --git a/src/core.rs b/src/core.rs index 1304429..60ae82d 100644 --- a/src/core.rs +++ b/src/core.rs @@ -77,6 +77,19 @@ async fn run_pipeline(config: &PipelineConfig) -> EtlResult<()> { // Create sink based on configuration. let sink = match &config.sink { SinkConfig::Memory => AnySink::Memory(MemorySink::new()), + + #[cfg(feature = "sink-elasticsearch")] + SinkConfig::Elasticsearch(cfg) => { + use crate::sink::elasticsearch::ElasticsearchSink; + let s = ElasticsearchSink::new(cfg.clone()).await.map_err(|e| { + etl::etl_error!( + etl::error::ErrorKind::InvalidData, + "Failed to create Elasticsearch sink", + e.to_string() + ) + })?; + AnySink::Elasticsearch(s) + } }; // Create PgStream as an ETL destination @@ -122,6 +135,13 @@ fn log_sink_config(config: &SinkConfig) { SinkConfig::Memory => { debug!("using memory sink"); } + + #[cfg(feature = "sink-elasticsearch")] + SinkConfig::Elasticsearch(cfg) => { + use crate::sink::elasticsearch::ElasticsearchSinkConfigWithoutSecrets; + let safe_cfg: ElasticsearchSinkConfigWithoutSecrets = cfg.into(); + debug!(config = ?safe_cfg, "using elasticsearch sink"); + } } } diff --git a/src/sink/elasticsearch.rs b/src/sink/elasticsearch.rs new file mode 100644 index 0000000..fa1b4f4 --- /dev/null +++ b/src/sink/elasticsearch.rs @@ -0,0 +1,238 @@ +//! Elasticsearch sink for indexing events as documents. +//! +//! Indexes each event's payload as a JSON document in Elasticsearch. +//! The sink uses bulk indexing for efficient batch operations. +//! +//! # Dynamic Routing +//! +//! The target index can come from event metadata or sink config: +//! +//! ```sql +//! -- Via metadata_extensions (dynamic per-event) +//! 
metadata_extensions = '[{"json_path": "index", "expression": "new.index_name"}]' +//! +//! -- Via static metadata +//! metadata = '{"index": "events"}' +//! ``` +//! +//! Priority: event.metadata["index"] > config.index + +use elasticsearch::http::transport::Transport; +use elasticsearch::{BulkOperation, BulkParts, Elasticsearch}; +use etl::error::EtlResult; +use serde::{Deserialize, Serialize}; + +use crate::sink::Sink; +use crate::types::TriggeredEvent; + +/// Configuration for the Elasticsearch sink. +/// +/// This intentionally does not implement [`Serialize`] to avoid accidentally +/// leaking sensitive information in serialized forms. +#[derive(Clone, Debug, Deserialize)] +pub struct ElasticsearchSinkConfig { + /// Elasticsearch URL (e.g., "http://localhost:9200"). + pub url: String, + + /// Index name for document storage. + /// Can be overridden per-event via metadata["index"]. + #[serde(default)] + pub index: Option<String>, +} + +/// Configuration for the Elasticsearch sink without sensitive data. +/// +/// Safe to serialize and log. Use this for debugging and metrics. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ElasticsearchSinkConfigWithoutSecrets { + /// Elasticsearch URL (may contain credentials, so partially redacted). + pub url_host: String, + + /// Index name for document storage. + pub index: Option<String>, +} + +impl From<ElasticsearchSinkConfig> for ElasticsearchSinkConfigWithoutSecrets { + fn from(config: ElasticsearchSinkConfig) -> Self { + Self { + url_host: extract_host(&config.url), + index: config.index, + } + } +} + +impl From<&ElasticsearchSinkConfig> for ElasticsearchSinkConfigWithoutSecrets { + fn from(config: &ElasticsearchSinkConfig) -> Self { + Self { + url_host: extract_host(&config.url), + index: config.index.clone(), + } + } +} + +/// Extracts the host from a URL, stripping credentials if present. +fn extract_host(url: &str) -> String { + // Parse URL and extract just the host:port portion. 
+ if let Ok(parsed) = url::Url::parse(url) { + if let Some(host) = parsed.host_str() { + let port = parsed.port().map(|p| format!(":{p}")).unwrap_or_default(); + return format!("{host}{port}"); + } + } + // Fallback: return as-is if parsing fails. + url.to_string() +} + +/// Sink that indexes events in Elasticsearch. +/// +/// Events are serialized as JSON documents and bulk indexed. +/// The sink handles connection pooling and automatic retries. +#[derive(Clone)] +pub struct ElasticsearchSink { + /// Elasticsearch client. + client: Elasticsearch, + + /// Default index name from config (can be overridden per-event). + index: Option<String>, +} + +impl ElasticsearchSink { + /// Creates a new Elasticsearch sink from configuration. + /// + /// # Errors + /// + /// Returns an error if the transport cannot be created. + pub async fn new( + config: ElasticsearchSinkConfig, + ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> { + let transport = Transport::single_node(&config.url)?; + let client = Elasticsearch::new(transport); + + Ok(Self { + client, + index: config.index, + }) + } + + /// Resolves the index name for an event. + /// + /// Priority: event.metadata["index"] > config.index + fn resolve_index<'a>(&'a self, event: &'a TriggeredEvent) -> Option<&'a str> { + // Check event metadata first. + if let Some(ref metadata) = event.metadata { + if let Some(index) = metadata.get("index").and_then(|v| v.as_str()) { + return Some(index); + } + } + // Fall back to config. + self.index.as_deref() + } +} + +impl Sink for ElasticsearchSink { + fn name() -> &'static str { + "elasticsearch" + } + + async fn publish_events(&self, events: Vec<TriggeredEvent>) -> EtlResult<()> { + if events.is_empty() { + return Ok(()); + } + + // Build bulk operations. + let mut operations: Vec<BulkOperation<serde_json::Value>> = + Vec::with_capacity(events.len()); + + for event in events { + // Resolve target index for this event (convert to owned String to end borrow). 
+ let index = self + .resolve_index(&event) + .ok_or_else(|| { + etl::etl_error!( + etl::error::ErrorKind::ConfigError, + "No index in config or event metadata" + ) + })? + .to_string(); + + // Index only the payload (not full event envelope). + let op = BulkOperation::index(event.payload) + .id(event.id.id) + .index(index) + .into(); + operations.push(op); + } + + // Execute bulk request (no default index needed since each op has its own). + let response = self + .client + .bulk(BulkParts::None) + .body(operations) + .send() + .await + .map_err(|e| { + etl::etl_error!( + etl::error::ErrorKind::DestinationError, + "Failed to execute bulk request", + e.to_string() + ) + })?; + + // Check response status. + if !response.status_code().is_success() { + let status = response.status_code(); + return Err(etl::etl_error!( + etl::error::ErrorKind::DestinationError, + "Elasticsearch bulk request failed", + format!("status: {}", status) + )); + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_sink_name() { + assert_eq!(ElasticsearchSink::name(), "elasticsearch"); + } + + #[test] + fn test_config_without_secrets() { + let config = ElasticsearchSinkConfig { + url: "http://user:pass@localhost:9200".to_string(), + index: Some("events".to_string()), + }; + + let without_secrets: ElasticsearchSinkConfigWithoutSecrets = (&config).into(); + + // Should extract only host:port, no credentials. 
+ assert_eq!(without_secrets.url_host, "localhost:9200"); + assert_eq!(without_secrets.index, Some("events".to_string())); + } + + #[test] + fn test_extract_host_simple() { + assert_eq!(extract_host("http://localhost:9200"), "localhost:9200"); + } + + #[test] + fn test_extract_host_with_credentials() { + assert_eq!( + extract_host("http://elastic:password@localhost:9200"), + "localhost:9200" + ); + } + + #[test] + fn test_extract_host_no_port() { + assert_eq!( + extract_host("http://elasticsearch.local"), + "elasticsearch.local" + ); + } +} diff --git a/src/sink/mod.rs b/src/sink/mod.rs index 4f8ab01..0fe98f0 100644 --- a/src/sink/mod.rs +++ b/src/sink/mod.rs @@ -1,21 +1,35 @@ +//! Sink implementations for event publishing. +//! +//! Provides destinations for replicated PostgreSQL events. + mod base; pub mod memory; +#[cfg(feature = "sink-elasticsearch")] +pub mod elasticsearch; + pub use base::Sink; use etl::error::EtlResult; use memory::MemorySink; +#[cfg(feature = "sink-elasticsearch")] +use elasticsearch::ElasticsearchSink; + use crate::types::TriggeredEvent; /// Wrapper enum for all supported sink types. /// -/// Enables runtime sink selection while maintaining static dispatch. -/// Each variant wraps a concrete sink implementation gated by its feature flag. +/// Enables runtime sink selection while maintaining static dispatch +/// for better performance. Each variant wraps a concrete sink type. #[derive(Clone)] pub enum AnySink { /// In-memory sink for testing and development. Memory(MemorySink), + + #[cfg(feature = "sink-elasticsearch")] + /// Elasticsearch sink for document indexing. 
+ Elasticsearch(ElasticsearchSink), } impl Sink for AnySink { @@ -26,6 +40,9 @@ impl Sink for AnySink { async fn publish_events(&self, events: Vec<TriggeredEvent>) -> EtlResult<()> { match self { AnySink::Memory(sink) => sink.publish_events(events).await, + + #[cfg(feature = "sink-elasticsearch")] + AnySink::Elasticsearch(sink) => sink.publish_events(events).await, } } } diff --git a/src/test_utils/container.rs b/src/test_utils/container.rs index 82625ea..d51e7c8 100644 --- a/src/test_utils/container.rs +++ b/src/test_utils/container.rs @@ -2,6 +2,7 @@ use ctor::dtor; use etl::config::{PgConnectionConfig, TlsConfig}; use std::sync::{Mutex, OnceLock}; use testcontainers::{ContainerRequest, ImageExt, runners::SyncRunner}; +use testcontainers_modules::elastic_search::ElasticSearch; use testcontainers_modules::postgres::Postgres; use uuid::Uuid; @@ -74,3 +75,46 @@ pub async fn test_pg_config() -> PgConnectionConfig { keepalive: None, } } + +static ELASTICSEARCH_PORT: OnceLock<u16> = OnceLock::new(); +/// Using Mutex<Option<Container>> so we can take ownership for cleanup. +static ELASTICSEARCH_CONTAINER: OnceLock<Mutex<Option<testcontainers::Container<ElasticSearch>>>> = + OnceLock::new(); + +/// Cleanup function that runs at program exit to stop and remove the Elasticsearch container. +#[dtor] +fn cleanup_elasticsearch_container() { + if let Some(mutex) = ELASTICSEARCH_CONTAINER.get() { + if let Ok(mut guard) = mutex.lock() { + if let Some(container) = guard.take() { + // rm() stops and removes the container. + let _ = container.rm(); + } + } + } +} + +/// Ensures an Elasticsearch container is running and returns its HTTP API port. +/// +/// The container is reused across tests. Used for testing Elasticsearch sink. 
+pub async fn ensure_elasticsearch() -> u16 { + *ELASTICSEARCH_PORT.get_or_init(|| { + std::thread::spawn(|| { + let container: ContainerRequest<ElasticSearch> = ElasticSearch::default().into(); + + let container = container + .start() + .expect("Failed to start Elasticsearch container"); + + let port = container + .get_host_port_ipv4(9200) + .expect("Failed to get Elasticsearch port"); + + let _ = ELASTICSEARCH_CONTAINER.set(Mutex::new(Some(container))); + + port + }) + .join() + .expect("Failed to join container startup thread") + }) +} diff --git a/tests/elasticsearch_sink_tests.rs b/tests/elasticsearch_sink_tests.rs new file mode 100644 index 0000000..d423609 --- /dev/null +++ b/tests/elasticsearch_sink_tests.rs @@ -0,0 +1,288 @@ +//! Integration tests for Elasticsearch sink. +//! +//! Uses Elasticsearch testcontainer to test document indexing. + +#![cfg(feature = "sink-elasticsearch")] + +use chrono::Utc; +use elasticsearch::http::transport::Transport; +use elasticsearch::indices::IndicesRefreshParts; +use elasticsearch::{Elasticsearch, GetParts, SearchParts}; +use postgres_stream::sink::Sink; +use postgres_stream::sink::elasticsearch::{ElasticsearchSink, ElasticsearchSinkConfig}; +use postgres_stream::test_utils::ensure_elasticsearch; +use postgres_stream::types::{EventIdentifier, PgLsn, StreamId, TriggeredEvent}; +use uuid::Uuid; + +/// Creates a test event with the given payload key. +fn make_test_event(key: &str) -> TriggeredEvent { + TriggeredEvent { + id: EventIdentifier::new(Uuid::new_v4().to_string(), Utc::now()), + stream_id: StreamId::default(), + payload: serde_json::json!({ "key": key, "value": "test_data" }), + metadata: Some(serde_json::json!({ "source": "test" })), + lsn: Some(PgLsn::from(12345u64)), + } +} + +/// Creates an Elasticsearch client for testing. 
+async fn create_test_client(port: u16) -> Elasticsearch { + let url = format!("http://127.0.0.1:{port}"); + let transport = Transport::single_node(&url).expect("Failed to create transport"); + Elasticsearch::new(transport) +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_elasticsearch_sink_indexes_events() { + let port = ensure_elasticsearch().await; + let index_name = format!("test-index-{}", Uuid::new_v4()); + + // Create sink. + let config = ElasticsearchSinkConfig { + url: format!("http://127.0.0.1:{port}"), + index: Some(index_name.clone()), + }; + + let sink = ElasticsearchSink::new(config) + .await + .expect("Failed to create sink"); + + // Publish events. + let event1 = make_test_event("event1"); + let event2 = make_test_event("event2"); + let event1_id = event1.id.id.clone(); + let event2_id = event2.id.id.clone(); + + sink.publish_events(vec![event1, event2]) + .await + .expect("Failed to publish"); + + // Refresh index to make documents searchable. + let client = create_test_client(port).await; + client + .indices() + .refresh(IndicesRefreshParts::Index(&[&index_name])) + .send() + .await + .expect("Failed to refresh index"); + + // Verify documents exist. 
+ let response = client + .get(GetParts::IndexId(&index_name, &event1_id)) + .send() + .await + .expect("Failed to get document"); + + assert!( + response.status_code().is_success(), + "Document not found: {event1_id}" + ); + + let response = client + .get(GetParts::IndexId(&index_name, &event2_id)) + .send() + .await + .expect("Failed to get document"); + + assert!( + response.status_code().is_success(), + "Document not found: {event2_id}" + ); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_elasticsearch_sink_handles_empty_batch() { + let port = ensure_elasticsearch().await; + let index_name = format!("test-index-{}", Uuid::new_v4()); + + let config = ElasticsearchSinkConfig { + url: format!("http://127.0.0.1:{port}"), + index: Some(index_name), + }; + + let sink = ElasticsearchSink::new(config) + .await + .expect("Failed to create sink"); + + // Publishing empty batch should succeed without making any API calls. + sink.publish_events(vec![]) + .await + .expect("Empty batch should succeed"); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_elasticsearch_sink_indexes_only_payload() { + let port = ensure_elasticsearch().await; + let index_name = format!("test-index-{}", Uuid::new_v4()); + + let config = ElasticsearchSinkConfig { + url: format!("http://127.0.0.1:{port}"), + index: Some(index_name.clone()), + }; + + let sink = ElasticsearchSink::new(config) + .await + .expect("Failed to create sink"); + + // Create event with metadata. + let event = TriggeredEvent { + id: EventIdentifier::new(Uuid::new_v4().to_string(), Utc::now()), + stream_id: StreamId::default(), + payload: serde_json::json!({ "action": "created", "user_id": 456 }), + metadata: Some(serde_json::json!({ "source": "api" })), + lsn: Some(PgLsn::from(99999u64)), + }; + let event_id = event.id.id.clone(); + + sink.publish_events(vec![event]) + .await + .expect("Failed to publish"); + + // Refresh and retrieve document. 
+ let client = create_test_client(port).await; + client + .indices() + .refresh(IndicesRefreshParts::Index(&[&index_name])) + .send() + .await + .expect("Failed to refresh"); + + let response = client + .get(GetParts::IndexId(&index_name, &event_id)) + .send() + .await + .expect("Failed to get document"); + + let body = response + .json::<serde_json::Value>() + .await + .expect("Failed to parse response"); + + let source = &body["_source"]; + // Only payload fields should be present. + assert_eq!(source["action"], "created"); + assert_eq!(source["user_id"], 456); + // No envelope fields. + assert!(source.get("id").is_none()); + assert!(source.get("created_at").is_none()); + assert!(source.get("metadata").is_none()); + assert!(source.get("lsn").is_none()); + assert!(source.get("stream_id").is_none()); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_elasticsearch_sink_searchable() { + let port = ensure_elasticsearch().await; + let index_name = format!("test-index-{}", Uuid::new_v4()); + + let config = ElasticsearchSinkConfig { + url: format!("http://127.0.0.1:{port}"), + index: Some(index_name.clone()), + }; + + let sink = ElasticsearchSink::new(config) + .await + .expect("Failed to create sink"); + + // Publish events with different keys. + let events = vec![ + make_test_event("alpha"), + make_test_event("beta"), + make_test_event("gamma"), + ]; + + sink.publish_events(events) + .await + .expect("Failed to publish"); + + // Refresh index. + let client = create_test_client(port).await; + client + .indices() + .refresh(IndicesRefreshParts::Index(&[&index_name])) + .send() + .await + .expect("Failed to refresh"); + + // Search for documents. 
+ let response = client + .search(SearchParts::Index(&[&index_name])) + .body(serde_json::json!({ + "query": { "match_all": {} } + })) + .send() + .await + .expect("Failed to search"); + + let body = response + .json::<serde_json::Value>() + .await + .expect("Failed to parse response"); + + let hits = body["hits"]["total"]["value"] + .as_i64() + .expect("Missing hit count"); + assert_eq!(hits, 3, "Expected 3 documents"); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_elasticsearch_sink_uses_index_from_metadata() { + let port = ensure_elasticsearch().await; + let metadata_index = format!("metadata-index-{}", Uuid::new_v4()); + + // Create sink with NO default index. + let config = ElasticsearchSinkConfig { + url: format!("http://127.0.0.1:{port}"), + index: None, + }; + + let sink = ElasticsearchSink::new(config) + .await + .expect("Failed to create sink"); + + // Create event with index in metadata. + let event = TriggeredEvent { + id: EventIdentifier::new(Uuid::new_v4().to_string(), Utc::now()), + stream_id: StreamId::default(), + payload: serde_json::json!({ "routed": true }), + metadata: Some(serde_json::json!({ "index": metadata_index })), + lsn: None, + }; + let event_id = event.id.id.clone(); + + sink.publish_events(vec![event]) + .await + .expect("Failed to publish"); + + // Verify document was indexed to metadata-specified index. 
+ let client = create_test_client(port).await; + client + .indices() + .refresh(IndicesRefreshParts::Index(&[&metadata_index])) + .send() + .await + .expect("Failed to refresh"); + + let response = client + .get(GetParts::IndexId(&metadata_index, &event_id)) + .send() + .await + .expect("Failed to get document"); + + assert!( + response.status_code().is_success(), + "Document not found in metadata-specified index" + ); + + let body = response + .json::<serde_json::Value>() + .await + .expect("Failed to parse response"); + + assert_eq!(body["_source"]["routed"], true); +} + +#[test] +fn test_sink_name() { + assert_eq!(ElasticsearchSink::name(), "elasticsearch"); }