Skip to content

Commit a94a732

Browse files
psteinroe and claude committed
feat(sink): add NATS sink
Add a new sink implementation that publishes events to NATS subjects.

Features include:
- Publishes JSON-serialized events to configurable subject
- Uses async-nats client with automatic reconnection
- Feature-gated behind `sink-nats` flag
- Adds AnySink enum pattern for runtime sink dispatch

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent be5eb3a commit a94a732

8 files changed

Lines changed: 642 additions & 22 deletions

File tree

Cargo.lock

Lines changed: 293 additions & 15 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ version = "0.1.0"
66
[features]
77
default = []
88
test-utils = ["dep:ctor", "dep:testcontainers", "dep:testcontainers-modules"]
9+
sink-nats = ["dep:async-nats"]
910

1011
[dependencies]
1112
anyhow = { version = "1.0.98", default-features = false, features = ["std"] }
@@ -46,6 +47,9 @@ etl = { git = "https://github.com/supabase/etl", rev = "dd2987a55efc16a
4647
etl-postgres = { git = "https://github.com/supabase/etl", rev = "dd2987a55efc16aeb4402e4b06853c7731a6155a" }
4748
uuid = { version = "1.19.0", default-features = false, features = ["v4"] }
4849

50+
# Optional sink dependencies.
51+
async-nats = { version = "0.38", optional = true }
52+
4953
[target.'cfg(not(target_env = "msvc"))'.dependencies]
5054
tikv-jemalloc-ctl = { version = "0.6.0", default-features = false, features = ["stats"] }
5155
tikv-jemallocator = { version = "0.6.1", default-features = false, features = [
@@ -56,7 +60,7 @@ tikv-jemallocator = { version = "0.6.1", default-features = false, features = [
5660

5761
ctor = { version = "0.4", optional = true }
5862
testcontainers = { version = "0.23", optional = true, features = ["blocking"] }
59-
testcontainers-modules = { version = "0.11", optional = true, features = ["postgres", "blocking"] }
63+
testcontainers-modules = { version = "0.11", optional = true, features = ["postgres", "nats", "blocking"] }
6064

6165
[dev-dependencies]
6266
temp-env = "0.3"

src/config/sink.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
use serde::Deserialize;
22

3+
#[cfg(feature = "sink-nats")]
4+
use crate::sink::nats::NatsSinkConfig;
5+
36
/// Sink destination configuration.
47
///
58
/// Determines where replicated events are sent.
@@ -8,4 +11,8 @@ use serde::Deserialize;
811
pub enum SinkConfig {
912
/// In-memory sink for testing and development.
1013
Memory,
14+
15+
/// NATS sink for pub/sub messaging.
16+
#[cfg(feature = "sink-nats")]
17+
Nats(NatsSinkConfig),
1118
}

src/core.rs

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
use crate::{
66
config::{PipelineConfig, SinkConfig},
77
migrations::migrate_etl,
8-
sink::memory::MemorySink,
8+
sink::{AnySink, memory::MemorySink},
99
slot_recovery::{handle_slot_recovery, is_slot_invalidation_error},
1010
stream::PgStream,
1111
};
@@ -74,9 +74,22 @@ async fn run_pipeline(config: &PipelineConfig) -> EtlResult<()> {
7474
// Initialize state store for ETL pipeline state tracking
7575
let state_store = PostgresStore::new(config.stream.id, config.stream.pg_connection.clone());
7676

77-
// Create sink based on configuration
77+
// Create sink based on configuration.
7878
let sink = match &config.sink {
79-
SinkConfig::Memory => MemorySink::new(),
79+
SinkConfig::Memory => AnySink::Memory(MemorySink::new()),
80+
81+
#[cfg(feature = "sink-nats")]
82+
SinkConfig::Nats(cfg) => {
83+
use crate::sink::nats::NatsSink;
84+
let s = NatsSink::new(cfg.clone()).await.map_err(|e| {
85+
etl::etl_error!(
86+
etl::error::ErrorKind::InvalidData,
87+
"Failed to create NATS sink",
88+
e.to_string()
89+
)
90+
})?;
91+
AnySink::Nats(s)
92+
}
8093
};
8194

8295
// Create PgStream as an ETL destination
@@ -122,6 +135,11 @@ fn log_sink_config(config: &SinkConfig) {
122135
SinkConfig::Memory => {
123136
debug!("using memory sink");
124137
}
138+
139+
#[cfg(feature = "sink-nats")]
140+
SinkConfig::Nats(_cfg) => {
141+
debug!("using nats sink");
142+
}
125143
}
126144
}
127145

src/sink/mod.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,46 @@
1+
//! Sink implementations for publishing events to external systems.
2+
13
mod base;
24
pub mod memory;
35

6+
#[cfg(feature = "sink-nats")]
7+
pub mod nats;
8+
49
pub use base::Sink;
10+
11+
use etl::error::EtlResult;
12+
use memory::MemorySink;
13+
14+
#[cfg(feature = "sink-nats")]
15+
use nats::NatsSink;
16+
17+
use crate::types::TriggeredEvent;
18+
19+
/// Wrapper enum for all supported sink types.
20+
///
21+
/// Enables runtime sink selection while maintaining static dispatch.
22+
/// Each variant wraps a concrete sink implementation gated by its feature flag.
23+
#[derive(Clone)]
24+
pub enum AnySink {
25+
/// In-memory sink for testing and development.
26+
Memory(MemorySink),
27+
28+
/// NATS sink for pub/sub messaging.
29+
#[cfg(feature = "sink-nats")]
30+
Nats(NatsSink),
31+
}
32+
33+
impl Sink for AnySink {
34+
fn name() -> &'static str {
35+
"any"
36+
}
37+
38+
async fn publish_events(&self, events: Vec<TriggeredEvent>) -> EtlResult<()> {
39+
match self {
40+
AnySink::Memory(sink) => sink.publish_events(events).await,
41+
42+
#[cfg(feature = "sink-nats")]
43+
AnySink::Nats(sink) => sink.publish_events(events).await,
44+
}
45+
}
46+
}

src/sink/nats.rs

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
//! NATS sink for publishing events to a NATS subject.
2+
//!
3+
//! Publishes each event as a JSON message to the configured subject.
4+
//! The sink maintains a persistent connection with automatic reconnection.
5+
6+
use async_nats::Client;
7+
use etl::error::EtlResult;
8+
use serde::Deserialize;
9+
use std::sync::Arc;
10+
use tracing::info;
11+
12+
use crate::sink::Sink;
13+
use crate::types::TriggeredEvent;
14+
15+
/// Configuration for the NATS sink.
16+
#[derive(Clone, Debug, Deserialize)]
17+
pub struct NatsSinkConfig {
18+
/// NATS server URL (e.g., "nats://localhost:4222").
19+
pub url: String,
20+
21+
/// Subject to publish messages to.
22+
pub subject: String,
23+
}
24+
25+
/// Sink that publishes events to a NATS subject.
26+
///
27+
/// Each event is serialized as JSON and published to the configured subject.
28+
/// The NATS client handles connection pooling and automatic reconnection.
29+
#[derive(Clone)]
30+
pub struct NatsSink {
31+
/// Shared NATS client connection.
32+
client: Arc<Client>,
33+
34+
/// Subject to publish messages to.
35+
subject: String,
36+
}
37+
38+
impl NatsSink {
39+
/// Creates a new NATS sink from configuration.
40+
///
41+
/// # Errors
42+
///
43+
/// Returns an error if the NATS connection cannot be established.
44+
pub async fn new(
45+
config: NatsSinkConfig,
46+
) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
47+
let client = async_nats::connect(&config.url).await?;
48+
49+
Ok(Self {
50+
client: Arc::new(client),
51+
subject: config.subject,
52+
})
53+
}
54+
}
55+
56+
impl Sink for NatsSink {
57+
fn name() -> &'static str {
58+
"nats"
59+
}
60+
61+
async fn publish_events(&self, events: Vec<TriggeredEvent>) -> EtlResult<()> {
62+
if events.is_empty() {
63+
return Ok(());
64+
}
65+
66+
info!(
67+
"publishing {} events to NATS subject '{}'",
68+
events.len(),
69+
self.subject
70+
);
71+
72+
for event in &events {
73+
// Build JSON object manually since TriggeredEvent doesn't implement Serialize.
74+
let mut json_obj = serde_json::json!({
75+
"id": event.id.id,
76+
"created_at": event.id.created_at.to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
77+
"payload": event.payload,
78+
"stream_id": format!("{:?}", event.stream_id),
79+
});
80+
81+
// Add optional fields.
82+
if let Some(ref metadata) = event.metadata {
83+
json_obj["metadata"] = metadata.clone();
84+
}
85+
if let Some(lsn) = event.lsn {
86+
json_obj["lsn"] = serde_json::json!(lsn.to_string());
87+
}
88+
89+
let payload = serde_json::to_vec(&json_obj).map_err(|e| {
90+
etl::etl_error!(
91+
etl::error::ErrorKind::InvalidData,
92+
"Failed to serialize event to JSON",
93+
e.to_string()
94+
)
95+
})?;
96+
97+
// Publish to the configured subject.
98+
self.client
99+
.publish(self.subject.clone(), payload.into())
100+
.await
101+
.map_err(|e| {
102+
etl::etl_error!(
103+
etl::error::ErrorKind::InvalidData,
104+
"Failed to publish event to NATS",
105+
e.to_string()
106+
)
107+
})?;
108+
}
109+
110+
Ok(())
111+
}
112+
}
113+
114+
#[cfg(test)]
mod tests {
    use super::*;

    /// The sink identifies itself with the static name "nats".
    #[test]
    fn test_sink_name() {
        let reported = NatsSink::name();
        assert_eq!(reported, "nats");
    }
}

src/test_utils/container.rs

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,39 @@ use ctor::dtor;
22
use etl::config::{PgConnectionConfig, TlsConfig};
33
use std::sync::{Mutex, OnceLock};
44
use testcontainers::{ContainerRequest, ImageExt, runners::SyncRunner};
5+
use testcontainers_modules::nats::Nats;
56
use testcontainers_modules::postgres::Postgres;
67
use uuid::Uuid;
78

89
static POSTGRES_PORT: OnceLock<u16> = OnceLock::new();
9-
// Using Mutex<Option<...>> so we can take ownership for cleanup
10+
// Using Mutex<Option<...>> so we can take ownership for cleanup.
1011
static POSTGRES_CONTAINER: OnceLock<Mutex<Option<testcontainers::Container<Postgres>>>> =
1112
OnceLock::new();
1213

13-
/// Cleanup function that runs at program exit to stop and remove the postgres container
14+
static NATS_PORT: OnceLock<u16> = OnceLock::new();
15+
// Using Mutex<Option<...>> so we can take ownership for cleanup.
16+
static NATS_CONTAINER: OnceLock<Mutex<Option<testcontainers::Container<Nats>>>> = OnceLock::new();
17+
18+
/// Cleanup function that runs at program exit to stop and remove the postgres container.
1419
#[dtor]
1520
fn cleanup_postgres_container() {
1621
if let Some(mutex) = POSTGRES_CONTAINER.get() {
1722
if let Ok(mut guard) = mutex.lock() {
1823
if let Some(container) = guard.take() {
19-
// rm() stops and removes the container
24+
// rm() stops and removes the container.
25+
let _ = container.rm();
26+
}
27+
}
28+
}
29+
}
30+
31+
/// Cleanup function that runs at program exit to stop and remove the NATS container.
32+
#[dtor]
33+
fn cleanup_nats_container() {
34+
if let Some(mutex) = NATS_CONTAINER.get() {
35+
if let Ok(mut guard) = mutex.lock() {
36+
if let Some(container) = guard.take() {
37+
// rm() stops and removes the container.
2038
let _ = container.rm();
2139
}
2240
}
@@ -74,3 +92,30 @@ pub async fn test_pg_config() -> PgConnectionConfig {
7492
keepalive: None,
7593
}
7694
}
95+
96+
/// Ensures a NATS container is running and returns its port.
97+
///
98+
/// Uses a shared container across all tests. The container is started lazily
99+
/// on first use and cleaned up automatically when the test process exits.
100+
pub async fn ensure_nats() -> u16 {
101+
// Use get_or_init to handle concurrent initialization attempts.
102+
*NATS_PORT.get_or_init(|| {
103+
// Run container startup in a separate thread to avoid runtime-in-runtime panic.
104+
std::thread::spawn(|| {
105+
let container: ContainerRequest<Nats> = Nats::default().into();
106+
107+
let container = container.start().expect("Failed to start NATS container");
108+
109+
let port = container
110+
.get_host_port_ipv4(4222)
111+
.expect("Failed to get NATS container port");
112+
113+
// Store the container in a Mutex<Option<...>> so we can take ownership later for cleanup.
114+
let _ = NATS_CONTAINER.set(Mutex::new(Some(container)));
115+
116+
port
117+
})
118+
.join()
119+
.expect("Failed to join NATS container startup thread")
120+
})
121+
}

0 commit comments

Comments
 (0)