Skip to content

Commit 617e24f

Browse files
committed
⚡️ SMS.put_messages validation and early GenServer.reply to SP
1 parent 508a0d0 commit 617e24f

5 files changed

+98
-50
lines changed

lib/sequin/databases_runtime/message_handler.ex

+1-1
Original file line numberDiff line numberDiff line change
@@ -426,7 +426,7 @@ defmodule Sequin.DatabasesRuntime.SlotProcessor.MessageHandler do
426426
put_messages(consumer_id, messages_to_ingest, attempt + 1)
427427

428428
{:error, error} ->
429-
Health.put_event(:sink_consumer, consumer_id, %Event{slug: :messages_ingested, status: :failed, error: error})
429+
Health.put_event(:sink_consumer, consumer_id, %Event{slug: :messages_ingested, status: :fail, error: error})
430430
{:error, error}
431431
end
432432
end

lib/sequin/databases_runtime/slot_message_store.ex

+34-19
Original file line numberDiff line numberDiff line change
@@ -328,33 +328,48 @@ defmodule Sequin.DatabasesRuntime.SlotMessageStore do
328328
end
329329

330330
@impl GenServer
331-
def handle_call({:put_messages, messages}, _from, %State{} = state) do
331+
def handle_call({:put_messages, messages}, from, %State{} = state) do
332332
execute_timed(:put_messages, fn ->
333333
now = Sequin.utc_now()
334334

335335
messages =
336336
messages
337+
|> Stream.reject(&State.message_exists?(state, &1))
337338
|> Stream.map(&%{&1 | ack_id: Sequin.uuid4(), ingested_at: now})
338-
# We may be receiving messages that we've already ingested and persisted, filter out
339-
# Do we receive messages that we have ingested but not persisted?
340-
|> Enum.filter(&(not State.is_message_persisted?(state, &1)))
341-
342-
{to_persist, to_put} =
343-
if state.consumer.status == :disabled do
344-
{Enum.to_list(messages), []}
345-
else
346-
Enum.split_with(messages, &State.is_message_group_persisted?(state, &1.group_id))
347-
end
348-
349-
with {:ok, state} <- State.put_messages(state, to_put),
350-
:ok <- upsert_messages(state, to_persist),
351-
{:ok, state} <- State.put_persisted_messages(state, to_persist) do
352-
Health.put_event(state.consumer, %Event{slug: :messages_ingested, status: :success})
353-
:syn.publish(:consumers, {:messages_ingested, state.consumer.id}, :messages_ingested)
339+
|> Enum.to_list()
340+
341+
# Validate first
342+
case State.validate_put_messages(state, messages) do
343+
{:ok, _incoming_payload_size_bytes} ->
344+
# Reply early since validation passed. This frees up the SlotProcessor to continue
345+
# calling other SMSs, accumulating messages, etc.
346+
GenServer.reply(from, :ok)
347+
348+
execute_timed(:put_messages_after_reply, fn ->
349+
{to_persist, to_put} =
350+
if state.consumer.status == :disabled do
351+
{messages, []}
352+
else
353+
Enum.split_with(messages, &State.is_message_group_persisted?(state, &1.group_id))
354+
end
355+
356+
{:ok, state} = State.put_messages(state, to_put)
357+
358+
:ok = upsert_messages(state, to_persist)
359+
{:ok, state} = State.put_persisted_messages(state, to_persist)
360+
361+
Health.put_event(state.consumer, %Event{slug: :messages_ingested, status: :success})
362+
:syn.publish(:consumers, {:messages_ingested, state.consumer.id}, :messages_ingested)
363+
364+
if state.test_pid do
365+
send(state.test_pid, {:put_messages_done, state.consumer.id})
366+
end
367+
368+
{:noreply, state}
369+
end)
354370

355-
{:reply, :ok, state}
356-
else
357371
{:error, error} ->
372+
# Reply with error if validation fails
358373
{:reply, {:error, error}, state}
359374
end
360375
end)

lib/sequin/databases_runtime/slot_message_store_state.ex

+36-30
Original file line numberDiff line numberDiff line change
@@ -49,24 +49,17 @@ defmodule Sequin.DatabasesRuntime.SlotMessageStore.State do
4949
:"slot_message_store_state_ordered_cursors_consumer_#{consumer.seq}"
5050
end
5151

52-
@spec put_messages(State.t(), list(message()) | Enumerable.t()) ::
53-
{:ok, State.t()} | {:error, Error.t()}
54-
def put_messages(%State{} = state, messages, opts \\ []) do
52+
@spec validate_put_messages(State.t(), list(message()) | Enumerable.t(), keyword()) ::
53+
{:ok, non_neg_integer()} | {:error, Error.t()}
54+
def validate_put_messages(%State{} = state, messages, opts \\ []) do
5555
skip_limit_check? = Keyword.get(opts, :skip_limit_check?, false)
5656

57-
# Create new messages map while filtering out existing messages
58-
new_messages =
59-
messages
60-
|> Stream.reject(&Map.has_key?(state.messages, {&1.commit_lsn, &1.commit_idx}))
61-
|> Map.new(&{{&1.commit_lsn, &1.commit_idx}, &1})
62-
63-
# Only count payload sizes for new messages
64-
incoming_payload_size_bytes = Enum.sum_by(Map.values(new_messages), & &1.payload_size_bytes)
57+
incoming_payload_size_bytes = Enum.sum_by(messages, & &1.payload_size_bytes)
6558

6659
bytes_exceeded? =
6760
state.payload_size_bytes + incoming_payload_size_bytes > state.max_memory_bytes
6861

69-
messages_exceeded? = map_size(new_messages) + map_size(state.messages) > state.setting_max_messages
62+
messages_exceeded? = length(messages) + map_size(state.messages) > state.setting_max_messages
7063

7164
cond do
7265
not skip_limit_check? and bytes_exceeded? ->
@@ -76,28 +69,41 @@ defmodule Sequin.DatabasesRuntime.SlotMessageStore.State do
7669
{:error, Error.invariant(message: "Message count limit exceeded", code: :payload_size_limit_exceeded)}
7770

7871
true ->
79-
# Insert into ETS
80-
ets_keys = Enum.map(new_messages, fn {commit_tuple, _msg} -> {commit_tuple} end)
81-
82-
state.consumer
83-
|> ordered_cursors_table()
84-
|> :ets.insert(ets_keys)
85-
86-
ack_ids_to_cursor_tuples =
87-
Map.new(new_messages, fn {_commit_tuple, msg} -> {msg.ack_id, {msg.commit_lsn, msg.commit_idx}} end)
88-
89-
state =
90-
%{
91-
state
92-
| messages: Map.merge(state.messages, new_messages),
93-
ack_ids_to_cursor_tuples: Map.merge(state.ack_ids_to_cursor_tuples, ack_ids_to_cursor_tuples),
94-
payload_size_bytes: state.payload_size_bytes + incoming_payload_size_bytes
95-
}
72+
{:ok, incoming_payload_size_bytes}
73+
end
74+
end
9675

97-
{:ok, state}
76+
@spec put_messages(State.t(), list(message()) | Enumerable.t(), keyword()) ::
77+
{:ok, State.t()} | {:error, Error.t()}
78+
def put_messages(%State{} = state, messages, opts \\ []) do
79+
messages = Enum.reject(messages, &message_exists?(state, &1))
80+
81+
with {:ok, incoming_payload_size_bytes} <- validate_put_messages(state, messages, opts) do
82+
# Insert into ETS
83+
ets_keys = Enum.map(messages, fn msg -> {{msg.commit_lsn, msg.commit_idx}} end)
84+
85+
state.consumer
86+
|> ordered_cursors_table()
87+
|> :ets.insert(ets_keys)
88+
89+
cursor_tuples_to_messages = Map.new(messages, fn msg -> {{msg.commit_lsn, msg.commit_idx}, msg} end)
90+
ack_ids_to_cursor_tuples = Map.new(messages, fn msg -> {msg.ack_id, {msg.commit_lsn, msg.commit_idx}} end)
91+
92+
{:ok,
93+
%{
94+
state
95+
| messages: Map.merge(state.messages, cursor_tuples_to_messages),
96+
ack_ids_to_cursor_tuples: Map.merge(state.ack_ids_to_cursor_tuples, ack_ids_to_cursor_tuples),
97+
payload_size_bytes: state.payload_size_bytes + incoming_payload_size_bytes
98+
}}
9899
end
99100
end
100101

102+
@spec message_exists?(State.t(), message()) :: boolean()
103+
def message_exists?(%State{} = state, message) do
104+
Map.has_key?(state.messages, {message.commit_lsn, message.commit_idx})
105+
end
106+
101107
@spec put_persisted_messages(State.t(), list(message()) | Enumerable.t()) :: {:ok, State.t()} | {:error, Error.t()}
102108
def put_persisted_messages(%State{} = state, messages) do
103109
persisted_message_groups =

test/sequin/slot_message_store_state_test.exs

+20
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,26 @@ defmodule Sequin.DatabasesRuntime.SlotMessageStoreStateTest do
6767
end
6868
end
6969

70+
describe "validate_put_messages/2" do
71+
test "returns payload size if under memory limit", %{state: state} do
72+
msg1 = ConsumersFactory.consumer_message(payload_size_bytes: 100)
73+
msg2 = ConsumersFactory.consumer_message(payload_size_bytes: 200)
74+
75+
{:ok, payload_size} = State.validate_put_messages(state, [msg1, msg2])
76+
77+
assert payload_size == 300
78+
end
79+
80+
test "returns error when exceeding setting_max_accumulated_payload_bytes", %{state: state} do
81+
state = %{state | max_memory_bytes: Size.mb(100)}
82+
83+
msg1 = ConsumersFactory.consumer_message(payload_size_bytes: Size.mb(100))
84+
msg2 = ConsumersFactory.consumer_message(payload_size_bytes: Size.mb(100))
85+
86+
assert {:error, %Error.InvariantError{}} = State.validate_put_messages(state, [msg1, msg2])
87+
end
88+
end
89+
7090
describe "put_persisted_messages/2" do
7191
test "updates persisted_message_groups", %{state: state} do
7292
msg1 = ConsumersFactory.consumer_message(group_id: "group1")

test/sequin/slot_message_store_test.exs

+7
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ defmodule Sequin.SlotMessageStoreTest do
5757

5858
:ok = SlotMessageStore.put_messages(consumer.id, [new_message])
5959

60+
consumer_id = consumer.id
61+
assert_receive {:put_messages_done, ^consumer_id}, 1000
62+
6063
persisted_messages = Consumers.list_consumer_messages_for_consumer(consumer)
6164
assert length(persisted_messages) == 3
6265

@@ -380,6 +383,8 @@ defmodule Sequin.SlotMessageStoreTest do
380383
end
381384

382385
test "persists all messages when consumer is disabled", %{consumer: consumer} do
386+
consumer_id = consumer.id
387+
383388
# Put initial message in store while active
384389
initial_message =
385390
ConsumersFactory.consumer_message(
@@ -388,6 +393,7 @@ defmodule Sequin.SlotMessageStoreTest do
388393
)
389394

390395
:ok = SlotMessageStore.put_messages(consumer.id, [initial_message])
396+
assert_receive {:put_messages_done, ^consumer_id}, 1000
391397

392398
# Verify message is not persisted yet
393399
assert [] == Consumers.list_consumer_messages_for_consumer(consumer)
@@ -410,6 +416,7 @@ defmodule Sequin.SlotMessageStoreTest do
410416
]
411417

412418
:ok = SlotMessageStore.put_messages(consumer.id, new_messages)
419+
assert_receive {:put_messages_done, ^consumer_id}, 1000
413420

414421
# Verify all messages are persisted
415422
persisted_messages = Consumers.list_consumer_messages_for_consumer(disabled_consumer)

0 commit comments

Comments
 (0)