Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 51 additions & 4 deletions lib/sequin/runtime/message_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -222,15 +222,13 @@ defmodule Sequin.Runtime.MessageHandler do
# We group_by consumer_id throughput because consumer as a key is slow!
# So we need to do fast lookups by consumer_id
consumers_by_id = Map.new(ctx.consumers, fn consumer -> {consumer.id, consumer} end)
{consumers_by_table_oid, wildcard_consumer_ids} = build_consumer_lookups(ctx.consumers)

messages
# First we get a list of consumer_ids that match the SlotProcessor.Message
|> Stream.map(fn %SlotProcessor.Message{} = message ->
matching_consumer_ids =
ctx.consumers
|> Stream.filter(&Consumers.matches_message?(&1, message))
# Then we map to a list of consumer_ids
|> Enum.map(& &1.id)
matching_consumer_ids(message, consumers_by_id, consumers_by_table_oid, wildcard_consumer_ids)

{message, matching_consumer_ids}
end)
Expand Down Expand Up @@ -258,6 +256,55 @@ defmodule Sequin.Runtime.MessageHandler do
end)
end

# Uses memoized lookups to find matching consumer IDs for a message.
# For consumers with simple include_table_oids config, we do O(1) lookup by table_oid.
# For consumers with complex filters (wildcard group), we still need to check matches_message?.
defp matching_consumer_ids(
%SlotProcessor.Message{} = message,
consumers_by_id,
consumers_by_table_oid,
wildcard_consumer_ids
) do
# Get consumers from the memoized table_oid lookup
simple_consumer_ids = Map.get(consumers_by_table_oid, message.table_oid, [])

Enum.filter(simple_consumer_ids ++ wildcard_consumer_ids, fn consumer_id ->
consumer = Map.fetch!(consumers_by_id, consumer_id)
Consumers.matches_message?(consumer, message)
end)
end

# Builds memoized consumer lookup structures for performance optimization.
# Consumers with only include_table_oids are grouped by table_oid for O(1) lookup.
# All other consumers (with schema filters or exclude rules) go into a wildcard list.
defp build_consumer_lookups(consumers) do
{simple_consumers, wildcard_consumers} = Enum.split_with(consumers, &simple_table_oid_consumer?/1)

consumers_by_table_oid =
Enum.reduce(simple_consumers, %{}, fn consumer, acc ->
table_oids = consumer.source.include_table_oids

Enum.reduce(table_oids, acc, fn table_oid, inner_acc ->
Map.update(inner_acc, table_oid, [consumer.id], fn ids -> [consumer.id | ids] end)
end)
end)

wildcard_consumer_ids = Enum.map(wildcard_consumers, & &1.id)

{consumers_by_table_oid, wildcard_consumer_ids}
end

# Returns true if the consumer has a simple source configuration with only include_table_oids
# and no other filtering rules (no schema filters, no exclude rules).
defp simple_table_oid_consumer?(%SinkConsumer{source: nil}), do: false

defp simple_table_oid_consumer?(%SinkConsumer{source: %Consumers.Source{} = source}) do
is_list(source.include_table_oids) and
is_nil(source.exclude_table_oids) and
is_nil(source.include_schemas) and
is_nil(source.exclude_schemas)
end

@decorate track_metrics("map_to_consumer_message")
defp consumer_message(%SinkConsumer{} = consumer, %PostgresDatabase{} = database, %SlotProcessor.Message{} = message) do
Consumers.consumer_message(consumer, database, message)
Expand Down
Loading