Skip to content

Commit bc243cc

Browse files
committed
Use extreme with backpressure
1 parent db0fd8e commit bc243cc

File tree

7 files changed

+105
-70
lines changed

7 files changed

+105
-70
lines changed

CHANGELOG.md

+4
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@ The format is based on [Keep a
66
Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to
77
[Semantic Versioning](https://semver.org/spec/v2.0.0.html).
88

9+
## 1.1.0 - 2024-06-18
10+
11+
- Use new `Extreme.ListenerWithBackPressure`
12+
913
## 1.0.1 - 2024-01-31
1014

1115
- Bump all dependencies including bumping `:extreme` to v1.0.5 which fixes a

config/test.exs

+7
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
import Config
22

3+
config :logger, :console,
4+
format: "$time $metadata[$level] $message\n",
5+
level: :debug,
6+
metadata: [:pid, :module, :function]
7+
8+
config :ex_unit, capture_log: true
9+
310
config :kelvin, ExtremeClient,
411
db_type: :node,
512
host: System.get_env("EVENTSTORE_HOST") || "localhost",

lib/kelvin/in_order_subscription.ex

+28-33
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ defmodule Kelvin.InOrderSubscription do
3535

3636
defstruct [
3737
:config,
38-
:subscription,
38+
:extreme_listener,
3939
:self,
4040
:max_buffer_size,
4141
demand: 0,
@@ -53,13 +53,36 @@ defmodule Kelvin.InOrderSubscription do
5353
Keyword.get(
5454
opts,
5555
:catch_up_chunk_size,
56-
Application.get_env(:kelvin, :catch_up_chunk_size, 256)
56+
Application.get_env(:kelvin, :catch_up_chunk_size, 128)
57+
)
58+
59+
connection = Keyword.fetch!(opts, :connection)
60+
stream_name = Keyword.fetch!(opts, :stream_name)
61+
62+
listener_name =
63+
opts
64+
|> Keyword.get(:name, __MODULE__)
65+
|> Module.concat(ExtremeListener)
66+
67+
{:ok, extreme_listener} =
68+
Kelvin.Listener.start_link(connection, stream_name,
69+
read_per_page: max_buffer_size,
70+
auto_subscribe: false,
71+
ack_timeout: :infinity,
72+
name: listener_name,
73+
producer: self(),
74+
get_stream_position_fun: fn ->
75+
opts
76+
|> Keyword.fetch!(:restore_stream_position!)
77+
|> _do_function()
78+
end
5779
)
5880

5981
state = %__MODULE__{
82+
extreme_listener: extreme_listener,
6083
config: Map.new(opts),
6184
self: Keyword.get(opts, :name, self()),
62-
max_buffer_size: max_buffer_size
85+
max_buffer_size: max_buffer_size * 2
6386
}
6487

6588
Process.send_after(
@@ -92,27 +115,11 @@ defmodule Kelvin.InOrderSubscription do
92115
end
93116

94117
def handle_info(:subscribe, state) do
95-
if state.subscription do
96-
# coveralls-ignore-start
97-
Logger.warn("#{inspect(__MODULE__)} is already subscribed.")
98-
# coveralls-ignore-stop
99-
else
100-
case _subscribe(state) do
101-
{:ok, sub} ->
102-
Process.link(sub)
103-
{:noreply, [], put_in(state.subscription, sub)}
104-
105-
# coveralls-ignore-start
106-
{:error, reason} ->
107-
{:stop, reason, state}
118+
Kelvin.Listener.subscribe(state.extreme_listener)
108119

109-
# coveralls-ignore-stop
110-
end
111-
end
120+
{:noreply, [], state}
112121
end
113122

114-
def handle_info(_info, state), do: {:noreply, [], state}
115-
116123
@impl GenStage
117124
def handle_call({:on_event, event}, from, state) do
118125
# when the current demand is 0, we should
@@ -162,18 +169,6 @@ defmodule Kelvin.InOrderSubscription do
162169
end
163170
end
164171

165-
defp _subscribe(state) do
166-
state.config.connection
167-
|> Extreme.RequestManager._name()
168-
|> GenServer.call(
169-
{:read_and_stay_subscribed, self(),
170-
{state.config.stream_name,
171-
_do_function(state.config.restore_stream_position!) + 1,
172-
state.max_buffer_size, true, false, :infinity}},
173-
:infinity
174-
)
175-
end
176-
177172
defp _do_function(func) when is_function(func, 0), do: func.()
178173

179174
defp _do_function({m, f, a}) when is_atom(m) and is_atom(f) and is_list(a) do

lib/kelvin/listener.ex

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
defmodule Kelvin.Listener do
2+
@moduledoc false
3+
use Extreme.ListenerWithBackPressure
4+
5+
@impl Extreme.ListenerWithBackPressure
6+
def on_init(opts) do
7+
state = %{
8+
producer: Keyword.fetch!(opts, :producer),
9+
get_stream_position_fun: Keyword.fetch!(opts, :get_stream_position_fun)
10+
}
11+
12+
{:ok, state}
13+
end
14+
15+
@impl Extreme.ListenerWithBackPressure
16+
def get_last_event(_stream_name, %{} = state),
17+
do: state.get_stream_position_fun.()
18+
19+
@impl Extreme.ListenerWithBackPressure
20+
def process_push(push, _stream_name, %{} = state),
21+
do: GenServer.call(state.producer, {:on_event, push})
22+
end

mix.exs

+1-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ defmodule Kelvin.MixProject do
4545
defp deps do
4646
[
4747
{:gen_stage, "~> 1.0"},
48-
{:extreme, "~> 1.0 and >= 1.0.5"},
48+
{:extreme, "~> 1.1.0"},
4949
# docs
5050
{:ex_doc, ">= 0.0.0", only: :dev, runtime: false},
5151
# test

mix.lock

+4-4
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,14 @@
77
"ex_doc": {:hex, :ex_doc, "0.31.1", "8a2355ac42b1cc7b2379da9e40243f2670143721dd50748bf6c3b1184dae2089", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.1", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "3178c3a407c557d8343479e1ff117a96fd31bafe52a039079593fb0524ef61b0"},
88
"excoveralls": {:hex, :excoveralls, "0.18.0", "b92497e69465dc51bc37a6422226ee690ab437e4c06877e836f1c18daeb35da9", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "1109bb911f3cb583401760be49c02cbbd16aed66ea9509fc5479335d284da60b"},
99
"exprotobuf": {:hex, :exprotobuf, "1.2.17", "3003937da617f588a8fb63ebdd7b127a18d78d6502623c272076fd54c07c4de1", [:mix], [{:gpb, "~> 4.0", [hex: :gpb, repo: "hexpm", optional: false]}], "hexpm", "e07ec1e5ae6f8c1c8521450d5f6b658c8c700b1f34c70356e91ece0766f4361a"},
10-
"extreme": {:hex, :extreme, "1.0.5", "fafb04fb514ed63667cdd9385b313d7c67aa10887a9f8f1f290cb721d1ee0e48", [:mix], [{:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:exprotobuf, "~> 1.2.9", [hex: :exprotobuf, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "84588993fcb8f410a3b90a2defaf966d50763a1f9db665a83e0546a34335fa45"},
10+
"extreme": {:hex, :extreme, "1.1.0", "ca31dd0e983e888659f8984e1bac73744f35f98064c1f0743426863a8293e2f7", [:mix], [{:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:exprotobuf, "~> 1.2.9", [hex: :exprotobuf, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "96332ba5ab11dbfb0c4d75cd6be8ad5a7f6f3133a1dc80c6850b86d60b3a2e6a"},
1111
"file_system": {:hex, :file_system, "1.0.0", "b689cc7dcee665f774de94b5a832e578bd7963c8e637ef940cd44327db7de2cd", [:mix], [], "hexpm", "6752092d66aec5a10e662aefeed8ddb9531d79db0bc145bb8c40325ca1d8536d"},
1212
"gen_stage": {:hex, :gen_stage, "1.2.1", "19d8b5e9a5996d813b8245338a28246307fd8b9c99d1237de199d21efc4c76a1", [:mix], [], "hexpm", "83e8be657fa05b992ffa6ac1e3af6d57aa50aace8f691fcf696ff02f8335b001"},
13-
"gpb": {:hex, :gpb, "4.21.0", "7a2eb8dd0f3032b7b46b04dcdb490ffabe43eab3a9a1f905bd03c9ec35babb0f", [:make, :rebar3], [], "hexpm", "da45984d26048d8d508d3bbffa6f4a5a5163841cefbf40809622bf92b4640de4"},
14-
"jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"},
13+
"gpb": {:hex, :gpb, "4.21.1", "72e229c242d252d690addcfd04a6416c26c4d4d2c3521e05570a7a78b48d3bd1", [:make, :rebar3], [], "hexpm", "c05c9aea9e25bd341367a43b3d3eb68e951563911072259c5ec4cb6642f4ef22"},
14+
"jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"},
1515
"makeup": {:hex, :makeup, "1.1.1", "fa0bc768698053b2b3869fa8a62616501ff9d11a562f3ce39580d60860c3a55e", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "5dc62fbdd0de44de194898b6710692490be74baa02d9d108bc29f007783b0b48"},
1616
"makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"},
1717
"makeup_erlang": {:hex, :makeup_erlang, "0.1.3", "d684f4bac8690e70b06eb52dad65d26de2eefa44cd19d64a8095e1417df7c8fd", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "b78dc853d2e670ff6390b605d807263bf606da3c82be37f9d7f68635bd886fc9"},
1818
"nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"},
19-
"telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"},
19+
"telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"},
2020
}

test/kelvin/in_order_subscription_test.exs

+39-32
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
defmodule Kelvin.InOrderSubscriptionTest do
22
use ExUnit.Case, async: true
33

4-
@moduletag :capture_log
5-
64
alias Extreme.Messages
75

86
setup do
@@ -14,7 +12,7 @@ defmodule Kelvin.InOrderSubscriptionTest do
1412

1513
describe "given events have been written to a stream" do
1614
setup c do
17-
write_events(0..100, c.stream_name)
15+
_write_events(0..100, c.stream_name)
1816
:ok
1917
end
2018

@@ -33,7 +31,7 @@ defmodule Kelvin.InOrderSubscriptionTest do
3331
assert event.event.data == to_string(n)
3432
end
3533

36-
write_events(101..200, c.stream_name)
34+
_write_events(101..200, c.stream_name)
3735

3836
for n <- 101..200 do
3937
assert_receive {:events, [event]}, 1_000
@@ -65,15 +63,7 @@ defmodule Kelvin.InOrderSubscriptionTest do
6563

6664
assert_receive {:DOWN, ^monitor_ref, _, _, _}
6765

68-
# we're hardcoding the restore_stream_position! function so this will
69-
# restart from 0 instead of the current stream position as would be the
70-
# case in a real-life system
71-
for n <- 0..100 do
72-
assert_receive {:events, [event]}, 10_000
73-
assert event.event.data == to_string(n)
74-
end
75-
76-
write_events(101..200, c.stream_name)
66+
_write_events(101..200, c.stream_name)
7767

7868
for n <- 101..200 do
7969
assert_receive {:events, [event]}, 1_000
@@ -83,28 +73,41 @@ defmodule Kelvin.InOrderSubscriptionTest do
8373
end
8474

8575
describe "given only a few events have been written to a stream" do
86-
setup c do
87-
write_events(0..10, c.stream_name)
88-
:ok
89-
end
90-
9176
test "a slow subscription catches up", c do
77+
total_events = 100
78+
9279
opts = [
9380
producer_name: c.producer_name,
9481
stream_name: c.stream_name,
9582
restore_stream_position!: &restore_stream_position!/0,
9683
test_proc: self(),
9784
# note how we add an artificial bottleneck to the consumer here
98-
sleep_time: 100,
85+
sleep_time: 10,
9986
# and tune down the catch-up (and therefore max buffer queue size)
100-
catch_up_chunk_size: 1
87+
catch_up_chunk_size: 1,
88+
subscribe_after: 1
10189
# in order to simulate a consumer which is slow and get coverage
10290
# on the supply-buffering we do with the queue
10391
]
10492

10593
start_supervised!({MyInOrderSupervisor, opts})
10694

107-
for n <- 0..10 do
95+
spawn(fn ->
96+
Process.sleep(200)
97+
_write_events(0..total_events, c.stream_name)
98+
end)
99+
100+
for n <- 0..total_events do
101+
assert_receive {:events, [event]}, 6_000
102+
assert event.event.data == to_string(n)
103+
end
104+
105+
spawn(fn ->
106+
Process.sleep(200)
107+
_write_events(0..total_events, c.stream_name)
108+
end)
109+
110+
for n <- 0..total_events do
108111
assert_receive {:events, [event]}, 6_000
109112
assert event.event.data == to_string(n)
110113
end
@@ -113,19 +116,23 @@ defmodule Kelvin.InOrderSubscriptionTest do
113116

114117
defp restore_stream_position!, do: -1
115118

116-
defp write_events(range, stream) do
119+
defp _write_events(range, stream) do
117120
range
118121
|> Enum.map(fn n ->
119-
Messages.NewEvent.new(
120-
event_id: Extreme.Tools.generate_uuid(),
121-
event_type: "kelvin_test_event",
122-
data_content_type: 1,
123-
metadata_content_type: 1,
124-
# valid JSON
125-
data: to_string(n),
126-
metadata: "{}"
127-
)
122+
Process.sleep(5)
123+
124+
[
125+
Messages.NewEvent.new(
126+
event_id: Extreme.Tools.generate_uuid(),
127+
event_type: "kelvin_test_event",
128+
data_content_type: 1,
129+
metadata_content_type: 1,
130+
# valid JSON
131+
data: to_string(n),
132+
metadata: "{}"
133+
)
134+
]
135+
|> ExtremeClient.append_events(stream)
128136
end)
129-
|> ExtremeClient.append_events(stream)
130137
end
131138
end

0 commit comments

Comments
 (0)