Skip to content

Commit 3c4412a

Browse files
committed
🐛 Clean up Metrics for acked messages
* This resolves a bug where the Posthog Telemetry Reporter detaches due to `key :bytes_processed not found` * This also resolves a bug where messages stored in Postgres are counted twice towards message throughput / count
1 parent e47a798 commit 3c4412a

File tree

3 files changed

+11
-29
lines changed

3 files changed

+11
-29
lines changed

lib/sequin/consumers/consumers.ex

+6-28
Original file line numberDiff line numberDiff line change
@@ -639,36 +639,12 @@ defmodule Sequin.Consumers do
639639
:record -> ConsumerRecord
640640
end
641641

642-
{count, msgs} =
642+
{count, _} =
643643
consumer.id
644644
|> msg_module.where_consumer_id()
645645
|> msg_module.where_ack_ids(ack_ids)
646-
|> select([ce], ce)
647646
|> Repo.delete_all()
648647

649-
:telemetry.execute(
650-
[:sequin, :posthog, :event],
651-
%{event: "consumer_ack"},
652-
%{
653-
distinct_id: "00000000-0000-0000-0000-000000000000",
654-
properties: %{
655-
consumer_id: consumer.id,
656-
consumer_name: consumer.name,
657-
message_count: count,
658-
message_kind: consumer.message_kind,
659-
"$groups": %{account: consumer.account_id}
660-
}
661-
}
662-
)
663-
664-
Health.put_event(consumer, %Event{slug: :messages_delivered, status: :success})
665-
Metrics.incr_consumer_messages_processed_count(consumer, count)
666-
Metrics.incr_consumer_messages_processed_throughput(consumer, count)
667-
668-
TracerServer.messages_acked(consumer, ack_ids)
669-
670-
AcknowledgedMessages.store_messages(consumer.id, msgs)
671-
672648
{:ok, count}
673649
end
674650

@@ -804,9 +780,6 @@ defmodule Sequin.Consumers do
804780

805781
AcknowledgedMessages.store_messages(consumer.id, acked_messages)
806782

807-
Metrics.incr_consumer_messages_processed_count(consumer, count)
808-
Metrics.incr_consumer_messages_processed_throughput(consumer, count)
809-
810783
bytes_processed =
811784
Enum.sum_by(
812785
acked_messages,
@@ -815,6 +788,8 @@ defmodule Sequin.Consumers do
815788
end
816789
)
817790

791+
Metrics.incr_consumer_messages_processed_count(consumer, count)
792+
Metrics.incr_consumer_messages_processed_throughput(consumer, count)
818793
Metrics.incr_consumer_messages_processed_bytes(consumer, bytes_processed)
819794

820795
:telemetry.execute(
@@ -833,6 +808,9 @@ defmodule Sequin.Consumers do
833808
}
834809
)
835810

811+
ack_ids = Enum.map(acked_messages, & &1.ack_id)
812+
TracerServer.messages_acked(consumer, ack_ids)
813+
836814
{:ok, count}
837815
end
838816

lib/sequin/tracer/server.ex

+1-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ defmodule Sequin.Tracer.Server do
5252

5353
def messages_acked(_consumer, []), do: :ok
5454

55-
def messages_acked(consumer, ack_ids) do
55+
def messages_acked(consumer, ack_ids) when is_list(ack_ids) do
5656
GenServer.cast(via_tuple(consumer.account_id), {:acked, consumer, ack_ids})
5757
end
5858

test/sequin/consumers_test.exs

+4
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,11 @@ defmodule Sequin.ConsumersTest do
9595
record1 = ConsumersFactory.insert_consumer_record!(consumer_id: consumer.id, state: :delivered)
9696
record2 = ConsumersFactory.insert_consumer_record!(consumer_id: consumer.id, state: :delivered)
9797

98+
record1 = %ConsumerRecord{record1 | payload_size_bytes: Size.bytes(100)}
99+
record2 = %ConsumerRecord{record2 | payload_size_bytes: Size.bytes(200)}
100+
98101
assert {:ok, 2} = Consumers.ack_messages(consumer, [record1.ack_id, record2.ack_id])
102+
assert {:ok, 2} = Consumers.after_messages_acked(consumer, [record1, record2])
99103

100104
assert {:ok, messages} = AcknowledgedMessages.fetch_messages(consumer.id)
101105
assert length(messages) == 2

0 commit comments

Comments
 (0)