Skip to content

Commit 5f24b02

Browse files
committed
Improve batching logic
1 parent 341fd1d commit 5f24b02

3 files changed

Lines changed: 65 additions & 6 deletions

File tree

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
hex/posthog: patch
3+
---
4+
5+
Improve events batching logic and prevent SDK from sending empty batches

lib/posthog/sender.ex

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ defmodule PostHog.Sender do
88
:api_client,
99
:max_batch_time_ms,
1010
:max_batch_events,
11+
:timer_ref,
1112
events: [],
1213
num_events: 0
1314
]
@@ -74,24 +75,29 @@ defmodule PostHog.Sender do
7475
def handle_cast({:event, event}, state) do
7576
case state do
7677
%{num_events: n, events: events} when n + 1 >= state.max_batch_events ->
78+
if state.timer_ref, do: Process.cancel_timer(state.timer_ref, async: true, info: false)
79+
7780
{:noreply, %{state | events: [event | events], num_events: n + 1},
7881
{:continue, :send_batch}}
7982

8083
%{num_events: 0, events: events} ->
81-
Process.send_after(self(), :batch_time_reached, state.max_batch_time_ms)
84+
ref = :erlang.start_timer(state.max_batch_time_ms, self(), :batch_time_reached)
8285

83-
{:noreply, %{state | events: [event | events], num_events: 1}}
86+
{:noreply, %{state | events: [event | events], num_events: 1, timer_ref: ref}}
8487

8588
%{num_events: n, events: events} ->
8689
{:noreply, %{state | events: [event | events], num_events: n + 1}}
8790
end
8891
end
8992

9093
@impl GenServer
91-
def handle_info(:batch_time_reached, state) do
94+
def handle_info({:timeout, ref, :batch_time_reached}, %{num_events: n, timer_ref: ref} = state)
95+
when n > 0 do
9296
{:noreply, state, {:continue, :send_batch}}
9397
end
9498

99+
def handle_info({:timeout, _ref, :batch_time_reached}, state), do: {:noreply, state}
100+
95101
@impl GenServer
96102
def handle_continue(:send_batch, state) do
97103
# Before we initiate an HTTP request that might block the process
@@ -101,7 +107,7 @@ defmodule PostHog.Sender do
101107
Registry.update_value(state.registry, registry_key(state.index), fn _ -> :busy end)
102108
PostHog.API.batch(state.api_client, state.events)
103109
Registry.update_value(state.registry, registry_key(state.index), fn _ -> :available end)
104-
{:noreply, %{state | events: [], num_events: 0}}
110+
{:noreply, %{state | events: [], num_events: 0, timer_ref: nil}}
105111
end
106112

107113
@impl GenServer

test/posthog/sender_test.exs

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ defmodule PostHog.SenderTest do
9797
assert %{events: ["my_event"]} = :sys.get_state(pid)
9898
end
9999

100-
test "immediately sends after reaching max_batch_events", %{
100+
test "immediately sends after reaching max_batch_events and cancels timer", %{
101101
api_client: api_client,
102102
registry: registry
103103
} do
@@ -129,14 +129,18 @@ defmodule PostHog.SenderTest do
129129
end)
130130

131131
Sender.send("foo", @supervisor_name)
132+
133+
%{timer_ref: ref} = :sys.get_state(pid)
134+
assert is_reference(ref)
135+
132136
Sender.send("bar", @supervisor_name)
133137

134138
assert_receive :ready
135139

136140
[{^pid, :busy}] = Registry.lookup(registry, {PostHog.Sender, 1})
137141
send(pid, :go)
138142

139-
assert %{events: []} = :sys.get_state(pid)
143+
assert %{events: [], timer_ref: nil} = :sys.get_state(pid)
140144
[{^pid, :available}] = Registry.lookup(registry, {PostHog.Sender, 1})
141145
end
142146

@@ -207,5 +211,49 @@ defmodule PostHog.SenderTest do
207211

208212
assert :ok = GenServer.stop(pid)
209213
end
214+
215+
test "does not send empty batch", %{api_client: api_client, registry: registry} do
216+
test_pid = self()
217+
218+
pid =
219+
start_link_supervised!(
220+
{Sender,
221+
supervisor_name: @supervisor_name,
222+
index: 1,
223+
api_client: api_client,
224+
max_batch_time_ms: 60_000,
225+
max_batch_events: 2}
226+
)
227+
228+
expect(API.Mock, :request, fn _client, method, url, opts ->
229+
assert method == :post
230+
assert url == "/batch"
231+
232+
assert opts[:json] == %{
233+
batch: ["bar", "foo"]
234+
}
235+
236+
send(test_pid, :ready)
237+
238+
receive do
239+
:go -> :ok
240+
end
241+
242+
send(test_pid, :done)
243+
end)
244+
245+
Sender.send("foo", @supervisor_name)
246+
Sender.send("bar", @supervisor_name)
247+
248+
assert_receive :ready
249+
[{^pid, :busy}] = Registry.lookup(registry, {PostHog.Sender, 1})
250+
send(pid, :go)
251+
assert_receive :done
252+
%{timer_ref: ref} = :sys.get_state(pid)
253+
[{^pid, :available}] = Registry.lookup(registry, {PostHog.Sender, 1})
254+
255+
send(pid, {:timeout, ref, :batch_time_reached})
256+
refute_receive :ready
257+
end
210258
end
211259
end

0 commit comments

Comments
 (0)