diff --git a/lib/sequin/runtime/message_handler.ex b/lib/sequin/runtime/message_handler.ex index c5ce26767..71bd9a797 100644 --- a/lib/sequin/runtime/message_handler.ex +++ b/lib/sequin/runtime/message_handler.ex @@ -222,15 +222,13 @@ defmodule Sequin.Runtime.MessageHandler do # We group_by consumer_id throughput because consumer as a key is slow! # So we need to do fast lookups by consumer_id consumers_by_id = Map.new(ctx.consumers, fn consumer -> {consumer.id, consumer} end) + {consumers_by_table_oid, wildcard_consumer_ids} = build_consumer_lookups(ctx.consumers) messages # First we get a list of consumer_ids that match the SlotProcessor.Message |> Stream.map(fn %SlotProcessor.Message{} = message -> matching_consumer_ids = - ctx.consumers - |> Stream.filter(&Consumers.matches_message?(&1, message)) - # Then we map to a list of consumer_ids - |> Enum.map(& &1.id) + matching_consumer_ids(message, consumers_by_id, consumers_by_table_oid, wildcard_consumer_ids) {message, matching_consumer_ids} end) @@ -258,6 +256,55 @@ defmodule Sequin.Runtime.MessageHandler do end) end + # Uses memoized lookups to find matching consumer IDs for a message. + # For consumers with simple include_table_oids config, we do O(1) lookup by table_oid. + # For consumers with complex filters (wildcard group), we still need to check matches_message?. + defp matching_consumer_ids( + %SlotProcessor.Message{} = message, + consumers_by_id, + consumers_by_table_oid, + wildcard_consumer_ids + ) do + # Get consumers from the memoized table_oid lookup + simple_consumer_ids = Map.get(consumers_by_table_oid, message.table_oid, []) + + Enum.filter(simple_consumer_ids ++ wildcard_consumer_ids, fn consumer_id -> + consumer = Map.fetch!(consumers_by_id, consumer_id) + Consumers.matches_message?(consumer, message) + end) + end + + # Builds memoized consumer lookup structures for performance optimization. + # Consumers with only include_table_oids are grouped by table_oid for O(1) lookup. + # All other consumers (with schema filters or exclude rules) go into a wildcard list. + defp build_consumer_lookups(consumers) do + {simple_consumers, wildcard_consumers} = Enum.split_with(consumers, &simple_table_oid_consumer?/1) + + consumers_by_table_oid = + Enum.reduce(simple_consumers, %{}, fn consumer, acc -> + table_oids = consumer.source.include_table_oids + + Enum.reduce(table_oids, acc, fn table_oid, inner_acc -> + Map.update(inner_acc, table_oid, [consumer.id], fn ids -> [consumer.id | ids] end) + end) + end) + + wildcard_consumer_ids = Enum.map(wildcard_consumers, & &1.id) + + {consumers_by_table_oid, wildcard_consumer_ids} + end + + # Returns true if the consumer has a simple source configuration with only include_table_oids + # and no other filtering rules (no schema filters, no exclude rules). + defp simple_table_oid_consumer?(%SinkConsumer{source: nil}), do: false + + defp simple_table_oid_consumer?(%SinkConsumer{source: %Consumers.Source{} = source}) do + is_list(source.include_table_oids) and + is_nil(source.exclude_table_oids) and + is_nil(source.include_schemas) and + is_nil(source.exclude_schemas) + end + @decorate track_metrics("map_to_consumer_message") defp consumer_message(%SinkConsumer{} = consumer, %PostgresDatabase{} = database, %SlotProcessor.Message{} = message) do Consumers.consumer_message(consumer, database, message)