Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 116 additions & 0 deletions ADD-SINK.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# Adding a New Sink to Sequin

This document outlines the steps required to add a new sink to Sequin. A sink is a destination where Sequin can send data changes.

## Steps:

### [Backend] Add the sink schema

Create a new sink schema in `lib/sequin/consumers/` (e.g., `my_sink.ex`). The schema should:
- Use `Ecto.Schema` and `TypedEctoSchema`
- Define required fields and their types
- Implement validation in a changeset function

Example: `lib/sequin/consumers/kafka_sink.ex`

### [Backend] Add the sink type to SinkConsumer

Update `lib/sequin/consumers/sink_consumer.ex`:
- Add the new sink type to the `@sink_types` list
- Add the new module to the `sink_module/1` mapping

### [Backend] Create the sink client

Create a new client in `lib/sequin/sinks/` (e.g., `my_sink/client.ex`). The client should:
- Handle API communication with the sink service. Use Req if the sink uses HTTP. Otherwise we may need to bring in a library like AWS or :brod.
- Implement `test_connection/1` for connection validation
- Implement methods for sending data (e.g., `append_records/2`)
- Handle error cases and logging appropriately
- If using Req, we can support testing with req_opts. If not, we need a client behavior and for the client to implement that behavior.

For HTTP see: lib/sequin/sinks/typesense/client.ex
For non-http see: lib/sequin/sinks/kafka/kafka.ex and lib/sequin/sinks/kafka/client.ex

### [Backend] Add the sink pipeline

Create a new pipeline in `lib/sequin/runtime/` (e.g., `my_sink_pipeline.ex`). The pipeline should:
- Implement the `Sequin.Runtime.SinkPipeline` behaviour
- Define batching configuration
- Handle message transformation and delivery
- Implement error handling and retries

Example: `lib/sequin/runtime/kafka_pipeline.ex`


### [Backend] Update the pipeline registry

Add the new sink type to `lib/sequin/runtime/sink_pipeline.ex` in the `pipeline_module/1` function.

### [Backend] Add transforms support

Update `lib/sequin/transforms/transforms.ex`:
- Add `to_external/2` function for the new sink type
- Add parsing support in `parse_sink/2`

### [Backend] Update configuration

Update relevant config files (e.g., `config/test.exs`) to add any necessary configuration for the new sink.

### [Frontend] Add sink type to TypeScript types

Update `assets/svelte/consumers/types.ts`:
- Add new sink type interface
- Update the Consumer union type

### [Frontend] Create sink components

Create new components in `assets/svelte/sinks/my_sink/`:
- `MySinkIcon.svelte` - Sink icon component
- `MySinkSinkCard.svelte` - Display component for sink details
- `MySinkSinkForm.svelte` - Form component for sink configuration

### [Frontend] Update consumer components

Update the following components to include the new sink:
- `assets/svelte/consumers/ShowSink.svelte`
- `assets/svelte/consumers/ShowSinkHeader.svelte`
- `assets/svelte/consumers/SinkConsumerForm.svelte`
- `assets/svelte/consumers/SinkIndex.svelte`

### [Frontend] Update consumer form handler

Update `lib/sequin_web/live/components/consumer_form.ex`:
- Add sink-specific connection testing
- Add encoding/decoding functions
- Update the sink title helper

### [Frontend] Update live view handlers

Update relevant live view handlers in `lib/sequin_web/live/`:
- Add sink-specific handling in show.ex
- Update any relevant forms or displays


### [Tests] Update existing tests

Update:
- Factory modules in `test/support/factory/`
- YAML loader tests
- Consumer form tests

### [Tests] Add test coverage

Create a minimal pipeline test with a mock client or req adapter. See:
- test/sequin/kafka_pipeline_test.exs OR
- test/sequin/typesense_pipeline_test.exs

Also create tests for:
- Sink schema and changeset validation
- Client functionality and error handling

### [DOCS] Add reference, how-to, and quickstart docs

See:
- docs/reference/sinks/kafka.mdx
- docs/how-to/stream-postgres-to-kafka.mdx
- docs/quickstart/kafka.mdx
20 changes: 20 additions & 0 deletions assets/svelte/consumers/ShowSink.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
ElasticsearchConsumer,
RedisStringConsumer,
AzureEventHubConsumer,
PostgresSinkConsumer,
} from "./types";
import AzureEventHubSinkCard from "../sinks/azure_event_hub/AzureEventHubSinkCard.svelte";
import ElasticsearchSinkCard from "../sinks/elasticsearch/ElasticsearchSinkCard.svelte";
Expand All @@ -48,6 +49,8 @@
import HealthAlerts from "$lib/health/HealthAlerts.svelte";
import { Button } from "$lib/components/ui/button";
import CollapsibleCode from "../components/CollapsibleCode.svelte";
import PostgresSinkCard from "../sinks/postgres/PostgresSinkCard.svelte";
import type { Table } from "../databases/types";
export let live;
export let parent;
Expand Down Expand Up @@ -161,6 +164,12 @@
return consumer.sink.type === "rabbitmq";
}
function isPostgresSinkConsumer(
consumer: Consumer,
): consumer is PostgresSinkConsumer {
return consumer.sink.type === "postgres";
}
let chartElement;
let updateChart;
let resizeObserver;
Expand Down Expand Up @@ -761,6 +770,15 @@
unit: units[unitIndex],
};
}
function getRoutingCode(consumer: Consumer): string | null {
if (!consumer.routing || !consumer.routing.function) return null;
const func = consumer.routing.function;
if (func.type === "routing" && "code" in func) {
return func.code;
}
return null;
}
</script>

<div class="flex flex-col flex-1">
Expand Down Expand Up @@ -1254,6 +1272,8 @@
<ElasticsearchSinkCard {consumer} />
{:else if isRedisStringConsumer(consumer)}
<RedisStringSinkCard {consumer} />
{:else if isPostgresSinkConsumer(consumer)}
<PostgresSinkCard {consumer} />
{/if}

<ShowSource {consumer} {tables} />
Expand Down
3 changes: 3 additions & 0 deletions assets/svelte/consumers/ShowSinkHeader.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import AzureEventHubIcon from "../sinks/azure_event_hub/AzureEventHubIcon.svelte";
import TypesenseIcon from "../sinks/typesense/TypesenseIcon.svelte";
import ElasticsearchIcon from "../sinks/elasticsearch/ElasticsearchIcon.svelte";
import PostgresIcon from "../sinks/postgres/PostgresIcon.svelte";
import StopSinkModal from "./StopSinkModal.svelte";
import { Badge } from "$lib/components/ui/badge";

Expand Down Expand Up @@ -162,6 +163,8 @@
<KinesisIcon class="h-6 w-6 mr-2" />
{:else if consumer.sink.type === "s2"}
<S2Icon class="h-6 w-6 mr-2" />
{:else if consumer.sink.type === "postgres"}
<PostgresIcon class="h-6 w-6 mr-2" />
{/if}
<h1 class="text-xl font-semibold">
{consumer.name}
Expand Down
9 changes: 9 additions & 0 deletions assets/svelte/consumers/SinkConsumerForm.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import TypesenseSinkForm from "$lib/sinks/typesense/TypesenseSinkForm.svelte";
import MeilisearchSinkForm from "$lib/sinks/meilisearch/MeilisearchSinkForm.svelte";
import ElasticsearchSinkForm from "$lib/sinks/elasticsearch/ElasticsearchSinkForm.svelte";
import PostgresSinkForm from "$lib/sinks/postgres/PostgresSinkForm.svelte";
import * as Alert from "$lib/components/ui/alert/index.js";
import SchemaTableSelector from "../components/SchemaTableSelector.svelte";
import * as Tooltip from "$lib/components/ui/tooltip";
Expand Down Expand Up @@ -731,6 +732,14 @@
{functions}
{functionRefreshState}
/>
{:else if consumer.type === "postgres"}
<PostgresSinkForm
errors={errors.consumer}
bind:form
{functions}
{refreshFunctions}
bind:functionRefreshState
/>
{:else if consumer.type === "kafka"}
<KafkaSinkForm
errors={errors.consumer}
Expand Down
10 changes: 8 additions & 2 deletions assets/svelte/consumers/SinkIndex.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
import SequinStreamIcon from "../sinks/sequin_stream/SequinStreamIcon.svelte";
import NatsIcon from "../sinks/nats/NatsIcon.svelte";
import RabbitMqIcon from "../sinks/rabbitmq/RabbitMqIcon.svelte";
import TypesenseIcon from "../sinks/typesense/TypesenseIcon.svelte";
import MeilisearchIcon from "../sinks/meilisearch/MeilisearchIcon.svelte";
import ElasticsearchIcon from "../sinks/elasticsearch/ElasticsearchIcon.svelte";
import PostgresIcon from "../sinks/postgres/PostgresIcon.svelte";
import { Badge } from "$lib/components/ui/badge";
import * as d3 from "d3";
Expand All @@ -56,7 +56,8 @@
| "nats"
| "rabbitmq"
| "typesense"
| "elasticsearch";
| "elasticsearch"
| "postgres";
status: "active" | "disabled" | "paused";
database_name: string;
Expand Down Expand Up @@ -164,6 +165,11 @@
name: "Elasticsearch",
icon: ElasticsearchIcon,
},
{
id: "postgres",
name: "Postgres",
icon: PostgresIcon,
},
];
function handleConsumerClick(id: string, type: string) {
Expand Down
10 changes: 10 additions & 0 deletions assets/svelte/consumers/dynamicRoutingDocs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -250,4 +250,14 @@ export const routedSinkDocs: Record<RoutedSinkType, RoutedSinkDocs> = {
},
},
},
postgres: {
fields: {
table_name: {
description: "Postgres table name",
staticValue: "<empty>",
staticFormField: "table_name",
dynamicDefault: "<empty>",
},
},
},
};
20 changes: 19 additions & 1 deletion assets/svelte/consumers/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ export type BaseConsumer = {
message_grouping: boolean;
ack_wait_ms: number;
max_ack_pending: number;
max_retry_count: number;
max_deliver: number;
max_waiting: number;
inserted_at: string;
Expand Down Expand Up @@ -247,6 +248,20 @@ export type ElasticsearchConsumer = BaseConsumer & {
};
};

// PostgreSQL specific sink
export type PostgresSinkConsumer = BaseConsumer & {
sink: {
type: "postgres";
host: string;
port: number;
database: string;
table_name: string;
username: string;
ssl: boolean;
routing_mode: "dynamic" | "static";
};
};

// Union type for all consumer types
export type Consumer =
| HttpPushConsumer
Expand All @@ -263,7 +278,8 @@ export type Consumer =
| TypesenseConsumer
| SnsConsumer
| ElasticsearchConsumer
| RedisStringConsumer;
| RedisStringConsumer
| PostgresSinkConsumer;

export const SinkTypeValues = [
"http_push",
Expand All @@ -281,6 +297,7 @@ export const SinkTypeValues = [
"meilisearch",
"elasticsearch",
"redis_string",
"postgres",
] as const;

export type SinkType = (typeof SinkTypeValues)[number];
Expand All @@ -301,6 +318,7 @@ export const RoutedSinkTypeValues = [
"sqs",
"sns",
"kinesis",
"postgres",
] as const;

export type RoutedSinkType = (typeof RoutedSinkTypeValues)[number];
58 changes: 58 additions & 0 deletions assets/svelte/sinks/postgres/PostgresIcon.svelte
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
<svg
width="432.071pt"
height="445.383pt"
viewBox="0 0 432.071 445.383"
xml:space="preserve"
xmlns="http://www.w3.org/2000/svg"
class={$$props.class}
>
<g
id="orginal"
style="fill-rule:nonzero;clip-rule:nonzero;stroke:#000000;stroke-miterlimit:4;"
>
</g>
<g
id="Layer_x0020_3"
style="fill-rule:nonzero;clip-rule:nonzero;fill:none;stroke:#FFFFFF;stroke-width:12.4651;stroke-linecap:round;stroke-linejoin:round;stroke-miterlimit:4;"
>
<path
style="fill:#000000;stroke:#000000;stroke-width:37.3953;stroke-linecap:butt;stroke-linejoin:miter;"
d="M323.205,324.227c2.833-23.601,1.984-27.062,19.563-23.239l4.463,0.392c13.517,0.615,31.199-2.174,41.587-7c22.362-10.376,35.622-27.7,13.572-23.148c-50.297,10.376-53.755-6.655-53.755-6.655c53.111-78.803,75.313-178.836,56.149-203.322 C352.514-5.534,262.036,26.049,260.522,26.869l-0.482,0.089c-9.938-2.062-21.06-3.294-33.554-3.496c-22.761-0.374-40.032,5.967-53.133,15.904c0,0-161.408-66.498-153.899,83.628c1.597,31.936,45.777,241.655,98.47,178.31 c19.259-23.163,37.871-42.748,37.871-42.748c9.242,6.14,20.307,9.272,31.912,8.147l0.897-0.765c-0.281,2.876-0.157,5.689,0.359,9.019c-13.572,15.167-9.584,17.83-36.723,23.416c-27.457,5.659-11.326,15.734-0.797,18.367c12.768,3.193,42.305,7.716,62.268-20.224 l-0.795,3.188c5.325,4.26,4.965,30.619,5.72,49.452c0.756,18.834,2.017,36.409,5.856,46.771c3.839,10.36,8.369,37.05,44.036,29.406c29.809-6.388,52.6-15.582,54.677-101.107"
/>
<path
style="fill:#336791;stroke:none;"
d="M402.395,271.23c-50.302,10.376-53.76-6.655-53.76-6.655c53.111-78.808,75.313-178.843,56.153-203.326c-52.27-66.785-142.752-35.2-144.262-34.38l-0.486,0.087c-9.938-2.063-21.06-3.292-33.56-3.496c-22.761-0.373-40.026,5.967-53.127,15.902 c0,0-161.411-66.495-153.904,83.63c1.597,31.938,45.776,241.657,98.471,178.312c19.26-23.163,37.869-42.748,37.869-42.748c9.243,6.14,20.308,9.272,31.908,8.147l0.901-0.765c-0.28,2.876-0.152,5.689,0.361,9.019c-13.575,15.167-9.586,17.83-36.723,23.416 c-27.459,5.659-11.328,15.734-0.796,18.367c12.768,3.193,42.307,7.716,62.266-20.224l-0.796,3.188c5.319,4.26,9.054,27.711,8.428,48.969c-0.626,21.259-1.044,35.854,3.147,47.254c4.191,11.4,8.368,37.05,44.042,29.406c29.809-6.388,45.256-22.942,47.405-50.555 c1.525-19.631,4.976-16.729,5.194-34.28l2.768-8.309c3.192-26.611,0.507-35.196,18.872-31.203l4.463,0.392c13.517,0.615,31.208-2.174,41.591-7c22.358-10.376,35.618-27.7,13.573-23.148z"
/>
<path
d="M215.866,286.484c-1.385,49.516,0.348,99.377,5.193,111.495c4.848,12.118,15.223,35.688,50.9,28.045c29.806-6.39,40.651-18.756,45.357-46.051c3.466-20.082,10.148-75.854,11.005-87.281"
/>
<path
d="M173.104,38.256c0,0-161.521-66.016-154.012,84.109c1.597,31.938,45.779,241.664,98.473,178.316c19.256-23.166,36.671-41.335,36.671-41.335"
/>
<path
d="M260.349,26.207c-5.591,1.753,89.848-34.889,144.087,34.417c19.159,24.484-3.043,124.519-56.153,203.329"
/>
<path
style="stroke-linejoin:bevel;"
d="M348.282,263.953c0,0,3.461,17.036,53.764,6.653c22.04-4.552,8.776,12.774-13.577,23.155c-18.345,8.514-59.474,10.696-60.146-1.069c-1.729-30.355,21.647-21.133,19.96-28.739c-1.525-6.85-11.979-13.573-18.894-30.338 c-6.037-14.633-82.796-126.849,21.287-110.183c3.813-0.789-27.146-99.002-124.553-100.599c-97.385-1.597-94.19,119.762-94.19,119.762"
/>
<path
d="M188.604,274.334c-13.577,15.166-9.584,17.829-36.723,23.417c-27.459,5.66-11.326,15.733-0.797,18.365c12.768,3.195,42.307,7.718,62.266-20.229c6.078-8.509-0.036-22.086-8.385-25.547c-4.034-1.671-9.428-3.765-16.361,3.994z"
/>
<path
d="M187.715,274.069c-1.368-8.917,2.93-19.528,7.536-31.942c6.922-18.626,22.893-37.255,10.117-96.339c-9.523-44.029-73.396-9.163-73.436-3.193c-0.039,5.968,2.889,30.26-1.067,58.548c-5.162,36.913,23.488,68.132,56.479,64.938"
/>
<path
style="fill:#FFFFFF;stroke-width:4.155;stroke-linecap:butt;stroke-linejoin:miter;"
d="M172.517,141.7c-0.288,2.039,3.733,7.48,8.976,8.207c5.234,0.73,9.714-3.522,9.998-5.559c0.284-2.039-3.732-4.285-8.977-5.015c-5.237-0.731-9.719,0.333-9.996,2.367z"
/>
<path
style="fill:#FFFFFF;stroke-width:2.0775;stroke-linecap:butt;stroke-linejoin:miter;"
d="M331.941,137.543c0.284,2.039-3.732,7.48-8.976,8.207c-5.238,0.73-9.718-3.522-10.005-5.559c-0.277-2.039,3.74-4.285,8.979-5.015c5.239-0.73,9.718,0.333,10.002,2.368z"
/>
<path
d="M350.676,123.432c0.863,15.994-3.445,26.888-3.988,43.914c-0.804,24.748,11.799,53.074-7.191,81.435"
/>
<path style="stroke-width:3;" d="M0,60.232" />
</g>
</svg>
Loading
Loading