diff --git a/ADD-SINK.md b/ADD-SINK.md
new file mode 100644
index 000000000..d0057e4f1
--- /dev/null
+++ b/ADD-SINK.md
@@ -0,0 +1,116 @@
+# Adding a New Sink to Sequin
+
+This document outlines the steps required to add a new sink to Sequin. A sink is a destination to which Sequin can send data changes.
+
+## Steps:
+
+### [Backend] Add the sink schema
+
+Create a new sink schema in `lib/sequin/consumers/` (e.g., `my_sink.ex`). The schema should:
+- Use `Ecto.Schema` and `TypedEctoSchema`
+- Define required fields and their types
+- Implement validation in a changeset function
+
+Example: `lib/sequin/consumers/kafka_sink.ex` (a minimal sketch also appears at the end of the backend steps below)
+
+### [Backend] Add the sink type to SinkConsumer
+
+Update `lib/sequin/consumers/sink_consumer.ex`:
+- Add the new sink type to the `@types` list
+- Add the new module to the `polymorphic_embeds_one(:sink, types: [...])` mapping
+
+### [Backend] Create the sink client
+
+Create a new client in `lib/sequin/sinks/` (e.g., `my_sink/client.ex`). The client should:
+- Handle API communication with the sink service. Use Req if the sink uses HTTP; otherwise we may need to bring in a library such as AWS or `:brod`.
+- Implement `test_connection/1` for connection validation
+- Implement functions for sending data (e.g., `append_records/2`)
+- Handle error cases and logging appropriately
+- If using Req, we can support testing via `req_opts`. If not, define a client behaviour and have the client implement it (see the dispatch sketch at the end of the backend steps below).
+
+For HTTP, see: `lib/sequin/sinks/typesense/client.ex`
+For non-HTTP, see: `lib/sequin/sinks/kafka/kafka.ex` and `lib/sequin/sinks/kafka/client.ex`
+
+### [Backend] Add the sink pipeline
+
+Create a new pipeline in `lib/sequin/runtime/` (e.g., `my_sink_pipeline.ex`). The pipeline should:
+- Implement the `Sequin.Runtime.SinkPipeline` behaviour
+- Define batching configuration
+- Handle message transformation and delivery
+- Implement error handling and retries
+
+Example: `lib/sequin/runtime/kafka_pipeline.ex`
+
+### [Backend] Update the pipeline registry
+
+Add the new sink type to `lib/sequin/runtime/sink_pipeline.ex` in the `pipeline_module/1` function.
+
+### [Backend] Add transforms support
+
+Update `lib/sequin/transforms/transforms.ex`:
+- Add a `to_external/2` clause for the new sink type
+- Add parsing support in `parse_sink/2`
+
+### [Backend] Update configuration
+
+Update the relevant config files (e.g., `config/test.exs`) to add any configuration the new sink needs.
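+### [Backend] Example sketches (illustrative)
+
+The snippets below are illustrative sketches only and are not part of this change. The module, field, and config names (`Sequin.Consumers.MySink`, `:endpoint`, `Sequin.Sinks.MySink`, `:my_sink_module`) are hypothetical placeholders — follow the Kafka or Postgres modules in this repo for the real patterns.
+
+A minimal sink schema might look like:
+
+```elixir
+defmodule Sequin.Consumers.MySink do
+  @moduledoc false
+  use Ecto.Schema
+  use TypedEctoSchema
+
+  import Ecto.Changeset
+
+  @primary_key false
+  typed_embedded_schema do
+    # :type identifies this sink in SinkConsumer's polymorphic embed
+    field :type, Ecto.Enum, values: [:my_sink], default: :my_sink
+    # :endpoint is a placeholder field; real sinks define whatever they need
+    field :endpoint, :string
+    field :routing_mode, Ecto.Enum, values: [:dynamic, :static]
+  end
+
+  def changeset(struct, params) do
+    struct
+    |> cast(params, [:endpoint, :routing_mode])
+    |> validate_required([:endpoint])
+  end
+end
+```
+
+For non-HTTP sinks, a dispatch module that defines a behaviour lets tests swap in a Mox mock (mirroring `Sequin.Sinks.Postgres` later in this change):
+
+```elixir
+defmodule Sequin.Sinks.MySink do
+  @moduledoc false
+  alias Sequin.Consumers.MySink, as: Sink
+  alias Sequin.Error
+
+  # Resolved at compile time so tests can configure a Mox mock
+  # (hypothetical :my_sink_module key; see config/test.exs and test/support/mocks.ex).
+  @module Application.compile_env(:sequin, :my_sink_module, Sequin.Sinks.MySink.Client)
+
+  @callback test_connection(Sink.t()) :: :ok | {:error, Error.t()}
+  @callback append_records(Sink.t(), [map()]) :: :ok | {:error, Error.t()}
+
+  @spec test_connection(Sink.t()) :: :ok | {:error, Error.t()}
+  def test_connection(%Sink{} = sink), do: @module.test_connection(sink)
+
+  @spec append_records(Sink.t(), [map()]) :: :ok | {:error, Error.t()}
+  def append_records(%Sink{} = sink, records), do: @module.append_records(sink, records)
+end
+```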
+
+### [Frontend] Add sink type to TypeScript types
+
+Update `assets/svelte/consumers/types.ts`:
+- Add a new sink type interface
+- Update the Consumer union type
+
+### [Frontend] Create sink components
+
+Create new components in `assets/svelte/sinks/my_sink/`:
+- `MySinkIcon.svelte` - Sink icon component
+- `MySinkSinkCard.svelte` - Display component for sink details
+- `MySinkSinkForm.svelte` - Form component for sink configuration
+
+### [Frontend] Update consumer components
+
+Update the following components to include the new sink:
+- `assets/svelte/consumers/ShowSink.svelte`
+- `assets/svelte/consumers/ShowSinkHeader.svelte`
+- `assets/svelte/consumers/SinkConsumerForm.svelte`
+- `assets/svelte/consumers/SinkIndex.svelte`
+
+### [Frontend] Update consumer form handler
+
+Update `lib/sequin_web/live/components/consumer_form.ex`:
+- Add sink-specific connection testing
+- Add encoding/decoding functions
+- Update the sink title helper
+
+### [Frontend] Update live view handlers
+
+Update the relevant live view handlers in `lib/sequin_web/live/`:
+- Add sink-specific handling in `sink_consumers/show.ex`
+- Update any relevant forms or displays
+
+### [Tests] Update existing tests
+
+Update:
+- Factory modules in `test/support/factory/`
+- YAML loader tests
+- Consumer form tests
+
+### [Tests] Add test coverage
+
+Create a minimal pipeline test with a mock client or Req adapter. See:
+- `test/sequin/kafka_pipeline_test.exs` or
+- `test/sequin/typesense_pipeline_test.exs`
+
+Also create tests for:
+- Sink schema and changeset validation
+- Client functionality and error handling
+
+### [Docs] Add reference, how-to, and quickstart docs
+
+See:
+- `docs/reference/sinks/kafka.mdx`
+- `docs/how-to/stream-postgres-to-kafka.mdx`
+- `docs/quickstart/kafka.mdx`
\ No newline at end of file
diff --git a/assets/svelte/consumers/ShowSink.svelte b/assets/svelte/consumers/ShowSink.svelte
index f3c7a4447..e6a0963b0 100644
--- a/assets/svelte/consumers/ShowSink.svelte
+++ b/assets/svelte/consumers/ShowSink.svelte
@@ -27,6 +27,7 @@
     ElasticsearchConsumer,
     RedisStringConsumer,
     AzureEventHubConsumer,
+    PostgresSinkConsumer,
   } from "./types";
   import AzureEventHubSinkCard from "../sinks/azure_event_hub/AzureEventHubSinkCard.svelte";
   import ElasticsearchSinkCard from "../sinks/elasticsearch/ElasticsearchSinkCard.svelte";
@@ -48,6 +49,8 @@
   import HealthAlerts from "$lib/health/HealthAlerts.svelte";
   import { Button } from "$lib/components/ui/button";
   import CollapsibleCode from "../components/CollapsibleCode.svelte";
+  import PostgresSinkCard from "../sinks/postgres/PostgresSinkCard.svelte";
+  import type { Table } from "../databases/types";
 
   export let live;
   export let parent;
@@ -161,6 +164,12 @@
     return consumer.sink.type === "rabbitmq";
   }
 
+  function isPostgresSinkConsumer(
+    consumer: Consumer,
+  ): consumer is PostgresSinkConsumer {
+    return consumer.sink.type === "postgres";
+  }
+
   let chartElement;
   let updateChart;
   let resizeObserver;
@@ -761,6 +770,15 @@
       unit: units[unitIndex],
     };
   }
+
+  function getRoutingCode(consumer: Consumer): string | null {
+    if (!consumer.routing || !consumer.routing.function) return null;
+    const func = consumer.routing.function;
+    if (func.type === "routing" && "code" in func) {
+      return func.code;
+    }
+    return null;
+  }
@@ -1254,6 +1272,8 @@ {:else if isRedisStringConsumer(consumer)} + {:else if isPostgresSinkConsumer(consumer)} + {/if} diff --git a/assets/svelte/consumers/ShowSinkHeader.svelte b/assets/svelte/consumers/ShowSinkHeader.svelte index fb7df76f3..7edc03f9d 100644 --- a/assets/svelte/consumers/ShowSinkHeader.svelte +++ b/assets/svelte/consumers/ShowSinkHeader.svelte @@ -32,6 +32,7 @@ import AzureEventHubIcon from "../sinks/azure_event_hub/AzureEventHubIcon.svelte"; import TypesenseIcon from "../sinks/typesense/TypesenseIcon.svelte"; import ElasticsearchIcon from "../sinks/elasticsearch/ElasticsearchIcon.svelte"; + import PostgresIcon from "../sinks/postgres/PostgresIcon.svelte"; import StopSinkModal from "./StopSinkModal.svelte"; import { Badge } from "$lib/components/ui/badge"; @@ -162,6 +163,8 @@ {:else if consumer.sink.type === "s2"} + {:else if consumer.sink.type === "postgres"} + {/if}

{consumer.name} diff --git a/assets/svelte/consumers/SinkConsumerForm.svelte b/assets/svelte/consumers/SinkConsumerForm.svelte index 08d35aa30..f5b56a65f 100644 --- a/assets/svelte/consumers/SinkConsumerForm.svelte +++ b/assets/svelte/consumers/SinkConsumerForm.svelte @@ -36,6 +36,7 @@ import TypesenseSinkForm from "$lib/sinks/typesense/TypesenseSinkForm.svelte"; import MeilisearchSinkForm from "$lib/sinks/meilisearch/MeilisearchSinkForm.svelte"; import ElasticsearchSinkForm from "$lib/sinks/elasticsearch/ElasticsearchSinkForm.svelte"; + import PostgresSinkForm from "$lib/sinks/postgres/PostgresSinkForm.svelte"; import * as Alert from "$lib/components/ui/alert/index.js"; import SchemaTableSelector from "../components/SchemaTableSelector.svelte"; import * as Tooltip from "$lib/components/ui/tooltip"; @@ -731,6 +732,14 @@ {functions} {functionRefreshState} /> + {:else if consumer.type === "postgres"} + {:else if consumer.type === "kafka"} = { }, }, }, + postgres: { + fields: { + table_name: { + description: "Postgres table name", + staticValue: "", + staticFormField: "table_name", + dynamicDefault: "", + }, + }, + }, }; diff --git a/assets/svelte/consumers/types.ts b/assets/svelte/consumers/types.ts index 77a482de4..a820fda1c 100644 --- a/assets/svelte/consumers/types.ts +++ b/assets/svelte/consumers/types.ts @@ -33,6 +33,7 @@ export type BaseConsumer = { message_grouping: boolean; ack_wait_ms: number; max_ack_pending: number; + max_retry_count: number; max_deliver: number; max_waiting: number; inserted_at: string; @@ -247,6 +248,20 @@ export type ElasticsearchConsumer = BaseConsumer & { }; }; +// PostgreSQL specific sink +export type PostgresSinkConsumer = BaseConsumer & { + sink: { + type: "postgres"; + host: string; + port: number; + database: string; + table_name: string; + username: string; + ssl: boolean; + routing_mode: "dynamic" | "static"; + }; +}; + // Union type for all consumer types export type Consumer = | HttpPushConsumer @@ -263,7 +278,8 @@ export type Consumer = | TypesenseConsumer | SnsConsumer | ElasticsearchConsumer - | RedisStringConsumer; + | RedisStringConsumer + | PostgresSinkConsumer; export const SinkTypeValues = [ "http_push", @@ -281,6 +297,7 @@ export const SinkTypeValues = [ "meilisearch", "elasticsearch", "redis_string", + "postgres", ] as const; export type SinkType = (typeof SinkTypeValues)[number]; @@ -301,6 +318,7 @@ export const RoutedSinkTypeValues = [ "sqs", "sns", "kinesis", + "postgres", ] as const; export type RoutedSinkType = (typeof RoutedSinkTypeValues)[number]; diff --git a/assets/svelte/sinks/postgres/PostgresIcon.svelte b/assets/svelte/sinks/postgres/PostgresIcon.svelte new file mode 100644 index 000000000..5145118e9 --- /dev/null +++ b/assets/svelte/sinks/postgres/PostgresIcon.svelte @@ -0,0 +1,58 @@ + + + + + + + + + + + + + + + + + + diff --git a/assets/svelte/sinks/postgres/PostgresSinkCard.svelte b/assets/svelte/sinks/postgres/PostgresSinkCard.svelte new file mode 100644 index 000000000..55e102be8 --- /dev/null +++ b/assets/svelte/sinks/postgres/PostgresSinkCard.svelte @@ -0,0 +1,119 @@ + + + + +
+

PostgreSQL Configuration

+
+ +
+
+ Host +
+
+ {consumer.sink.host} +
+
+
+ +
+ Port +
+ {consumer.sink.port} +
+
+ +
+ Database +
+ {consumer.sink.database} +
+
+ +
+ Table Name +
+ {consumer.sink.table_name} +
+
+ +
+ SSL Enabled +
+ {consumer.sink.ssl ? "Yes" : "No"} +
+
+
+
+
+ + + + Routing + + +
+ Table +
+ + {#if consumer.routing_id} + Determined by router + + {:else} + {consumer.sink.table_name} + {/if} + +
+
+ {#if consumer.routing} + {#if getRoutingCode(consumer)} +
+ Router +
+
{getRoutingCode(consumer)}
+
+
+ {/if} + {/if} +
+
diff --git a/assets/svelte/sinks/postgres/PostgresSinkForm.svelte b/assets/svelte/sinks/postgres/PostgresSinkForm.svelte new file mode 100644 index 000000000..112b502dd --- /dev/null +++ b/assets/svelte/sinks/postgres/PostgresSinkForm.svelte @@ -0,0 +1,156 @@ + + + + + PostgreSQL Configuration + + +
+ + + {#if errors.sink?.host} +

{errors.sink.host}

+ {/if} +
+ +
+ + + {#if errors.sink?.port} +

{errors.sink.port}

+ {/if} +
+ +
+ + + {#if errors.sink?.database} +

{errors.sink.database}

+ {/if} +
+ +
+ + + {#if errors.sink?.username} +

{errors.sink.username}

+ {/if} +
+ +
+ +
+ + +
+ {#if errors.sink?.password} +

{errors.sink.password}

+ {/if} +
+ + +
+
+ { + form.sink.ssl = checked; + }} + /> + +
+ {#if errors.sink?.ssl} +

{errors.sink.ssl}

+ {/if} +
+
+
+ + + + Routing + + + + + {#if !isDynamicRouting} +
+ + + {#if errors.sink?.table_name} +

{errors.sink.table_name}

+ {/if} +
+ {/if} +
+
diff --git a/config/test.exs b/config/test.exs index 9ded40b1b..268dbaff2 100644 --- a/config/test.exs +++ b/config/test.exs @@ -113,6 +113,7 @@ config :sequin, kafka_module: Sequin.Sinks.KafkaMock, nats_module: Sequin.Sinks.NatsMock, rabbitmq_module: Sequin.Sinks.RabbitMqMock, + postgres_module: Sequin.Sinks.PostgresMock, aws_module: Sequin.AwsMock, # Arbitrarily high memory limit for testing max_memory_bytes: 100 * 1024 * 1024 * 1024, diff --git a/lib/sequin/application.ex b/lib/sequin/application.ex index 43ec602de..fd0ceee89 100644 --- a/lib/sequin/application.ex +++ b/lib/sequin/application.ex @@ -90,6 +90,7 @@ defmodule Sequin.Application do Sequin.Sinks.Kafka.ConnectionCache, Sequin.Sinks.Nats.ConnectionCache, Sequin.Sinks.RabbitMq.ConnectionCache, + Sequin.Sinks.Postgres.ConnectionCache, SequinWeb.Presence, Sequin.SystemMetricsServer, {Task, fn -> enqueue_workers() end}, diff --git a/lib/sequin/consumers/postgres_sink.ex b/lib/sequin/consumers/postgres_sink.ex new file mode 100644 index 000000000..adb6bc7e6 --- /dev/null +++ b/lib/sequin/consumers/postgres_sink.ex @@ -0,0 +1,53 @@ +defmodule Sequin.Consumers.PostgresSink do + @moduledoc false + use Ecto.Schema + use TypedEctoSchema + + import Ecto.Changeset + + alias __MODULE__ + alias Sequin.Encrypted.Field, as: EncryptedField + + @derive {Jason.Encoder, only: [:host, :port, :database, :table_name]} + @derive {Inspect, except: [:password]} + @primary_key false + typed_embedded_schema do + field :type, Ecto.Enum, values: [:postgres], default: :postgres + field :host, :string + field :port, :integer, default: 5432 + field :database, :string + field :table_name, :string + field :username, :string + field :password, EncryptedField + field :ssl, :boolean, default: false + field :routing_mode, Ecto.Enum, values: [:dynamic, :static] + field :connection_id, :string + end + + @doc false + def changeset(struct, params) do + struct + |> cast(params, [:host, :port, :database, :table_name, :username, :password, :ssl, :routing_mode]) + |> validate_required([:host, :port, :database, :table_name]) + |> validate_number(:port, greater_than: 0, less_than: 65_536) + |> put_new_connection_id() + end + + def conn_opts(%PostgresSink{} = sink) do + [ + hostname: sink.host, + port: sink.port, + username: sink.username, + password: sink.password, + database: sink.database, + ssl: if(sink.ssl, do: [verify: :verify_none], else: false) + ] + end + + defp put_new_connection_id(changeset) do + case get_field(changeset, :connection_id) do + nil -> put_change(changeset, :connection_id, Ecto.UUID.generate()) + _ -> changeset + end + end +end diff --git a/lib/sequin/consumers/routing_function.ex b/lib/sequin/consumers/routing_function.ex index 7c7c5ee68..e966b19c0 100644 --- a/lib/sequin/consumers/routing_function.ex +++ b/lib/sequin/consumers/routing_function.ex @@ -29,6 +29,7 @@ defmodule Sequin.Consumers.RoutingFunction do :elasticsearch, :s2, :rabbitmq, + :postgres, :sqs ] diff --git a/lib/sequin/consumers/sink_consumer.ex b/lib/sequin/consumers/sink_consumer.ex index a61b71b2b..71a465d9e 100644 --- a/lib/sequin/consumers/sink_consumer.ex +++ b/lib/sequin/consumers/sink_consumer.ex @@ -19,6 +19,7 @@ defmodule Sequin.Consumers.SinkConsumer do alias Sequin.Consumers.KinesisSink alias Sequin.Consumers.MeilisearchSink alias Sequin.Consumers.NatsSink + alias Sequin.Consumers.PostgresSink alias Sequin.Consumers.RabbitMqSink alias Sequin.Consumers.RedisStreamSink alias Sequin.Consumers.RedisStringSink @@ -36,22 +37,23 @@ defmodule Sequin.Consumers.SinkConsumer do 
@type not_visible_until :: DateTime.t() @types [ + :azure_event_hub, + :elasticsearch, + :gcp_pubsub, :http_push, - :sqs, + :kafka, :kinesis, - :s2, + :meilisearch, + :nats, + :postgres, + :rabbitmq, :redis_stream, :redis_string, - :kafka, + :s2, :sequin_stream, - :gcp_pubsub, - :nats, - :rabbitmq, - :azure_event_hub, - :typesense, - :meilisearch, :sns, - :elasticsearch + :sqs, + :typesense ] # This is a module attribute to compile the types into the schema @@ -121,22 +123,23 @@ defmodule Sequin.Consumers.SinkConsumer do polymorphic_embeds_one(:sink, types: [ + azure_event_hub: AzureEventHubSink, + elasticsearch: ElasticsearchSink, + gcp_pubsub: GcpPubsubSink, http_push: HttpPushSink, - sqs: SqsSink, + kafka: KafkaSink, kinesis: KinesisSink, - s2: S2Sink, - sns: SnsSink, + meilisearch: MeilisearchSink, + nats: NatsSink, + postgres: PostgresSink, + rabbitmq: RabbitMqSink, redis_stream: RedisStreamSink, redis_string: RedisStringSink, - kafka: KafkaSink, + s2: S2Sink, sequin_stream: SequinStreamSink, - gcp_pubsub: GcpPubsubSink, - nats: NatsSink, - rabbitmq: RabbitMqSink, - azure_event_hub: AzureEventHubSink, - typesense: TypesenseSink, - meilisearch: MeilisearchSink, - elasticsearch: ElasticsearchSink + sns: SnsSink, + sqs: SqsSink, + typesense: TypesenseSink ], on_replace: :update, type_field_name: :type diff --git a/lib/sequin/runtime/postgres_pipeline.ex b/lib/sequin/runtime/postgres_pipeline.ex new file mode 100644 index 000000000..4e9d8eede --- /dev/null +++ b/lib/sequin/runtime/postgres_pipeline.ex @@ -0,0 +1,40 @@ +defmodule Sequin.Runtime.PostgresPipeline do + @moduledoc false + @behaviour Sequin.Runtime.SinkPipeline + + alias Sequin.Consumers.PostgresSink + alias Sequin.Consumers.SinkConsumer + alias Sequin.Runtime.SinkPipeline + alias Sequin.Sinks.Postgres + alias Sequin.Transforms.Message + + @impl SinkPipeline + def init(context, _opts) do + context + end + + @impl SinkPipeline + def batchers_config(_consumer) do + [default: [concurrency: 10, batch_size: 1, batch_timeout: 1]] + end + + @impl SinkPipeline + def handle_message(message, context) do + {:ok, message, context} + end + + @impl SinkPipeline + def handle_batch(:default, messages, _batch_info, context) do + %{consumer: %SinkConsumer{sink: %PostgresSink{} = sink} = consumer} = context + + records = + Enum.map(messages, fn %{data: data} -> + Message.to_external(consumer, data) + end) + + case Postgres.insert_records(sink, records) do + :ok -> {:ok, messages, context} + {:error, error} -> {:error, error} + end + end +end diff --git a/lib/sequin/runtime/routing/consumers/postgres.ex b/lib/sequin/runtime/routing/consumers/postgres.ex new file mode 100644 index 000000000..383c935ac --- /dev/null +++ b/lib/sequin/runtime/routing/consumers/postgres.ex @@ -0,0 +1,29 @@ +defmodule Sequin.Runtime.Routing.Consumers.Postgres do + @moduledoc false + use Sequin.Runtime.Routing.RoutedConsumer + + @primary_key false + @derive {Jason.Encoder, only: [:table_name]} + typed_embedded_schema do + field :table_name, :string + end + + def changeset(struct, params) do + allowed_keys = [:table_name] + + struct + |> cast(params, allowed_keys, empty_values: []) + |> Routing.Helpers.validate_no_extra_keys(params, allowed_keys) + |> validate_required([:table_name]) + |> validate_length(:table_name, min: 1, max: 512) + |> validate_format(:table_name, ~r/^[^\s]+$/, message: "cannot contain whitespace") + end + + def route(_action, _record, _changes, metadata) do + %{table_name: metadata.table_name} + end + + def 
route_consumer(%Sequin.Consumers.SinkConsumer{sink: sink}) do + %{table_name: sink.table_name} + end +end diff --git a/lib/sequin/runtime/sink_pipeline.ex b/lib/sequin/runtime/sink_pipeline.ex index 4feb7550d..2244e8864 100644 --- a/lib/sequin/runtime/sink_pipeline.ex +++ b/lib/sequin/runtime/sink_pipeline.ex @@ -421,6 +421,7 @@ defmodule Sequin.Runtime.SinkPipeline do :kinesis -> Sequin.Runtime.KinesisPipeline :nats -> Sequin.Runtime.NatsPipeline :rabbitmq -> Sequin.Runtime.RabbitMqPipeline + :postgres -> Sequin.Runtime.PostgresPipeline :redis_stream -> Sequin.Runtime.RedisStreamPipeline :redis_string -> Sequin.Runtime.RedisStringPipeline :sequin_stream -> Sequin.Runtime.SequinStreamPipeline diff --git a/lib/sequin/sinks/postgres/connection_cache.ex b/lib/sequin/sinks/postgres/connection_cache.ex new file mode 100644 index 000000000..5aa63179b --- /dev/null +++ b/lib/sequin/sinks/postgres/connection_cache.ex @@ -0,0 +1,231 @@ +defmodule Sequin.Sinks.Postgres.ConnectionCache do + @moduledoc """ + Cache connections to customer Postgres databases. + + By caching these connections, we can avoid paying a significant startup + penalty when performing multiple operations on the same Postgres database. + + Each `Sequin.Consumers.PostgresSink` gets its own connection in the cache. + + The cache takes ownership of the Postgres connections and is responsible for + closing them when they are invalidated (or when the cache is stopped). Thus, + callers should not call `GenServer.stop/1` on these connections. + + Cached connections are invalidated and recreated when their Postgres sink's + connection options change. + + The cache will detect dead connections and create new ones as needed. + """ + + use GenServer + + alias Sequin.Consumers.PostgresSink + + require Logger + + defmodule Cache do + @moduledoc false + + @type sink :: PostgresSink.t() + @type entry :: %{ + conn: atom(), + options_hash: binary() + } + @type t :: %{binary() => entry()} + + @spec new :: t() + def new, do: %{} + + @spec each(t(), (atom() -> any())) :: :ok + def each(cache, function) do + Enum.each(cache, fn {_id, entry} -> function.(entry.conn) end) + end + + @spec lookup(t(), sink()) :: {:ok, atom()} | {:error, :stale} | {:error, :not_found} + def lookup(cache, sink) do + new_hash = options_hash(sink) + entry = Map.get(cache, sink.connection_id) + + cond do + is_nil(entry) -> + {:error, :not_found} + + is_pid(entry.conn) and !Process.alive?(entry.conn) -> + Logger.warning("Cached Postgres connection was dead upon lookup", sink_id: sink.connection_id) + {:error, :not_found} + + entry.options_hash != new_hash -> + Logger.info("Cached Postgres sink connection was stale", sink_id: sink.connection_id) + {:error, :stale} + + true -> + {:ok, entry.conn} + end + end + + @spec pop(t(), sink()) :: {atom() | nil, t()} + def pop(cache, sink) do + {entry, new_cache} = Map.pop(cache, sink.connection_id, nil) + + if entry, do: {entry.conn, new_cache}, else: {nil, new_cache} + end + + @spec store(t(), sink(), atom()) :: t() + def store(cache, sink, conn) do + entry = %{conn: conn, options_hash: options_hash(sink)} + Map.put(cache, sink.connection_id, entry) + end + + defp options_hash(sink) do + :erlang.phash2(PostgresSink.conn_opts(sink)) + end + end + + defmodule State do + @moduledoc false + use TypedStruct + + alias Sequin.Consumers.PostgresSink + alias Sequin.Error + + @type sink :: PostgresSink.t() + @type opt :: {:start_fn, State.start_function()} | {:stop_fn, State.stop_function()} + @type start_function :: (sink() -> start_result()) + 
@type start_result :: {:ok, atom()} | {:error, Error.t()} + @type stop_function :: (atom() -> :ok) + + typedstruct do + field :cache, Cache.t(), default: Cache.new() + field :start_fn, start_function() + field :stop_fn, stop_function() + end + + @spec new([opt]) :: t() + def new(opts) do + start_fn = Keyword.get(opts, :start_fn, &default_start/1) + stop_fn = Keyword.get(opts, :stop_fn, &GenServer.stop/1) + + %__MODULE__{ + start_fn: start_fn, + stop_fn: stop_fn + } + end + + @spec find_or_create_connection(t(), sink(), boolean()) :: {:ok, pid(), t()} | {:error, term()} + def find_or_create_connection(%__MODULE__{} = state, sink, create_on_miss) do + case Cache.lookup(state.cache, sink) do + {:ok, conn} -> + {:ok, conn, state} + + {:error, :stale} -> + state + |> invalidate_connection(sink) + |> find_or_create_connection(sink, create_on_miss) + + {:error, :not_found} when create_on_miss -> + with {:ok, conn} <- state.start_fn.(sink) do + new_cache = Cache.store(state.cache, sink, conn) + new_state = %{state | cache: new_cache} + {:ok, conn, new_state} + end + + {:error, :not_found} -> + {:error, :not_found} + end + end + + @spec invalidate_all(t()) :: t() + def invalidate_all(%__MODULE__{} = state) do + Cache.each(state.cache, state.stop_fn) + + %{state | cache: Cache.new()} + end + + @spec invalidate_connection(t(), sink()) :: t() + def invalidate_connection(%__MODULE__{} = state, sink) do + {conn, new_cache} = Cache.pop(state.cache, sink) + + if conn, do: state.stop_fn.(conn) + + %{state | cache: new_cache} + end + + defp default_start(%PostgresSink{} = sink) do + opts = PostgresSink.conn_opts(sink) + + case Postgrex.start_link(opts) do + {:ok, pid} -> {:ok, pid} + {:error, {:already_started, client_pid}} -> {:ok, client_pid} + {:error, reason} -> {:error, reason} + end + end + end + + @type sink :: PostgresSink.t() + @type opt :: State.opt() + @type start_result :: State.start_result() + + @spec start_link([opt]) :: GenServer.on_start() + def start_link(opts) do + name = Keyword.get(opts, :name, __MODULE__) + GenServer.start_link(__MODULE__, opts, name: name) + end + + @spec connection(sink()) :: start_result() + @spec connection(GenServer.server(), sink()) :: start_result() + def connection(server \\ __MODULE__, %PostgresSink{} = sink) do + GenServer.call(server, {:connection, sink, true}) + end + + @spec invalidate_connection(GenServer.server(), sink()) :: :ok + def invalidate_connection(server \\ __MODULE__, %PostgresSink{} = sink) do + GenServer.cast(server, {:invalidate_connection, sink}) + end + + # This function is intended for test purposes only + @spec cache_connection(GenServer.server(), sink(), pid()) :: :ok + def cache_connection(server \\ __MODULE__, %PostgresSink{} = sink, conn) do + GenServer.call(server, {:cache_connection, sink, conn}) + end + + @impl GenServer + def init(opts) do + Process.flag(:trap_exit, true) + + {:ok, State.new(opts)} + end + + @impl GenServer + def handle_call({:connection, %PostgresSink{} = sink, create_on_miss}, _from, %State{} = state) do + case State.find_or_create_connection(state, sink, create_on_miss) do + {:ok, conn, new_state} -> + {:reply, {:ok, conn}, new_state} + + {:error, :not_found} -> + {:reply, {:error, Sequin.Error.not_found(entity: :database_connection)}, state} + + error -> + {:reply, error, state} + end + end + + # This function is intended for test purposes only + @impl GenServer + def handle_call({:cache_connection, %PostgresSink{} = sink, conn}, _from, %State{} = state) do + new_cache = Cache.store(state.cache, sink, conn) + 
new_state = %{state | cache: new_cache} + {:reply, :ok, new_state} + end + + @impl GenServer + def handle_cast({:invalidate_connection, %PostgresSink{} = sink}, %State{} = state) do + new_state = State.invalidate_connection(state, sink) + {:noreply, new_state} + end + + @impl GenServer + def terminate(_reason, %State{} = state) do + _new_state = State.invalidate_all(state) + :ok + end +end diff --git a/lib/sequin/sinks/postgres/implementation.ex b/lib/sequin/sinks/postgres/implementation.ex new file mode 100644 index 000000000..964cc49f5 --- /dev/null +++ b/lib/sequin/sinks/postgres/implementation.ex @@ -0,0 +1,36 @@ +defmodule Sequin.Sinks.Postgres.Implementation do + @moduledoc false + @behaviour Sequin.Sinks.Postgres + + alias Sequin.Consumers.PostgresSink + alias Sequin.Sinks.Postgres + alias Sequin.Sinks.Postgres.ConnectionCache + + @impl Postgres + def insert_records(%PostgresSink{} = sink, [record]) do + action = record[:action] + record = record[:record] + changes = record[:changes] + metadata = record[:metadata] + + with {:ok, conn} <- ConnectionCache.connection(sink), + {:ok, _} <- + Postgrex.query( + conn, + "INSERT INTO #{sink.table_name} (action, record, changes, metadata) VALUES ($1, $2, $3, $4)", + [action, record, changes, metadata] + ) do + :ok + end + end + + @impl Postgres + def upsert_records(_sink, _records) do + raise "Not implemented" + end + + @impl Postgres + def test_connection(_sink) do + raise "Not implemented" + end +end diff --git a/lib/sequin/sinks/postgres/postgres.ex b/lib/sequin/sinks/postgres/postgres.ex new file mode 100644 index 000000000..78aaa4bc6 --- /dev/null +++ b/lib/sequin/sinks/postgres/postgres.ex @@ -0,0 +1,27 @@ +defmodule Sequin.Sinks.Postgres do + @moduledoc false + alias Sequin.Consumers.PostgresSink + alias Sequin.Error + alias Sequin.Sinks.Postgres.Implementation + + @module Application.compile_env(:sequin, :postgres_module, Implementation) + + @callback insert_records(PostgresSink.t(), [map()]) :: :ok | {:error, Error.t()} + @callback upsert_records(PostgresSink.t(), [map()]) :: :ok | {:error, Error.t()} + @callback test_connection(PostgresSink.t()) :: :ok | {:error, Error.t()} + + @spec insert_records(PostgresSink.t(), [map()]) :: :ok | {:error, Error.t()} + def insert_records(%PostgresSink{} = sink, records) do + @module.insert_records(sink, records) + end + + @spec upsert_records(PostgresSink.t(), [map()]) :: :ok | {:error, Error.t()} + def upsert_records(%PostgresSink{} = sink, records) do + @module.upsert_records(sink, records) + end + + @spec test_connection(PostgresSink.t()) :: :ok | {:error, Error.t()} + def test_connection(%PostgresSink{} = sink) do + @module.test_connection(sink) + end +end diff --git a/lib/sequin/transforms/transforms.ex b/lib/sequin/transforms/transforms.ex index cda95b793..836fa2e50 100644 --- a/lib/sequin/transforms/transforms.ex +++ b/lib/sequin/transforms/transforms.ex @@ -17,6 +17,7 @@ defmodule Sequin.Transforms do alias Sequin.Consumers.MeilisearchSink alias Sequin.Consumers.NatsSink alias Sequin.Consumers.PathFunction + alias Sequin.Consumers.PostgresSink alias Sequin.Consumers.RabbitMqSink alias Sequin.Consumers.RedisStreamSink alias Sequin.Consumers.RedisStringSink @@ -250,6 +251,19 @@ defmodule Sequin.Transforms do }) end + def to_external(%PostgresSink{} = sink, show_sensitive) do + reject_nil_values(%{ + type: "postgres", + host: sink.host, + port: sink.port, + database: sink.database, + username: sink.username, + password: SensitiveValue.new(sink.password, show_sensitive), + table_name: 
sink.table_name, + routing_mode: sink.routing_mode + }) + end + def to_external(%RedisStreamSink{} = sink, show_sensitive) do reject_nil_values(%{ type: "redis_stream", @@ -1185,6 +1199,21 @@ defmodule Sequin.Transforms do }} end + defp parse_sink(%{"type" => "postgres"} = attrs, _resources) do + {:ok, + %{ + type: :postgres, + host: attrs["host"], + port: attrs["port"], + database: attrs["database"], + table_name: attrs["table_name"], + username: attrs["username"], + password: attrs["password"], + ssl: attrs["ssl"] || false, + routing_mode: attrs["routing_mode"] || "static" + }} + end + # Backwards compatibility for Redis sink -> Redis Stream sink defp parse_sink(%{"type" => "redis"} = attrs, resources) do parse_sink(%{attrs | "type" => "redis_stream"}, resources) diff --git a/lib/sequin_web/live/components/consumer_form.ex b/lib/sequin_web/live/components/consumer_form.ex index 3c7752fcd..e5012c94b 100644 --- a/lib/sequin_web/live/components/consumer_form.ex +++ b/lib/sequin_web/live/components/consumer_form.ex @@ -17,6 +17,7 @@ defmodule SequinWeb.Components.ConsumerForm do alias Sequin.Consumers.KinesisSink alias Sequin.Consumers.MeilisearchSink alias Sequin.Consumers.NatsSink + alias Sequin.Consumers.PostgresSink alias Sequin.Consumers.RabbitMqSink alias Sequin.Consumers.RedisStreamSink alias Sequin.Consumers.RedisStringSink @@ -44,6 +45,7 @@ defmodule SequinWeb.Components.ConsumerForm do alias Sequin.Sinks.Kafka alias Sequin.Sinks.Meilisearch.Client, as: MeilisearchClient alias Sequin.Sinks.Nats + alias Sequin.Sinks.Postgres, as: PostgresSinkClient alias Sequin.Sinks.RabbitMq alias Sequin.Sinks.Redis alias Sequin.Sinks.Typesense.Client, as: TypesenseClient @@ -314,6 +316,12 @@ defmodule SequinWeb.Components.ConsumerForm do :ok -> {:reply, %{ok: true}, socket} {:error, error} -> {:reply, %{ok: false, error: error}, socket} end + + :postgres -> + case test_postgres_connection(socket) do + :ok -> {:reply, %{ok: true}, socket} + {:error, error} -> {:reply, %{ok: false, error: error}, socket} + end end end @@ -693,6 +701,27 @@ defmodule SequinWeb.Components.ConsumerForm do end end + defp test_postgres_connection(socket) do + sink_changeset = + socket.assigns.changeset + |> Ecto.Changeset.get_field(:sink) + |> case do + %Ecto.Changeset{} = changeset -> changeset + %PostgresSink{} = sink -> PostgresSink.changeset(sink, %{}) + end + + if sink_changeset.valid? 
do + sink = Ecto.Changeset.apply_changes(sink_changeset) + + case PostgresSinkClient.test_connection(sink) do + :ok -> :ok + {:error, error} -> {:error, Exception.message(error)} + end + else + {:error, encode_errors(sink_changeset)} + end + end + defp decode_params(form, socket) do sink = decode_sink(socket.assigns.consumer.type, form["sink"]) @@ -945,6 +974,19 @@ defmodule SequinWeb.Components.ConsumerForm do } end + defp decode_sink(:postgres, sink) do + %{ + "type" => "postgres", + "host" => sink["host"], + "port" => sink["port"], + "database" => sink["database"], + "table_name" => sink["table_name"], + "username" => sink["username"], + "password" => sink["password"], + "ssl" => sink["ssl"] + } + end + defp aws_region_from_queue_url(nil), do: nil defp aws_region_from_queue_url(queue_url) do @@ -1224,6 +1266,19 @@ defmodule SequinWeb.Components.ConsumerForm do } end + defp encode_sink(%PostgresSink{} = sink) do + %{ + "type" => "postgres", + "host" => sink.host, + "port" => sink.port, + "database" => sink.database, + "table_name" => sink.table_name, + "username" => sink.username, + "password" => sink.password, + "ssl" => sink.ssl + } + end + defp encode_errors(nil), do: %{} defp encode_errors(%Ecto.Changeset{} = changeset) do @@ -1457,6 +1512,7 @@ defmodule SequinWeb.Components.ConsumerForm do :typesense -> "Typesense Sink" :meilisearch -> "Meilisearch Sink" :elasticsearch -> "Elasticsearch Sink" + :postgres -> "PostgreSQL Sink" end end @@ -1490,6 +1546,7 @@ defmodule SequinWeb.Components.ConsumerForm do :meilisearch -> {%MeilisearchSink{}, %{}} :elasticsearch -> {%ElasticsearchSink{}, %{}} :redis_string -> {%RedisStringSink{}, %{batch_size: 10}} + :postgres -> {%PostgresSink{port: 5432, ssl: false}, %{batch_size: 10}} end sink_consumer diff --git a/lib/sequin_web/live/sink_consumers/show.ex b/lib/sequin_web/live/sink_consumers/show.ex index 0e385a9c0..446276f23 100644 --- a/lib/sequin_web/live/sink_consumers/show.ex +++ b/lib/sequin_web/live/sink_consumers/show.ex @@ -23,6 +23,7 @@ defmodule SequinWeb.SinkConsumersLive.Show do alias Sequin.Consumers.MeilisearchSink alias Sequin.Consumers.NatsSink alias Sequin.Consumers.PathFunction + alias Sequin.Consumers.PostgresSink alias Sequin.Consumers.RabbitMqSink alias Sequin.Consumers.RedisStreamSink alias Sequin.Consumers.RedisStringSink @@ -1021,6 +1022,20 @@ defmodule SequinWeb.SinkConsumersLive.Show do %{type: :sequin_stream} end + defp encode_sink(%SinkConsumer{sink: %PostgresSink{} = sink}) do + %{ + type: :postgres, + host: sink.host, + port: sink.port, + database: sink.database, + table_name: sink.table_name, + username: sink.username, + password: Sequin.String.obfuscate(sink.password), + ssl: sink.ssl, + routing_mode: sink.routing_mode + } + end + defp encode_database(%PostgresDatabase{} = database, %PostgresReplicationSlot{} = slot) do %{ id: database.id, @@ -1384,17 +1399,18 @@ defmodule SequinWeb.SinkConsumersLive.Show do defp consumer_title(%{sink: %{type: :gcp_pubsub}}), do: "GCP Pub/Sub Sink" defp consumer_title(%{sink: %{type: :http_push}}), do: "Webhook Sink" defp consumer_title(%{sink: %{type: :kafka}}), do: "Kafka Sink" + defp consumer_title(%{sink: %{type: :kinesis}}), do: "Kinesis Sink" + defp consumer_title(%{sink: %{type: :meilisearch}}), do: "Meilisearch Sink" defp consumer_title(%{sink: %{type: :nats}}), do: "NATS Sink" + defp consumer_title(%{sink: %{type: :postgres}}), do: "Postgres Sink" defp consumer_title(%{sink: %{type: :rabbitmq}}), do: "RabbitMQ Sink" defp consumer_title(%{sink: %{type: :redis_stream}}), do: 
"Redis Stream Sink" defp consumer_title(%{sink: %{type: :redis_string}}), do: "Redis String Sink" + defp consumer_title(%{sink: %{type: :s2}}), do: "S2 Sink" defp consumer_title(%{sink: %{type: :sequin_stream}}), do: "Sequin Stream Sink" defp consumer_title(%{sink: %{type: :sns}}), do: "SNS Sink" - defp consumer_title(%{sink: %{type: :kinesis}}), do: "Kinesis Sink" - defp consumer_title(%{sink: %{type: :s2}}), do: "S2 Sink" defp consumer_title(%{sink: %{type: :sqs}}), do: "SQS Sink" defp consumer_title(%{sink: %{type: :typesense}}), do: "Typesense Sink" - defp consumer_title(%{sink: %{type: :meilisearch}}), do: "Meilisearch Sink" defp put_health(%SinkConsumer{} = consumer) do with {:ok, health} <- Health.health(consumer), diff --git a/test/support/factory/consumers_factory.ex b/test/support/factory/consumers_factory.ex index 57027383f..3a0b18a63 100644 --- a/test/support/factory/consumers_factory.ex +++ b/test/support/factory/consumers_factory.ex @@ -18,6 +18,7 @@ defmodule Sequin.Factory.ConsumersFactory do alias Sequin.Consumers.KinesisSink alias Sequin.Consumers.MeilisearchSink alias Sequin.Consumers.NatsSink + alias Sequin.Consumers.PostgresSink alias Sequin.Consumers.RabbitMqSink alias Sequin.Consumers.RedisStreamSink alias Sequin.Consumers.RedisStringSink @@ -366,6 +367,22 @@ defmodule Sequin.Factory.ConsumersFactory do ) end + defp sink(:postgres, _account_id, attrs) do + merge_attributes( + %PostgresSink{ + type: :postgres, + host: "localhost", + port: 5432, + database: "test-db", + username: Factory.word(), + password: Factory.word(), + table_name: Factory.word(), + routing_mode: "static" + }, + attrs + ) + end + def gcp_credential(attrs \\ []) do merge_attributes( %Gcp.Credentials{ diff --git a/test/support/factory/functions_factory.ex b/test/support/factory/functions_factory.ex index d3bee5f48..c41d4ef1a 100644 --- a/test/support/factory/functions_factory.ex +++ b/test/support/factory/functions_factory.ex @@ -165,7 +165,8 @@ defmodule Sequin.Factory.FunctionsFactory do :sns, :s2, :rabbitmq, - :kinesis + :kinesis, + :postgres ]) end) @@ -272,6 +273,13 @@ defmodule Sequin.Factory.FunctionsFactory do exchange_name: metadata.table_name } """ + + :postgres -> + """ + %{ + table_name: metadata.table_name + } + """ end end) diff --git a/test/support/mocks.ex b/test/support/mocks.ex index 95034e64c..47ea9abf6 100644 --- a/test/support/mocks.ex +++ b/test/support/mocks.ex @@ -63,3 +63,7 @@ Mox.defmock(Sequin.Runtime.SinkPipelineMock, Mox.defmock(Sequin.PrometheusMock, for: Sequin.Prometheus ) + +Mox.defmock(Sequin.Sinks.PostgresMock, + for: Sequin.Sinks.Postgres +)